package derms.net.rmulticast; import java.io.Serializable; import java.time.Duration; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; /** If a message is not positively acknowledged after some time, Timeout puts it in the retransmissions list. */ class Timeout implements Runnable { private static final Duration timeout = Duration.ofSeconds(1); private final Message msg; private final Set acks; // Positively acknowledged messages. private final BlockingQueue> retransmissions; private final Logger log; /** * @param acks Positively acknowledged messages. */ Timeout(Message msg, Set acks, BlockingQueue> retransmissions) { this.msg = msg; this.acks = acks; this.retransmissions = retransmissions; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { try { for (;;) { Wait.forDuration(timeout); if (acks.contains(msg.id())) { log.info("Message " + msg.id() + "positively ack'ed."); return; } else { log.info("Message " + msg.id() + " not ack'ed after " + timeout + "; retransmitting."); retransmissions.put(msg); } } } catch (InterruptedException e) { log.info("Timeout thread interrupted: " + e.getMessage()); } } }