diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-22 12:02:08 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-22 12:02:08 -0500 |
| commit | 43549dee0a8214c1bb02c874dcfd7ba37a8ef310 (patch) | |
| tree | 12cd75a25518c6d0cad495e0c5eed65d996765b2 /src/main/java/derms/net/runicast/Retransmit.java | |
| parent | 91967cbd407254358ab768e74ebcfda8d4a30bc8 (diff) | |
| download | soen423-43549dee0a8214c1bb02c874dcfd7ba37a8ef310.zip | |
ReliableUnicastSender
Diffstat (limited to 'src/main/java/derms/net/runicast/Retransmit.java')
| -rw-r--r-- | src/main/java/derms/net/runicast/Retransmit.java | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/src/main/java/derms/net/runicast/Retransmit.java b/src/main/java/derms/net/runicast/Retransmit.java new file mode 100644 index 0000000..affd00c --- /dev/null +++ b/src/main/java/derms/net/runicast/Retransmit.java @@ -0,0 +1,57 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; +import derms.util.Wait; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** Retransmit unacknowledged messages. */ +class Retransmit<T extends MessagePayload> implements Runnable { + private static final Duration timeout = Duration.ofMillis(500); + + private final AtomicLong unacked; + private final Queue<Message<T>> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + + Retransmit(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) { + this.unacked = unacked; + this.sent = sent; + this.sock = sock; + this.log = Logger.getLogger(getClass().getName()); + } + + @Override + public void run() { + try { + for (;;) { + Wait.forDuration(timeout); + + for (Message<T> msg : sent) { + if (msg.seq >= unacked.get()) { + retransmit(msg); + } + } + } + } catch (InterruptedException e) { + log.info("Interrupted."); + } + } + + private void retransmit(Message<T> msg) { + try { + DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); + sock.send(pkt); + log.info("Retransmitted " + msg); + } catch (IOException e) { + log.warning("Failed to retransmit " + msg + ": " + e.getMessage()); + } + } +} |