diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-15 10:30:46 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-15 10:30:46 -0500 |
| commit | cc22810588d7d4df07cf4292895035128d93d673 (patch) | |
| tree | d6871fa4b942614a540c0d66dee3b7e4bf657b45 | |
| parent | 99878058ebffc848fa723b2d96367519483901a0 (diff) | |
| download | soen423-cc22810588d7d4df07cf4292895035128d93d673.zip | |
reliable multicast: Timeout
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 14 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Timeout.java | 37 |
2 files changed, 38 insertions, 13 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 22e16a2..85d4f31 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -61,7 +61,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { DatagramPacket pkt = Packet.encode(msg, group); outSock.send(pkt); positiveAcks.clear(); - (new Thread(new Timeout(msg.id()))).start(); + (new Thread(new Timeout<T>(msg, positiveAcks, retransmissions))).start(); lastSend.set(Instant.now()); } @@ -72,16 +72,4 @@ public class ReliableMulticast<T extends Serializable & Hashable> { } } - private class Timeout implements Runnable { - MessageID mid; - - private Timeout(MessageID mid) { - this.mid = mid; - } - - @Override - public void run() { - // TODO - } - } } diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java new file mode 100644 index 0000000..7eaac3e --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Timeout.java @@ -0,0 +1,37 @@ +package derms.net.rmulticast; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Logger; + +/** If a message is not positively acknowledged after some time, Timeout puts it in the retransmissions list. */ +class Timeout<T extends Serializable & Hashable> implements Runnable { + private static final Duration timeout = Duration.ofSeconds(1); + + private final Message<T> msg; + private final Set<MessageID> positiveAcks; + private final BlockingQueue<Message<T>> retransmissions; + private final Logger log; + + Timeout(Message<T> msg, Set<MessageID> positiveAcks, BlockingQueue<Message<T>> retransmissions) { + this.msg = msg; + this.positiveAcks = positiveAcks; + this.retransmissions = retransmissions; + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + Thread.sleep(timeout.toMillis()); + if (!positiveAcks.contains(msg.id())) { + log.info("Message "+msg.id()+" not ack'ed after "+timeout+"; retransmitting."); + retransmissions.put(msg); + } + } catch (InterruptedException e) { + log.info("Timeout thread interrupted: " + e.getMessage()); + } + } +} |