summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/Retransmit.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Retransmit.java')
-rw-r--r--src/main/java/derms/net/rmulticast/Retransmit.java44
1 files changed, 44 insertions, 0 deletions
diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java
new file mode 100644
index 0000000..ad44eb1
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Retransmit.java
@@ -0,0 +1,44 @@
+package derms.net.rmulticast;
+
+import derms.net.Packet;
+
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.MulticastSocket;
+import java.net.SocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Logger;
+
+/** Retransmit dropped messages. */
+class Retransmit<T extends Serializable & Hashable> implements Runnable {
+ private final BlockingQueue<Message<T>> retransmissions;
+ private final MulticastSocket outSock;
+ private final SocketAddress group;
+ private final Logger log;
+
+ Retransmit(BlockingQueue<Message<T>> retransmissions, MulticastSocket outSock, SocketAddress group) {
+ this.retransmissions = retransmissions;
+ this.outSock = outSock;
+ this.group = group;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (; ; ) {
+ Message<T> msg = retransmissions.take();
+ try {
+ DatagramPacket pkt = Packet.encode(msg, group);
+ synchronized (outSock) {
+ outSock.send(pkt);
+ }
+ } catch (Exception e) {
+ log.warning(e.getMessage());
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("Retransmit thread interrupted: " + e.getMessage());
+ }
+ }
+}