package derms.net.rmulticast; import java.io.Serializable; import java.time.Duration; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; /** If a message is not positively acknowledged after some time, Timeout puts it in the retransmissions list. */ class Timeout implements Runnable { private static final Duration timeout = Duration.ofSeconds(1); private final Message msg; private final Set positiveAcks; private final BlockingQueue> retransmissions; private final Logger log; Timeout(Message msg, Set positiveAcks, BlockingQueue> retransmissions) { this.msg = msg; this.positiveAcks = positiveAcks; this.retransmissions = retransmissions; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { try { Thread.sleep(timeout.toMillis()); if (!positiveAcks.contains(msg.id())) { log.info("Message "+msg.id()+" not ack'ed after "+timeout+"; retransmitting."); retransmissions.put(msg); } } catch (InterruptedException e) { log.info("Timeout thread interrupted: " + e.getMessage()); } } }