summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-18 10:35:22 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-18 10:35:22 -0500
commit6bd335c7382c0da8486e94d253dffa28525ceec7 (patch)
tree0324179a04d97664bd7f10faf8850d10d3b0555f /src
parente62c171662c1fd14bf1aa0887ccb73e219f8f189 (diff)
downloadsoen423-6bd335c7382c0da8486e94d253dffa28525ceec7.zip
reliable multicast: avoid receiving duplicate messages
Diffstat (limited to 'src')
-rw-r--r--src/main/java/derms/net/rmulticast/Message.java8
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java4
-rw-r--r--src/main/java/derms/net/rmulticast/ReceivedSet.java18
3 files changed, 25 insertions, 5 deletions
diff --git a/src/main/java/derms/net/rmulticast/Message.java b/src/main/java/derms/net/rmulticast/Message.java
index 329b62c..b4749a3 100644
--- a/src/main/java/derms/net/rmulticast/Message.java
+++ b/src/main/java/derms/net/rmulticast/Message.java
@@ -4,7 +4,12 @@ import java.io.Serializable;
import java.net.InetAddress;
class Message<T extends MessagePayload> implements Serializable {
+ /* Used as a salt in the hash function. Allows different messages to have
+ * different IDs, even if they have the same payload. */
+ private static int seq = 1;
+
T payload;
+ int salt; // Used in hash function. Allows messages to have unique IDs, even if they have the same payload.
InetAddress sender;
MessageID[] acks; // IDs of messages that this message positively acknowledges.
MessageID[] nacks; // IDs of messages that this message negatively acknowledges.
@@ -15,6 +20,7 @@ class Message<T extends MessagePayload> implements Serializable {
*/
Message(T payload, InetAddress sender, MessageID[] acks, MessageID[] nacks) {
this.payload = payload;
+ this.salt = seq++;
this.sender = sender;
this.acks = acks;
this.nacks = nacks;
@@ -26,7 +32,7 @@ class Message<T extends MessagePayload> implements Serializable {
@Override
public int hashCode() {
- return payload.hash();
+ return payload.hash() * salt;
}
@Override
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index bf583dd..679df27 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -95,8 +95,8 @@ class Receive<T extends MessagePayload> implements Runnable {
private void deliver(Message<T> msg) {
acks.add(msg.id());
nacks.remove(msg.id());
- received.add(msg);
retransmissions.remove(msg);
- delivered.add(msg);
+ if (received.add(msg))
+ delivered.add(msg); // First time seeing this message; deliver it.
}
}
diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java
index 831ecbe..5fd8ce5 100644
--- a/src/main/java/derms/net/rmulticast/ReceivedSet.java
+++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java
@@ -14,8 +14,18 @@ class ReceivedSet<T extends MessagePayload> {
this.received = new ConcurrentLinkedQueue<Message<T>>();
}
- void add(Message<T> e) {
- received.add(e);
+ /**
+ * Add a message to the set if it is not already present.
+ *
+ * @param msg The message to add to the set.
+ * @return True if the set did not already contain the specified message.
+ */
+ // TODO: faster insertion.
+ boolean add(Message<T> msg) {
+ if (contains(msg))
+ return false;
+ received.add(msg);
+ return true;
}
// TODO: faster search.
@@ -35,6 +45,10 @@ class ReceivedSet<T extends MessagePayload> {
}
}
+ boolean contains(Message<T> msg) {
+ return contains(msg.id());
+ }
+
/** Remove the specified message from the set, if it is present. */
void remove(Message<T> msg) {
received.remove(msg);