summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/Prune.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Prune.java')
-rw-r--r--src/main/java/derms/net/rmulticast/Prune.java126
1 files changed, 126 insertions, 0 deletions
diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java
new file mode 100644
index 0000000..d70f2e6
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Prune.java
@@ -0,0 +1,126 @@
+package derms.net.rmulticast;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/** Free memory from the received list. */
+class Prune<T extends Serializable & Hashable> implements Runnable {
+ private static final Duration period = Duration.ofMinutes(1);
+
+ private final ReceivedSet<T> received;
+ private final Set<InetAddress> groupMembers;
+ private final Logger log;
+
+ Prune(ReceivedSet<T> received, Set<InetAddress> groupMembers) {
+ this.received = received;
+ this.groupMembers = groupMembers;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (; ; ) {
+ Wait.forDuration(period);
+ prune();
+ }
+ } catch (InterruptedException e) {
+ log.info("Prune thread interrupted: " + e.getMessage());
+ }
+ }
+
+ private void prune() {
+ // Get the candidate for removal.
+ Message<T> a = received.peekOldest();
+ if (a == null)
+ return;
+
+ // Ensure all group members have received and acknowledge message a.
+ for (InetAddress member : groupMembers) {
+ try {
+ Message<T> c = received.mostRecentSentBy(member);
+ if (!receivedByMemberAtTimeOfSending(a, member, c))
+ return; // Member has not received a -- cannot prune it.
+ } catch (NoSuchElementException e) {
+ log.warning("No message received from " + member);
+ return;
+ }
+ }
+
+ // All group members have received and acknowledged message a. It is safe to delete.
+ received.remove(a);
+ }
+
+ /**
+ * Return true if member has received and ack'ed message a at the time they sent message c.
+ * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
+ *
+ * @param a A message that may or may not have been received by member.
+ * @param member The process that sent c.
+ * @param c A message that was sent by member.
+ */
+ private boolean receivedByMemberAtTimeOfSending(Message<T> a, InetAddress member, Message<T> c) {
+ List<Message<T>> seq = new ArrayList<Message<T>>();
+ seq.add(c);
+ return OPDseq(seq, a);
+ }
+
+ /**
+ * Try to extend the OPD sequence to contain a.
+ * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
+ *
+ * @param seq A sequence of messages each of which acknowledges its predecessor by the OPD.
+ * @return True if the sequence can extend to a.
+ */
+ private boolean OPDseq(List<Message<T>> seq, Message<T> a) {
+ if (seq.contains(a))
+ return true;
+
+ // c -> ... -> b -> ... ?-> a
+ Message<T> c = seq.get(0);
+ Message<T> b = seq.get(seq.size() - 1);
+
+ // All messages sent by the sender of b (that are not already in the sequence) can potentially by added to the sequence.
+ List<Message<T>> potentialPredecessors = received.allSentBy(b.sender);
+ potentialPredecessors.removeAll(seq);
+
+ // Add messages that b positively acknowledged.
+ for (MessageID mid : b.positiveAcks) {
+ try {
+ Message<T> msg = received.getByID(mid);
+ potentialPredecessors.add(msg);
+ } catch (NoSuchElementException e) {
+ log.warning("message " + mid + ", acknowledged by " +b.id() + ", is not in the received list. Continuing anyway.");
+ }
+ }
+
+ // Remove messages that c negatively acknowledged.
+ for (MessageID mid : c.negativeAcks) {
+ for (int i = 0; i < potentialPredecessors.size(); i++) {
+ Message<T> msg = potentialPredecessors.get(i);
+ if (msg.id().equals(mid)) {
+ potentialPredecessors.remove(i);
+ break;
+ }
+ }
+ }
+
+ // Try and extend the sequence.
+ for (Message<T> predecessor : potentialPredecessors) {
+ seq.add(predecessor);
+ if (OPDseq(seq, a))
+ return true;
+ // Failed; remove the predecessor and try another.
+ seq.remove(seq.size() - 1);
+ }
+
+ // The sequence cannot be extended to include a.
+ return false;
+ }
+}