diff options
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Timeout.java')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Timeout.java | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java new file mode 100644 index 0000000..7eaac3e --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Timeout.java @@ -0,0 +1,37 @@ +package derms.net.rmulticast; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Logger; + +/** If a message is not positively acknowledged after some time, Timeout puts it in the retransmissions list. */ +class Timeout<T extends Serializable & Hashable> implements Runnable { + private static final Duration timeout = Duration.ofSeconds(1); + + private final Message<T> msg; + private final Set<MessageID> positiveAcks; + private final BlockingQueue<Message<T>> retransmissions; + private final Logger log; + + Timeout(Message<T> msg, Set<MessageID> positiveAcks, BlockingQueue<Message<T>> retransmissions) { + this.msg = msg; + this.positiveAcks = positiveAcks; + this.retransmissions = retransmissions; + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + Thread.sleep(timeout.toMillis()); + if (!positiveAcks.contains(msg.id())) { + log.info("Message "+msg.id()+" not ack'ed after "+timeout+"; retransmitting."); + retransmissions.put(msg); + } + } catch (InterruptedException e) { + log.info("Timeout thread interrupted: " + e.getMessage()); + } + } +} |