From d333754c8d4f8af9534300e457d026174a69cb65 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Thu, 14 Nov 2024 12:05:24 -0500 Subject: ConcurrentMulticastSocket --- src/main/java/derms/net/rmulticast/ReliableMulticast.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java') diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index f63433f..22e16a2 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -1,5 +1,6 @@ package derms.net.rmulticast; +import derms.net.ConcurrentMulticastSocket; import derms.net.Packet; import java.io.IOException; @@ -21,7 +22,7 @@ public class ReliableMulticast { private final BlockingQueue> retransmissions; // Messages pending retransmission. private final AtomicReference lastSend; private final SocketAddress group; - private final MulticastSocket inSock, outSock; + private final ConcurrentMulticastSocket inSock, outSock; private final InetAddress laddr; // Local address. private final BlockingQueue> delivered; private final Logger log; @@ -35,10 +36,10 @@ public class ReliableMulticast { this.group = group; - this.inSock = new MulticastSocket(); + this.inSock = new ConcurrentMulticastSocket(); this.inSock.joinGroup(group.getAddress()); - this.outSock = new MulticastSocket(group.getPort()); + this.outSock = new ConcurrentMulticastSocket(group.getPort()); this.outSock.joinGroup(group.getAddress()); this.laddr = laddr; @@ -48,7 +49,7 @@ public class ReliableMulticast { this.log = Logger.getLogger(this.getClass().getName()); (new Thread(new Receive())).start(); - (new Thread(new Retransmit(retransmissions, outSock, group))).start(); + (new Thread(new Retransmit(retransmissions, outSock, group))).start(); } public void send(T payload) throws IOException { @@ -58,9 +59,7 @@ public class ReliableMulticast { positiveAcks.toArray(new MessageID[0]), negativeAcks.toArray(new MessageID[0])); DatagramPacket pkt = Packet.encode(msg, group); - synchronized (outSock) { - outSock.send(pkt); - } + outSock.send(pkt); positiveAcks.clear(); (new Thread(new Timeout(msg.id()))).start(); lastSend.set(Instant.now()); -- cgit v1.2.3