summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-15 13:23:06 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-15 13:23:06 -0500
commitf02bf05c7b639f5edb86388c96dbb209a4622be9 (patch)
treefd0bef88337129bc8bf188860172c79729df2b3a
parentb767b34d97fdacd891702b041cc5eb1fdcb0b0a3 (diff)
downloadsoen423-f02bf05c7b639f5edb86388c96dbb209a4622be9.zip
reliable multicast: prune received list
-rw-r--r--src/main/java/derms/net/rmulticast/Prune.java126
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java4
-rw-r--r--src/main/java/derms/net/rmulticast/ReceivedSet.java57
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java13
-rw-r--r--src/main/java/derms/net/rmulticast/Timeout.java11
-rw-r--r--src/main/java/derms/net/rmulticast/Wait.java15
6 files changed, 202 insertions, 24 deletions
diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java
new file mode 100644
index 0000000..d70f2e6
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Prune.java
@@ -0,0 +1,126 @@
+package derms.net.rmulticast;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/** Free memory from the received list. */
+class Prune<T extends Serializable & Hashable> implements Runnable {
+ private static final Duration period = Duration.ofMinutes(1);
+
+ private final ReceivedSet<T> received;
+ private final Set<InetAddress> groupMembers;
+ private final Logger log;
+
+ Prune(ReceivedSet<T> received, Set<InetAddress> groupMembers) {
+ this.received = received;
+ this.groupMembers = groupMembers;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (; ; ) {
+ Wait.forDuration(period);
+ prune();
+ }
+ } catch (InterruptedException e) {
+ log.info("Prune thread interrupted: " + e.getMessage());
+ }
+ }
+
+ private void prune() {
+ // Get the candidate for removal.
+ Message<T> a = received.peekOldest();
+ if (a == null)
+ return;
+
+ // Ensure all group members have received and acknowledge message a.
+ for (InetAddress member : groupMembers) {
+ try {
+ Message<T> c = received.mostRecentSentBy(member);
+ if (!receivedByMemberAtTimeOfSending(a, member, c))
+ return; // Member has not received a -- cannot prune it.
+ } catch (NoSuchElementException e) {
+ log.warning("No message received from " + member);
+ return;
+ }
+ }
+
+ // All group members have received and acknowledged message a. It is safe to delete.
+ received.remove(a);
+ }
+
+ /**
+ * Return true if member has received and ack'ed message a at the time they sent message c.
+ * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
+ *
+ * @param a A message that may or may not have been received by member.
+ * @param member The process that sent c.
+ * @param c A message that was sent by member.
+ */
+ private boolean receivedByMemberAtTimeOfSending(Message<T> a, InetAddress member, Message<T> c) {
+ List<Message<T>> seq = new ArrayList<Message<T>>();
+ seq.add(c);
+ return OPDseq(seq, a);
+ }
+
+ /**
+ * Try to extend the OPD sequence to contain a.
+ * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
+ *
+ * @param seq A sequence of messages each of which acknowledges its predecessor by the OPD.
+ * @return True if the sequence can extend to a.
+ */
+ private boolean OPDseq(List<Message<T>> seq, Message<T> a) {
+ if (seq.contains(a))
+ return true;
+
+ // c -> ... -> b -> ... ?-> a
+ Message<T> c = seq.get(0);
+ Message<T> b = seq.get(seq.size() - 1);
+
+ // All messages sent by the sender of b (that are not already in the sequence) can potentially by added to the sequence.
+ List<Message<T>> potentialPredecessors = received.allSentBy(b.sender);
+ potentialPredecessors.removeAll(seq);
+
+ // Add messages that b positively acknowledged.
+ for (MessageID mid : b.positiveAcks) {
+ try {
+ Message<T> msg = received.getByID(mid);
+ potentialPredecessors.add(msg);
+ } catch (NoSuchElementException e) {
+ log.warning("message " + mid + ", acknowledged by " +b.id() + ", is not in the received list. Continuing anyway.");
+ }
+ }
+
+ // Remove messages that c negatively acknowledged.
+ for (MessageID mid : c.negativeAcks) {
+ for (int i = 0; i < potentialPredecessors.size(); i++) {
+ Message<T> msg = potentialPredecessors.get(i);
+ if (msg.id().equals(mid)) {
+ potentialPredecessors.remove(i);
+ break;
+ }
+ }
+ }
+
+ // Try and extend the sequence.
+ for (Message<T> predecessor : potentialPredecessors) {
+ seq.add(predecessor);
+ if (OPDseq(seq, a))
+ return true;
+ // Failed; remove the predecessor and try another.
+ seq.remove(seq.size() - 1);
+ }
+
+ // The sequence cannot be extended to include a.
+ return false;
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index 996e0db..93a56a8 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -23,11 +23,11 @@ class Receive<T extends Serializable & Hashable> implements Runnable {
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
- Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) {
+ Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) {
this.inSock = inSock;
this.positiveAcks = positiveAcks;
this.negativeAcks = negativeAcks;
- this.received = new ReceivedSet<T>();
+ this.received = received;
this.retransmissions = retransmissions;
this.groupMembers = groupMembers;
this.delivered = delivered;
diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java
index 339a714..963b321 100644
--- a/src/main/java/derms/net/rmulticast/ReceivedSet.java
+++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java
@@ -1,14 +1,18 @@
package derms.net.rmulticast;
import java.io.Serializable;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
class ReceivedSet<T extends Serializable & Hashable> {
- private final Set<Message<T>> received;
+ private final Queue<Message<T>> received;
ReceivedSet() {
- this.received = new ConcurrentHashMap<Message<T>, Void>().keySet();
+ this.received = new ConcurrentLinkedQueue<Message<T>>();
}
void add(Message<T> e) {
@@ -16,10 +20,49 @@ class ReceivedSet<T extends Serializable & Hashable> {
}
// TODO: faster search.
- boolean contains(MessageID mid) {
+ Message<T> getByID(MessageID mid) throws NoSuchElementException {
for (Message<T> msg : received)
if (msg.id().equals(mid))
- return true;
- return false;
+ return msg;
+ throw new NoSuchElementException("message " + mid + " not in received list.");
+ }
+
+ boolean contains(MessageID mid) {
+ try {
+ Message<T> msg = getByID(mid);
+ return true;
+ } catch (NoSuchElementException e) {
+ return false;
+ }
+ }
+
+ /** Remove the specified message from the set, if it is present. */
+ void remove(Message<T> msg) {
+ received.remove(msg);
+ }
+
+ /** Retrieves, but does not remove, the oldest message, or returns null if the set is empty. */
+ Message<T> peekOldest() {
+ return received.peek();
+ }
+
+ Message<T> mostRecentSentBy(InetAddress member) throws NoSuchElementException {
+ Message<T> recent = null;
+ for (Message<T> msg : received) {
+ if (msg.sender.equals(member))
+ recent = msg;
+ }
+ if (recent == null)
+ throw new NoSuchElementException("no message from " + member + " in received list.");
+ return recent;
+ }
+
+ List<Message<T>> allSentBy(InetAddress sender) {
+ List<Message<T>> sent = new ArrayList<Message<T>>();
+ for (Message<T> msg : received) {
+ if (msg.sender.equals(sender))
+ sent.add(msg);
+ }
+ return sent;
}
}
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index de07f39..0798e68 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -20,10 +20,11 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
private final InetAddress laddr; // Local address.
private final Set<MessageID> positiveAcks; // Positively acknowledged messages.
private final Set<MessageID> negativeAcks; // Negatively acknowledged messages.
+ private final ReceivedSet<T> received;
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
- private final AtomicReference<Instant> lastSend;
- private final ConcurrentMulticastSocket outSock;
private final Set<InetAddress> groupMembers;
+ private final ConcurrentMulticastSocket outSock;
+ private final AtomicReference<Instant> lastSend;
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
@@ -33,23 +34,25 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.received = new ReceivedSet<T>();
this.retransmissions = new LinkedBlockingQueue<Message<T>>();
+ this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet();
this.lastSend = new AtomicReference<Instant>(Instant.now());
this.outSock = new ConcurrentMulticastSocket(group.getPort());
this.outSock.joinGroup(group.getAddress());
- this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet();
-
this.delivered = new LinkedBlockingQueue<Message<T>>();
this.log = Logger.getLogger(this.getClass().getName());
ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket();
inSock.joinGroup(group.getAddress());
- (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, retransmissions, groupMembers, delivered))).start();
+ (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions, groupMembers, delivered))).start();
(new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
+
+ (new Thread(new Prune<T>(received, groupMembers))).start();
}
public void send(T payload) throws IOException {
diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java
index 2f2d0ed..2ccccd1 100644
--- a/src/main/java/derms/net/rmulticast/Timeout.java
+++ b/src/main/java/derms/net/rmulticast/Timeout.java
@@ -27,7 +27,7 @@ class Timeout<T extends Serializable & Hashable> implements Runnable {
public void run() {
try {
for (;;) {
- waitUntilTimeout();
+ Wait.forDuration(timeout);
if (positiveAcks.contains(msg.id())) {
log.info("Message " + msg.id() + "positively ack'ed.");
return;
@@ -40,13 +40,4 @@ class Timeout<T extends Serializable & Hashable> implements Runnable {
log.info("Timeout thread interrupted: " + e.getMessage());
}
}
-
- private void waitUntilTimeout() {
- Instant start = Instant.now();
- Duration elapsed;
- do {
- Thread.yield();
- elapsed = Duration.between(start, Instant.now());
- } while (elapsed.compareTo(timeout) < 0);
- }
}
diff --git a/src/main/java/derms/net/rmulticast/Wait.java b/src/main/java/derms/net/rmulticast/Wait.java
new file mode 100644
index 0000000..f1180ed
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Wait.java
@@ -0,0 +1,15 @@
+package derms.net.rmulticast;
+
+import java.time.Duration;
+import java.time.Instant;
+
+class Wait {
+ static void forDuration(Duration dur) throws InterruptedException {
+ Instant start = Instant.now();
+ Duration elapsed;
+ do {
+ Thread.yield();
+ elapsed = Duration.between(start, Instant.now());
+ } while (elapsed.compareTo(dur) < 0);
+ }
+}