From 359d0017a920b1e7dab0103aa3479fd6e5a2b5ab Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Thu, 14 Nov 2024 11:49:19 -0500 Subject: ReliableMulticast: retransmit dropped messages --- .../derms/net/rmulticast/ReliableMulticast.java | 42 +++++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 464cbb5..f20dd2a 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -18,7 +18,7 @@ public class ReliableMulticast { private final Set positiveAcks; // Positively acknowledged messages. private final Set negativeAcks; // Negatively acknowledged messages. private final Set> received; - private final Set retransmissions; // Messages pending retransmission. + private final BlockingQueue> retransmissions; // Messages pending retransmission. private final AtomicReference lastSend; private final SocketAddress group; private final MulticastSocket inSock, outSock; @@ -30,7 +30,7 @@ public class ReliableMulticast { this.positiveAcks = new ConcurrentHashMap().keySet(); this.negativeAcks = new ConcurrentHashMap().keySet(); this.received = new ConcurrentHashMap, Void>().keySet(); - this.retransmissions = new ConcurrentHashMap().keySet(); + this.retransmissions = new LinkedBlockingQueue>(); this.lastSend = new AtomicReference(Instant.now()); this.group = group; @@ -44,9 +44,11 @@ public class ReliableMulticast { this.laddr = laddr; this.delivered = new LinkedBlockingQueue>(); - (new Thread(new Receiver())).start(); this.log = Logger.getLogger(this.getClass().getName()); + + (new Thread(new Receive())).start(); + (new Thread(new Retransmit())).start(); } public void send(T payload) throws IOException { @@ -56,19 +58,49 @@ public class ReliableMulticast { positiveAcks.toArray(new MessageID[0]), negativeAcks.toArray(new MessageID[0])); DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + synchronized (outSock) { + outSock.send(pkt); + } positiveAcks.clear(); (new Thread(new Timeout(msg.id()))).start(); lastSend.set(Instant.now()); } - private class Receiver implements Runnable { + private class Receive implements Runnable { @Override public void run() { // TODO } } + /** Retransmit dropped messages. */ + private class Retransmit implements Runnable { + private final Logger log; + + private Retransmit() { + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + for (;;) { + Message msg = retransmissions.take(); + try { + DatagramPacket pkt = Packet.encode(msg, group); + synchronized (outSock) { + outSock.send(pkt); + } + } catch (Exception e) { + log.warning(e.getMessage()); + } + } + } catch (InterruptedException e) { + log.info("Retransmit thread interrupted: "+e.getMessage()); + } + } + } + private class Timeout implements Runnable { MessageID mid; -- cgit v1.2.3