summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/ReliableMulticast.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java13
1 files changed, 6 insertions, 7 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index f63433f..22e16a2 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -1,5 +1,6 @@
package derms.net.rmulticast;
+import derms.net.ConcurrentMulticastSocket;
import derms.net.Packet;
import java.io.IOException;
@@ -21,7 +22,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
private final AtomicReference<Instant> lastSend;
private final SocketAddress group;
- private final MulticastSocket inSock, outSock;
+ private final ConcurrentMulticastSocket inSock, outSock;
private final InetAddress laddr; // Local address.
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
@@ -35,10 +36,10 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.group = group;
- this.inSock = new MulticastSocket();
+ this.inSock = new ConcurrentMulticastSocket();
this.inSock.joinGroup(group.getAddress());
- this.outSock = new MulticastSocket(group.getPort());
+ this.outSock = new ConcurrentMulticastSocket(group.getPort());
this.outSock.joinGroup(group.getAddress());
this.laddr = laddr;
@@ -48,7 +49,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.log = Logger.getLogger(this.getClass().getName());
(new Thread(new Receive())).start();
- (new Thread(new Retransmit(retransmissions, outSock, group))).start();
+ (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
}
public void send(T payload) throws IOException {
@@ -58,9 +59,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
positiveAcks.toArray(new MessageID[0]),
negativeAcks.toArray(new MessageID[0]));
DatagramPacket pkt = Packet.encode(msg, group);
- synchronized (outSock) {
- outSock.send(pkt);
- }
+ outSock.send(pkt);
positiveAcks.clear();
(new Thread(new Timeout(msg.id()))).start();
lastSend.set(Instant.now());