diff options
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 42 |
1 files changed, 37 insertions, 5 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 464cbb5..f20dd2a 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -18,7 +18,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { private final Set<MessageID> positiveAcks; // Positively acknowledged messages. private final Set<MessageID> negativeAcks; // Negatively acknowledged messages. private final Set<Message<T>> received; - private final Set<MessageID> retransmissions; // Messages pending retransmission. + private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final AtomicReference<Instant> lastSend; private final SocketAddress group; private final MulticastSocket inSock, outSock; @@ -30,7 +30,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.received = new ConcurrentHashMap<Message<T>, Void>().keySet(); - this.retransmissions = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.retransmissions = new LinkedBlockingQueue<Message<T>>(); this.lastSend = new AtomicReference<Instant>(Instant.now()); this.group = group; @@ -44,9 +44,11 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.laddr = laddr; this.delivered = new LinkedBlockingQueue<Message<T>>(); - (new Thread(new Receiver())).start(); this.log = Logger.getLogger(this.getClass().getName()); + + (new Thread(new Receive())).start(); + (new Thread(new Retransmit())).start(); } public void send(T payload) throws IOException { @@ -56,19 +58,49 @@ public class ReliableMulticast<T extends Serializable & Hashable> { positiveAcks.toArray(new MessageID[0]), negativeAcks.toArray(new MessageID[0])); DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + synchronized (outSock) { + outSock.send(pkt); + } positiveAcks.clear(); (new Thread(new Timeout(msg.id()))).start(); lastSend.set(Instant.now()); } - private class Receiver implements Runnable { + private class Receive implements Runnable { @Override public void run() { // TODO } } + /** Retransmit dropped messages. */ + private class Retransmit implements Runnable { + private final Logger log; + + private Retransmit() { + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + for (;;) { + Message<T> msg = retransmissions.take(); + try { + DatagramPacket pkt = Packet.encode(msg, group); + synchronized (outSock) { + outSock.send(pkt); + } + } catch (Exception e) { + log.warning(e.getMessage()); + } + } + } catch (InterruptedException e) { + log.info("Retransmit thread interrupted: "+e.getMessage()); + } + } + } + private class Timeout implements Runnable { MessageID mid; |