diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-14 11:49:19 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-14 11:49:19 -0500 |
| commit | 359d0017a920b1e7dab0103aa3479fd6e5a2b5ab (patch) | |
| tree | ad8d7eed5bbd8b8856e46901586bc1308e720b02 /src/main/java/derms/net | |
| parent | c6a6f1d4eb96d6d4854ee66fcf6156b638d80248 (diff) | |
| download | soen423-359d0017a920b1e7dab0103aa3479fd6e5a2b5ab.zip | |
ReliableMulticast: retransmit dropped messages
Diffstat (limited to 'src/main/java/derms/net')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 42 |
1 files changed, 37 insertions, 5 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 464cbb5..f20dd2a 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -18,7 +18,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { private final Set<MessageID> positiveAcks; // Positively acknowledged messages. private final Set<MessageID> negativeAcks; // Negatively acknowledged messages. private final Set<Message<T>> received; - private final Set<MessageID> retransmissions; // Messages pending retransmission. + private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final AtomicReference<Instant> lastSend; private final SocketAddress group; private final MulticastSocket inSock, outSock; @@ -30,7 +30,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.received = new ConcurrentHashMap<Message<T>, Void>().keySet(); - this.retransmissions = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.retransmissions = new LinkedBlockingQueue<Message<T>>(); this.lastSend = new AtomicReference<Instant>(Instant.now()); this.group = group; @@ -44,9 +44,11 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.laddr = laddr; this.delivered = new LinkedBlockingQueue<Message<T>>(); - (new Thread(new Receiver())).start(); this.log = Logger.getLogger(this.getClass().getName()); + + (new Thread(new Receive())).start(); + (new Thread(new Retransmit())).start(); } public void send(T payload) throws IOException { @@ -56,19 +58,49 @@ public class ReliableMulticast<T extends Serializable & Hashable> { positiveAcks.toArray(new MessageID[0]), negativeAcks.toArray(new MessageID[0])); DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + synchronized (outSock) { + outSock.send(pkt); + } positiveAcks.clear(); (new Thread(new Timeout(msg.id()))).start(); lastSend.set(Instant.now()); } - private class Receiver implements Runnable { + private class Receive implements Runnable { @Override public void run() { // TODO } } + /** Retransmit dropped messages. */ + private class Retransmit implements Runnable { + private final Logger log; + + private Retransmit() { + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + for (;;) { + Message<T> msg = retransmissions.take(); + try { + DatagramPacket pkt = Packet.encode(msg, group); + synchronized (outSock) { + outSock.send(pkt); + } + } catch (Exception e) { + log.warning(e.getMessage()); + } + } + } catch (InterruptedException e) { + log.info("Retransmit thread interrupted: "+e.getMessage()); + } + } + } + private class Timeout implements Runnable { MessageID mid; |