summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/derms/net/rmulticast/Timeout.java23
1 files changed, 19 insertions, 4 deletions
diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java
index 7eaac3e..2f2d0ed 100644
--- a/src/main/java/derms/net/rmulticast/Timeout.java
+++ b/src/main/java/derms/net/rmulticast/Timeout.java
@@ -2,6 +2,7 @@ package derms.net.rmulticast;
import java.io.Serializable;
import java.time.Duration;
+import java.time.Instant;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
@@ -25,13 +26,27 @@ class Timeout<T extends Serializable & Hashable> implements Runnable {
@Override
public void run() {
try {
- Thread.sleep(timeout.toMillis());
- if (!positiveAcks.contains(msg.id())) {
- log.info("Message "+msg.id()+" not ack'ed after "+timeout+"; retransmitting.");
- retransmissions.put(msg);
+ for (;;) {
+ waitUntilTimeout();
+ if (positiveAcks.contains(msg.id())) {
+ log.info("Message " + msg.id() + "positively ack'ed.");
+ return;
+ } else {
+ log.info("Message " + msg.id() + " not ack'ed after " + timeout + "; retransmitting.");
+ retransmissions.put(msg);
+ }
}
} catch (InterruptedException e) {
log.info("Timeout thread interrupted: " + e.getMessage());
}
}
+
+ private void waitUntilTimeout() {
+ Instant start = Instant.now();
+ Duration elapsed;
+ do {
+ Thread.yield();
+ elapsed = Duration.between(start, Instant.now());
+ } while (elapsed.compareTo(timeout) < 0);
+ }
}