summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms')
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java42
1 files changed, 37 insertions, 5 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index 464cbb5..f20dd2a 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -18,7 +18,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
private final Set<MessageID> positiveAcks; // Positively acknowledged messages.
private final Set<MessageID> negativeAcks; // Negatively acknowledged messages.
private final Set<Message<T>> received;
- private final Set<MessageID> retransmissions; // Messages pending retransmission.
+ private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
private final AtomicReference<Instant> lastSend;
private final SocketAddress group;
private final MulticastSocket inSock, outSock;
@@ -30,7 +30,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
this.received = new ConcurrentHashMap<Message<T>, Void>().keySet();
- this.retransmissions = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.retransmissions = new LinkedBlockingQueue<Message<T>>();
this.lastSend = new AtomicReference<Instant>(Instant.now());
this.group = group;
@@ -44,9 +44,11 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.laddr = laddr;
this.delivered = new LinkedBlockingQueue<Message<T>>();
- (new Thread(new Receiver())).start();
this.log = Logger.getLogger(this.getClass().getName());
+
+ (new Thread(new Receive())).start();
+ (new Thread(new Retransmit())).start();
}
public void send(T payload) throws IOException {
@@ -56,19 +58,49 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
positiveAcks.toArray(new MessageID[0]),
negativeAcks.toArray(new MessageID[0]));
DatagramPacket pkt = Packet.encode(msg, group);
- outSock.send(pkt);
+ synchronized (outSock) {
+ outSock.send(pkt);
+ }
positiveAcks.clear();
(new Thread(new Timeout(msg.id()))).start();
lastSend.set(Instant.now());
}
- private class Receiver implements Runnable {
+ private class Receive implements Runnable {
@Override
public void run() {
// TODO
}
}
+ /** Retransmit dropped messages. */
+ private class Retransmit implements Runnable {
+ private final Logger log;
+
+ private Retransmit() {
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ Message<T> msg = retransmissions.take();
+ try {
+ DatagramPacket pkt = Packet.encode(msg, group);
+ synchronized (outSock) {
+ outSock.send(pkt);
+ }
+ } catch (Exception e) {
+ log.warning(e.getMessage());
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("Retransmit thread interrupted: "+e.getMessage());
+ }
+ }
+ }
+
private class Timeout implements Runnable {
MessageID mid;