summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-15 10:30:46 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-15 10:30:46 -0500
commitcc22810588d7d4df07cf4292895035128d93d673 (patch)
treed6871fa4b942614a540c0d66dee3b7e4bf657b45 /src/main/java/derms/net
parent99878058ebffc848fa723b2d96367519483901a0 (diff)
downloadsoen423-cc22810588d7d4df07cf4292895035128d93d673.zip
reliable multicast: Timeout
Diffstat (limited to 'src/main/java/derms/net')
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java14
-rw-r--r--src/main/java/derms/net/rmulticast/Timeout.java37
2 files changed, 38 insertions, 13 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index 22e16a2..85d4f31 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -61,7 +61,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
DatagramPacket pkt = Packet.encode(msg, group);
outSock.send(pkt);
positiveAcks.clear();
- (new Thread(new Timeout(msg.id()))).start();
+ (new Thread(new Timeout<T>(msg, positiveAcks, retransmissions))).start();
lastSend.set(Instant.now());
}
@@ -72,16 +72,4 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
}
}
- private class Timeout implements Runnable {
- MessageID mid;
-
- private Timeout(MessageID mid) {
- this.mid = mid;
- }
-
- @Override
- public void run() {
- // TODO
- }
- }
}
diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java
new file mode 100644
index 0000000..7eaac3e
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Timeout.java
@@ -0,0 +1,37 @@
+package derms.net.rmulticast;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Logger;
+
+/** If a message is not positively acknowledged after some time, Timeout puts it in the retransmissions list. */
+class Timeout<T extends Serializable & Hashable> implements Runnable {
+ private static final Duration timeout = Duration.ofSeconds(1);
+
+ private final Message<T> msg;
+ private final Set<MessageID> positiveAcks;
+ private final BlockingQueue<Message<T>> retransmissions;
+ private final Logger log;
+
+ Timeout(Message<T> msg, Set<MessageID> positiveAcks, BlockingQueue<Message<T>> retransmissions) {
+ this.msg = msg;
+ this.positiveAcks = positiveAcks;
+ this.retransmissions = retransmissions;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(timeout.toMillis());
+ if (!positiveAcks.contains(msg.id())) {
+ log.info("Message "+msg.id()+" not ack'ed after "+timeout+"; retransmitting.");
+ retransmissions.put(msg);
+ }
+ } catch (InterruptedException e) {
+ log.info("Timeout thread interrupted: " + e.getMessage());
+ }
+ }
+}