diff options
Diffstat (limited to 'src/main/java/derms')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Prune.java | 126 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Receive.java | 4 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReceivedSet.java | 57 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 13 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Timeout.java | 11 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Wait.java | 15 |
6 files changed, 202 insertions, 24 deletions
diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java new file mode 100644 index 0000000..d70f2e6 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Prune.java @@ -0,0 +1,126 @@ +package derms.net.rmulticast; + +import java.io.Serializable; +import java.net.InetAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.logging.Logger; + +/** Free memory from the received list. */ +class Prune<T extends Serializable & Hashable> implements Runnable { + private static final Duration period = Duration.ofMinutes(1); + + private final ReceivedSet<T> received; + private final Set<InetAddress> groupMembers; + private final Logger log; + + Prune(ReceivedSet<T> received, Set<InetAddress> groupMembers) { + this.received = received; + this.groupMembers = groupMembers; + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + for (; ; ) { + Wait.forDuration(period); + prune(); + } + } catch (InterruptedException e) { + log.info("Prune thread interrupted: " + e.getMessage()); + } + } + + private void prune() { + // Get the candidate for removal. + Message<T> a = received.peekOldest(); + if (a == null) + return; + + // Ensure all group members have received and acknowledge message a. + for (InetAddress member : groupMembers) { + try { + Message<T> c = received.mostRecentSentBy(member); + if (!receivedByMemberAtTimeOfSending(a, member, c)) + return; // Member has not received a -- cannot prune it. + } catch (NoSuchElementException e) { + log.warning("No message received from " + member); + return; + } + } + + // All group members have received and acknowledged message a. It is safe to delete. + received.remove(a); + } + + /** + * Return true if member has received and ack'ed message a at the time they sent message c. + * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990). + * + * @param a A message that may or may not have been received by member. + * @param member The process that sent c. + * @param c A message that was sent by member. + */ + private boolean receivedByMemberAtTimeOfSending(Message<T> a, InetAddress member, Message<T> c) { + List<Message<T>> seq = new ArrayList<Message<T>>(); + seq.add(c); + return OPDseq(seq, a); + } + + /** + * Try to extend the OPD sequence to contain a. + * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990). + * + * @param seq A sequence of messages each of which acknowledges its predecessor by the OPD. + * @return True if the sequence can extend to a. + */ + private boolean OPDseq(List<Message<T>> seq, Message<T> a) { + if (seq.contains(a)) + return true; + + // c -> ... -> b -> ... ?-> a + Message<T> c = seq.get(0); + Message<T> b = seq.get(seq.size() - 1); + + // All messages sent by the sender of b (that are not already in the sequence) can potentially by added to the sequence. + List<Message<T>> potentialPredecessors = received.allSentBy(b.sender); + potentialPredecessors.removeAll(seq); + + // Add messages that b positively acknowledged. + for (MessageID mid : b.positiveAcks) { + try { + Message<T> msg = received.getByID(mid); + potentialPredecessors.add(msg); + } catch (NoSuchElementException e) { + log.warning("message " + mid + ", acknowledged by " +b.id() + ", is not in the received list. Continuing anyway."); + } + } + + // Remove messages that c negatively acknowledged. + for (MessageID mid : c.negativeAcks) { + for (int i = 0; i < potentialPredecessors.size(); i++) { + Message<T> msg = potentialPredecessors.get(i); + if (msg.id().equals(mid)) { + potentialPredecessors.remove(i); + break; + } + } + } + + // Try and extend the sequence. + for (Message<T> predecessor : potentialPredecessors) { + seq.add(predecessor); + if (OPDseq(seq, a)) + return true; + // Failed; remove the predecessor and try another. + seq.remove(seq.size() - 1); + } + + // The sequence cannot be extended to include a. + return false; + } +} diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 996e0db..93a56a8 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -23,11 +23,11 @@ class Receive<T extends Serializable & Hashable> implements Runnable { private final BlockingQueue<Message<T>> delivered; private final Logger log; - Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) { + Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) { this.inSock = inSock; this.positiveAcks = positiveAcks; this.negativeAcks = negativeAcks; - this.received = new ReceivedSet<T>(); + this.received = received; this.retransmissions = retransmissions; this.groupMembers = groupMembers; this.delivered = delivered; diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java index 339a714..963b321 100644 --- a/src/main/java/derms/net/rmulticast/ReceivedSet.java +++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java @@ -1,14 +1,18 @@ package derms.net.rmulticast; import java.io.Serializable; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; class ReceivedSet<T extends Serializable & Hashable> { - private final Set<Message<T>> received; + private final Queue<Message<T>> received; ReceivedSet() { - this.received = new ConcurrentHashMap<Message<T>, Void>().keySet(); + this.received = new ConcurrentLinkedQueue<Message<T>>(); } void add(Message<T> e) { @@ -16,10 +20,49 @@ class ReceivedSet<T extends Serializable & Hashable> { } // TODO: faster search. - boolean contains(MessageID mid) { + Message<T> getByID(MessageID mid) throws NoSuchElementException { for (Message<T> msg : received) if (msg.id().equals(mid)) - return true; - return false; + return msg; + throw new NoSuchElementException("message " + mid + " not in received list."); + } + + boolean contains(MessageID mid) { + try { + Message<T> msg = getByID(mid); + return true; + } catch (NoSuchElementException e) { + return false; + } + } + + /** Remove the specified message from the set, if it is present. */ + void remove(Message<T> msg) { + received.remove(msg); + } + + /** Retrieves, but does not remove, the oldest message, or returns null if the set is empty. */ + Message<T> peekOldest() { + return received.peek(); + } + + Message<T> mostRecentSentBy(InetAddress member) throws NoSuchElementException { + Message<T> recent = null; + for (Message<T> msg : received) { + if (msg.sender.equals(member)) + recent = msg; + } + if (recent == null) + throw new NoSuchElementException("no message from " + member + " in received list."); + return recent; + } + + List<Message<T>> allSentBy(InetAddress sender) { + List<Message<T>> sent = new ArrayList<Message<T>>(); + for (Message<T> msg : received) { + if (msg.sender.equals(sender)) + sent.add(msg); + } + return sent; } } diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index de07f39..0798e68 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -20,10 +20,11 @@ public class ReliableMulticast<T extends Serializable & Hashable> { private final InetAddress laddr; // Local address. private final Set<MessageID> positiveAcks; // Positively acknowledged messages. private final Set<MessageID> negativeAcks; // Negatively acknowledged messages. + private final ReceivedSet<T> received; private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. - private final AtomicReference<Instant> lastSend; - private final ConcurrentMulticastSocket outSock; private final Set<InetAddress> groupMembers; + private final ConcurrentMulticastSocket outSock; + private final AtomicReference<Instant> lastSend; private final BlockingQueue<Message<T>> delivered; private final Logger log; @@ -33,23 +34,25 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.received = new ReceivedSet<T>(); this.retransmissions = new LinkedBlockingQueue<Message<T>>(); + this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet(); this.lastSend = new AtomicReference<Instant>(Instant.now()); this.outSock = new ConcurrentMulticastSocket(group.getPort()); this.outSock.joinGroup(group.getAddress()); - this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet(); - this.delivered = new LinkedBlockingQueue<Message<T>>(); this.log = Logger.getLogger(this.getClass().getName()); ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket(); inSock.joinGroup(group.getAddress()); - (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, retransmissions, groupMembers, delivered))).start(); + (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions, groupMembers, delivered))).start(); (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start(); + + (new Thread(new Prune<T>(received, groupMembers))).start(); } public void send(T payload) throws IOException { diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java index 2f2d0ed..2ccccd1 100644 --- a/src/main/java/derms/net/rmulticast/Timeout.java +++ b/src/main/java/derms/net/rmulticast/Timeout.java @@ -27,7 +27,7 @@ class Timeout<T extends Serializable & Hashable> implements Runnable { public void run() { try { for (;;) { - waitUntilTimeout(); + Wait.forDuration(timeout); if (positiveAcks.contains(msg.id())) { log.info("Message " + msg.id() + "positively ack'ed."); return; @@ -40,13 +40,4 @@ class Timeout<T extends Serializable & Hashable> implements Runnable { log.info("Timeout thread interrupted: " + e.getMessage()); } } - - private void waitUntilTimeout() { - Instant start = Instant.now(); - Duration elapsed; - do { - Thread.yield(); - elapsed = Duration.between(start, Instant.now()); - } while (elapsed.compareTo(timeout) < 0); - } } diff --git a/src/main/java/derms/net/rmulticast/Wait.java b/src/main/java/derms/net/rmulticast/Wait.java new file mode 100644 index 0000000..f1180ed --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Wait.java @@ -0,0 +1,15 @@ +package derms.net.rmulticast; + +import java.time.Duration; +import java.time.Instant; + +class Wait { + static void forDuration(Duration dur) throws InterruptedException { + Instant start = Instant.now(); + Duration elapsed; + do { + Thread.yield(); + elapsed = Duration.between(start, Instant.now()); + } while (elapsed.compareTo(dur) < 0); + } +} |