summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/ReliableMulticast.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-15 13:23:06 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-15 13:23:06 -0500
commitf02bf05c7b639f5edb86388c96dbb209a4622be9 (patch)
treefd0bef88337129bc8bf188860172c79729df2b3a /src/main/java/derms/net/rmulticast/ReliableMulticast.java
parentb767b34d97fdacd891702b041cc5eb1fdcb0b0a3 (diff)
downloadsoen423-f02bf05c7b639f5edb86388c96dbb209a4622be9.zip
reliable multicast: prune received list
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java13
1 files changed, 8 insertions, 5 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index de07f39..0798e68 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -20,10 +20,11 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
private final InetAddress laddr; // Local address.
private final Set<MessageID> positiveAcks; // Positively acknowledged messages.
private final Set<MessageID> negativeAcks; // Negatively acknowledged messages.
+ private final ReceivedSet<T> received;
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
- private final AtomicReference<Instant> lastSend;
- private final ConcurrentMulticastSocket outSock;
private final Set<InetAddress> groupMembers;
+ private final ConcurrentMulticastSocket outSock;
+ private final AtomicReference<Instant> lastSend;
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
@@ -33,23 +34,25 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.received = new ReceivedSet<T>();
this.retransmissions = new LinkedBlockingQueue<Message<T>>();
+ this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet();
this.lastSend = new AtomicReference<Instant>(Instant.now());
this.outSock = new ConcurrentMulticastSocket(group.getPort());
this.outSock.joinGroup(group.getAddress());
- this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet();
-
this.delivered = new LinkedBlockingQueue<Message<T>>();
this.log = Logger.getLogger(this.getClass().getName());
ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket();
inSock.joinGroup(group.getAddress());
- (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, retransmissions, groupMembers, delivered))).start();
+ (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions, groupMembers, delivered))).start();
(new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
+
+ (new Thread(new Prune<T>(received, groupMembers))).start();
}
public void send(T payload) throws IOException {