From f02bf05c7b639f5edb86388c96dbb209a4622be9 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Fri, 15 Nov 2024 13:23:06 -0500 Subject: reliable multicast: prune received list --- src/main/java/derms/net/rmulticast/Prune.java | 126 ++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 src/main/java/derms/net/rmulticast/Prune.java (limited to 'src/main/java/derms/net/rmulticast/Prune.java') diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java new file mode 100644 index 0000000..d70f2e6 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Prune.java @@ -0,0 +1,126 @@ +package derms.net.rmulticast; + +import java.io.Serializable; +import java.net.InetAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.logging.Logger; + +/** Free memory from the received list. */ +class Prune implements Runnable { + private static final Duration period = Duration.ofMinutes(1); + + private final ReceivedSet received; + private final Set groupMembers; + private final Logger log; + + Prune(ReceivedSet received, Set groupMembers) { + this.received = received; + this.groupMembers = groupMembers; + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + for (; ; ) { + Wait.forDuration(period); + prune(); + } + } catch (InterruptedException e) { + log.info("Prune thread interrupted: " + e.getMessage()); + } + } + + private void prune() { + // Get the candidate for removal. + Message a = received.peekOldest(); + if (a == null) + return; + + // Ensure all group members have received and acknowledge message a. + for (InetAddress member : groupMembers) { + try { + Message c = received.mostRecentSentBy(member); + if (!receivedByMemberAtTimeOfSending(a, member, c)) + return; // Member has not received a -- cannot prune it. + } catch (NoSuchElementException e) { + log.warning("No message received from " + member); + return; + } + } + + // All group members have received and acknowledged message a. It is safe to delete. + received.remove(a); + } + + /** + * Return true if member has received and ack'ed message a at the time they sent message c. + * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990). + * + * @param a A message that may or may not have been received by member. + * @param member The process that sent c. + * @param c A message that was sent by member. + */ + private boolean receivedByMemberAtTimeOfSending(Message a, InetAddress member, Message c) { + List> seq = new ArrayList>(); + seq.add(c); + return OPDseq(seq, a); + } + + /** + * Try to extend the OPD sequence to contain a. + * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990). + * + * @param seq A sequence of messages each of which acknowledges its predecessor by the OPD. + * @return True if the sequence can extend to a. + */ + private boolean OPDseq(List> seq, Message a) { + if (seq.contains(a)) + return true; + + // c -> ... -> b -> ... ?-> a + Message c = seq.get(0); + Message b = seq.get(seq.size() - 1); + + // All messages sent by the sender of b (that are not already in the sequence) can potentially by added to the sequence. + List> potentialPredecessors = received.allSentBy(b.sender); + potentialPredecessors.removeAll(seq); + + // Add messages that b positively acknowledged. + for (MessageID mid : b.positiveAcks) { + try { + Message msg = received.getByID(mid); + potentialPredecessors.add(msg); + } catch (NoSuchElementException e) { + log.warning("message " + mid + ", acknowledged by " +b.id() + ", is not in the received list. Continuing anyway."); + } + } + + // Remove messages that c negatively acknowledged. + for (MessageID mid : c.negativeAcks) { + for (int i = 0; i < potentialPredecessors.size(); i++) { + Message msg = potentialPredecessors.get(i); + if (msg.id().equals(mid)) { + potentialPredecessors.remove(i); + break; + } + } + } + + // Try and extend the sequence. + for (Message predecessor : potentialPredecessors) { + seq.add(predecessor); + if (OPDseq(seq, a)) + return true; + // Failed; remove the predecessor and try another. + seq.remove(seq.size() - 1); + } + + // The sequence cannot be extended to include a. + return false; + } +} -- cgit v1.2.3