From 6bd335c7382c0da8486e94d253dffa28525ceec7 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Mon, 18 Nov 2024 10:35:22 -0500 Subject: reliable multicast: avoid receiving duplicate messages --- src/main/java/derms/net/rmulticast/Message.java | 8 +++++++- src/main/java/derms/net/rmulticast/Receive.java | 4 ++-- src/main/java/derms/net/rmulticast/ReceivedSet.java | 18 ++++++++++++++++-- 3 files changed, 25 insertions(+), 5 deletions(-) (limited to 'src/main/java/derms/net/rmulticast') diff --git a/src/main/java/derms/net/rmulticast/Message.java b/src/main/java/derms/net/rmulticast/Message.java index 329b62c..b4749a3 100644 --- a/src/main/java/derms/net/rmulticast/Message.java +++ b/src/main/java/derms/net/rmulticast/Message.java @@ -4,7 +4,12 @@ import java.io.Serializable; import java.net.InetAddress; class Message implements Serializable { + /* Used as a salt in the hash function. Allows different messages to have + * different IDs, even if they have the same payload. */ + private static int seq = 1; + T payload; + int salt; // Used in hash function. Allows messages to have unique IDs, even if they have the same payload. InetAddress sender; MessageID[] acks; // IDs of messages that this message positively acknowledges. MessageID[] nacks; // IDs of messages that this message negatively acknowledges. @@ -15,6 +20,7 @@ class Message implements Serializable { */ Message(T payload, InetAddress sender, MessageID[] acks, MessageID[] nacks) { this.payload = payload; + this.salt = seq++; this.sender = sender; this.acks = acks; this.nacks = nacks; @@ -26,7 +32,7 @@ class Message implements Serializable { @Override public int hashCode() { - return payload.hash(); + return payload.hash() * salt; } @Override diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index bf583dd..679df27 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -95,8 +95,8 @@ class Receive implements Runnable { private void deliver(Message msg) { acks.add(msg.id()); nacks.remove(msg.id()); - received.add(msg); retransmissions.remove(msg); - delivered.add(msg); + if (received.add(msg)) + delivered.add(msg); // First time seeing this message; deliver it. } } diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java index 831ecbe..5fd8ce5 100644 --- a/src/main/java/derms/net/rmulticast/ReceivedSet.java +++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java @@ -14,8 +14,18 @@ class ReceivedSet { this.received = new ConcurrentLinkedQueue>(); } - void add(Message e) { - received.add(e); + /** + * Add a message to the set if it is not already present. + * + * @param msg The message to add to the set. + * @return True if the set did not already contain the specified message. + */ + // TODO: faster insertion. + boolean add(Message msg) { + if (contains(msg)) + return false; + received.add(msg); + return true; } // TODO: faster search. @@ -35,6 +45,10 @@ class ReceivedSet { } } + boolean contains(Message msg) { + return contains(msg.id()); + } + /** Remove the specified message from the set, if it is present. */ void remove(Message msg) { received.remove(msg); -- cgit v1.2.3