diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-15 11:39:46 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-15 11:39:46 -0500 |
| commit | 3ebcee7b335d0e23915f3ec8e15c9995cd8d3004 (patch) | |
| tree | 21de4fb61fc7adbf1cea7c8f4705f08132787d38 /src/main/java/derms/net/rmulticast/ReliableMulticast.java | |
| parent | ed304eca556ff4780cbcc8b2615ebc6d945d46ae (diff) | |
| download | soen423-3ebcee7b335d0e23915f3ec8e15c9995cd8d3004.zip | |
reliable multicast: Receive
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 14 |
1 files changed, 3 insertions, 11 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 85d4f31..44fc10c 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -18,7 +18,7 @@ import java.util.logging.Logger; public class ReliableMulticast<T extends Serializable & Hashable> { private final Set<MessageID> positiveAcks; // Positively acknowledged messages. private final Set<MessageID> negativeAcks; // Negatively acknowledged messages. - private final Set<Message<T>> received; + private final ReceivedSet<T> received; private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final AtomicReference<Instant> lastSend; private final SocketAddress group; @@ -30,7 +30,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); - this.received = new ConcurrentHashMap<Message<T>, Void>().keySet(); + this.received = new ReceivedSet<T>(); this.retransmissions = new LinkedBlockingQueue<Message<T>>(); this.lastSend = new AtomicReference<Instant>(Instant.now()); @@ -48,7 +48,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.log = Logger.getLogger(this.getClass().getName()); - (new Thread(new Receive())).start(); + (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions))).start(); (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start(); } @@ -64,12 +64,4 @@ public class ReliableMulticast<T extends Serializable & Hashable> { (new Thread(new Timeout<T>(msg, positiveAcks, retransmissions))).start(); lastSend.set(Instant.now()); } - - private class Receive implements Runnable { - @Override - public void run() { - // TODO - } - } - } |