From 1305d223fafc79be7fad07aee1f4348b2eaeaae5 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 23 Nov 2024 13:12:29 -0500 Subject: rmulticast: use DatagramChannel --- .../derms/net/rmulticast/ReliableMulticast.java | 30 ++++++++++++++-------- 1 file changed, 19 insertions(+), 11 deletions(-) (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java') diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index c23baaa..b194c96 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -1,12 +1,16 @@ package derms.net.rmulticast; +import derms.io.Serial; import derms.net.ConcurrentMulticastSocket; import derms.net.MessagePayload; +import derms.net.Net; import derms.net.Packet; import derms.util.ThreadPool; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.Set; import java.util.concurrent.*; @@ -26,7 +30,7 @@ public class ReliableMulticast { private final ReceivedSet received; private final BlockingQueue> retransmissions; // Messages pending retransmission. private final Set groupMembers; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final BlockingQueue> delivered; private final Logger log; private final ExecutorService pool; @@ -47,24 +51,28 @@ public class ReliableMulticast { this.retransmissions = new LinkedBlockingQueue>(); this.groupMembers = ConcurrentHashMap.newKeySet(); - this.outSock = new ConcurrentMulticastSocket(); - this.outSock.joinGroup(group.getAddress()); + NetworkInterface ifs = Net.getMulticastInterface(); + this.sock = DatagramChannel.open(StandardProtocolFamily.INET) + .setOption(StandardSocketOptions.SO_REUSEADDR, true) + .bind(new InetSocketAddress(group.getAddress(), group.getPort())) + .setOption(StandardSocketOptions.IP_MULTICAST_IF, ifs); + sock.join(group.getAddress(), ifs); this.delivered = new LinkedBlockingQueue>(); this.log = Logger.getLogger(this.getClass().getName()); this.pool = Executors.newCachedThreadPool(); - pool.execute(new Receive(group, acks, nacks, received, retransmissions, groupMembers, delivered)); - pool.execute(new Retransmit(retransmissions, outSock, group)); + pool.execute(new Receive(sock, acks, nacks, received, retransmissions, groupMembers, delivered)); + pool.execute(new Retransmit(retransmissions, sock, group)); pool.execute(new Prune(received, groupMembers)); - pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock)); + pool.execute(new Heartbeat(group, laddr, acks, nacks, sock)); } - public void close() { + public void close() throws IOException { log.info("Shutting down..."); + sock.close(); ThreadPool.shutdownNow(pool, log); - outSock.close(); log.info("Finished shutting down."); } @@ -75,11 +83,11 @@ public class ReliableMulticast { laddr, acks.toArray(new MessageID[0]), nacks.toArray(new MessageID[0])); - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); acks.clear(); (new Thread(new Timeout(msg, acks, retransmissions))).start(); - log.info("Sent " + msg + " from " + outSock + " to " + group); + log.info("Sent " + msg + " from " + sock.getLocalAddress() + " to " + group); } /** Receive a message from the group, blocking if necessary until a message arrives. */ -- cgit v1.2.3