From 24e443366e31b194acac8327cb2b7fdb6a2cca89 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Thu, 14 Nov 2024 11:53:43 -0500 Subject: refactor ReliableMulticast.Retransmit to top-level class --- src/main/java/derms/net/rmulticast/Retransmit.java | 44 ++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 src/main/java/derms/net/rmulticast/Retransmit.java (limited to 'src/main/java/derms/net/rmulticast/Retransmit.java') diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java new file mode 100644 index 0000000..ad44eb1 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Retransmit.java @@ -0,0 +1,44 @@ +package derms.net.rmulticast; + +import derms.net.Packet; + +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.MulticastSocket; +import java.net.SocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Logger; + +/** Retransmit dropped messages. */ +class Retransmit implements Runnable { + private final BlockingQueue> retransmissions; + private final MulticastSocket outSock; + private final SocketAddress group; + private final Logger log; + + Retransmit(BlockingQueue> retransmissions, MulticastSocket outSock, SocketAddress group) { + this.retransmissions = retransmissions; + this.outSock = outSock; + this.group = group; + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + for (; ; ) { + Message msg = retransmissions.take(); + try { + DatagramPacket pkt = Packet.encode(msg, group); + synchronized (outSock) { + outSock.send(pkt); + } + } catch (Exception e) { + log.warning(e.getMessage()); + } + } + } catch (InterruptedException e) { + log.info("Retransmit thread interrupted: " + e.getMessage()); + } + } +} -- cgit v1.2.3