From 1305d223fafc79be7fad07aee1f4348b2eaeaae5 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 23 Nov 2024 13:12:29 -0500 Subject: rmulticast: use DatagramChannel --- src/main/java/derms/net/Net.java | 19 ++++++++++ src/main/java/derms/net/rmulticast/Heartbeat.java | 36 +++++++++--------- src/main/java/derms/net/rmulticast/Receive.java | 44 +++++++++++----------- .../derms/net/rmulticast/ReliableMulticast.java | 30 +++++++++------ src/main/java/derms/net/rmulticast/Retransmit.java | 35 ++++++++--------- src/main/java/derms/net/tomulticast/Receive.java | 3 +- .../derms/net/tomulticast/TotalOrderMulticast.java | 2 +- .../tomulticast/TotalOrderMulticastReceiver.java | 18 ++++----- 8 files changed, 105 insertions(+), 82 deletions(-) create mode 100644 src/main/java/derms/net/Net.java (limited to 'src') diff --git a/src/main/java/derms/net/Net.java b/src/main/java/derms/net/Net.java new file mode 100644 index 0000000..f2eb1fd --- /dev/null +++ b/src/main/java/derms/net/Net.java @@ -0,0 +1,19 @@ +package derms.net; + +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.NoSuchElementException; + +public class Net { + /** Return the first non-loopback multicast interface in the system, or throw exception if no such interface exists. */ + public static NetworkInterface getMulticastInterface() throws SocketException, NoSuchElementException { + Enumeration ifss = NetworkInterface.getNetworkInterfaces(); + while (ifss.hasMoreElements()) { + NetworkInterface ifs = ifss.nextElement(); + if (ifs.supportsMulticast() && !ifs.isLoopback() && ifs.isUp()) + return ifs; + } + throw new NoSuchElementException("no multicast interface available"); + } +} diff --git a/src/main/java/derms/net/rmulticast/Heartbeat.java b/src/main/java/derms/net/rmulticast/Heartbeat.java index 5dc4be2..166d10e 100644 --- a/src/main/java/derms/net/rmulticast/Heartbeat.java +++ b/src/main/java/derms/net/rmulticast/Heartbeat.java @@ -1,13 +1,14 @@ package derms.net.rmulticast; -import derms.net.ConcurrentMulticastSocket; -import derms.net.Packet; +import derms.io.Serial; import derms.util.Wait; import java.io.IOException; -import java.net.DatagramPacket; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.Set; import java.util.logging.Logger; @@ -23,31 +24,30 @@ class Heartbeat implements Runnable { private final InetSocketAddress group; private final InetAddress laddr; private final Set acks, nacks; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final Logger log; - Heartbeat(InetSocketAddress group, InetAddress laddr, Set acks, Set nacks, ConcurrentMulticastSocket outSock) throws IOException { + Heartbeat(InetSocketAddress group, InetAddress laddr, Set acks, Set nacks, DatagramChannel sock) throws IOException { this.group = group; this.laddr = laddr; this.acks = acks; this.nacks = nacks; - this.outSock = outSock; + this.sock = sock; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { - try { - for (;;) { - try { - Wait.forDuration(period); - send(); - } catch (IOException e) { - log.warning(e.getMessage()); - } + for (;;) { + try { + Wait.forDuration(period); + send(); + } catch (InterruptedException | ClosedChannelException e) { + log.info("Shutting down."); + return; + } catch (IOException e) { + log.warning(e.getMessage()); } - } catch (InterruptedException e) { - log.info("Interrupted. Shutting down."); } } @@ -56,8 +56,8 @@ class Heartbeat implements Runnable { laddr, acks.toArray(new MessageID[0]), nacks.toArray(new MessageID[0])); - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); acks.clear(); } } diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index c9e1acf..d39ccf1 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -1,15 +1,14 @@ package derms.net.rmulticast; -import derms.net.ConcurrentMulticastSocket; +import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import java.io.IOException; -import java.net.DatagramPacket; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.time.Duration; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -17,9 +16,8 @@ import java.util.logging.Logger; class Receive implements Runnable { private static final int bufSize = 8192; - private static final Duration sockTimeout = Duration.ofMillis(500); - private final ConcurrentMulticastSocket inSock; + private final DatagramChannel sock; private final Set acks; // Positively acknowledged messages. private final Set nacks; // Negatively acknowledged messages. private final ReceivedSet received; @@ -28,36 +26,36 @@ class Receive implements Runnable { private final BlockingQueue> delivered; private final Logger log; - Receive(InetSocketAddress group, Set acks, Set nacks, ReceivedSet received, BlockingQueue> retransmissions, Set groupMembers, BlockingQueue> delivered) throws IOException { - this.inSock = new ConcurrentMulticastSocket(group.getPort()); - this.inSock.joinGroup(group.getAddress()); - this.inSock.setSoTimeout(sockTimeout); - + Receive(DatagramChannel sock, Set acks, Set nacks, ReceivedSet received, BlockingQueue> retransmissions, Set groupMembers, BlockingQueue> delivered) throws IOException { this.acks = acks; this.nacks = nacks; this.received = received; this.retransmissions = retransmissions; this.groupMembers = groupMembers; + this.sock = sock; this.delivered = delivered; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { - log.info("Listening on " + inSock); - DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + try { + log.info("Listening on " + sock.getLocalAddress()); + } catch (IOException e) { + log.warning(e.getMessage()); + } + + ByteBuffer buf = ByteBuffer.allocate(bufSize); for (;;) { + buf.clear(); try { - inSock.receive(pkt); - Message msg = Packet.decode(pkt, Message.class); + sock.receive(buf); + Message msg = Serial.decode(buf, Message.class); receive(msg); log.info("Received " + msg); - } catch (SocketTimeoutException e) { - if (Thread.interrupted()) { - log.info("Interrupted. Shutting down."); - inSock.close(); - return; - } + } catch (ClosedChannelException e) { + log.info("Shutting down."); + return; } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index c23baaa..b194c96 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -1,12 +1,16 @@ package derms.net.rmulticast; +import derms.io.Serial; import derms.net.ConcurrentMulticastSocket; import derms.net.MessagePayload; +import derms.net.Net; import derms.net.Packet; import derms.util.ThreadPool; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.Set; import java.util.concurrent.*; @@ -26,7 +30,7 @@ public class ReliableMulticast { private final ReceivedSet received; private final BlockingQueue> retransmissions; // Messages pending retransmission. private final Set groupMembers; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final BlockingQueue> delivered; private final Logger log; private final ExecutorService pool; @@ -47,24 +51,28 @@ public class ReliableMulticast { this.retransmissions = new LinkedBlockingQueue>(); this.groupMembers = ConcurrentHashMap.newKeySet(); - this.outSock = new ConcurrentMulticastSocket(); - this.outSock.joinGroup(group.getAddress()); + NetworkInterface ifs = Net.getMulticastInterface(); + this.sock = DatagramChannel.open(StandardProtocolFamily.INET) + .setOption(StandardSocketOptions.SO_REUSEADDR, true) + .bind(new InetSocketAddress(group.getAddress(), group.getPort())) + .setOption(StandardSocketOptions.IP_MULTICAST_IF, ifs); + sock.join(group.getAddress(), ifs); this.delivered = new LinkedBlockingQueue>(); this.log = Logger.getLogger(this.getClass().getName()); this.pool = Executors.newCachedThreadPool(); - pool.execute(new Receive(group, acks, nacks, received, retransmissions, groupMembers, delivered)); - pool.execute(new Retransmit(retransmissions, outSock, group)); + pool.execute(new Receive(sock, acks, nacks, received, retransmissions, groupMembers, delivered)); + pool.execute(new Retransmit(retransmissions, sock, group)); pool.execute(new Prune(received, groupMembers)); - pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock)); + pool.execute(new Heartbeat(group, laddr, acks, nacks, sock)); } - public void close() { + public void close() throws IOException { log.info("Shutting down..."); + sock.close(); ThreadPool.shutdownNow(pool, log); - outSock.close(); log.info("Finished shutting down."); } @@ -75,11 +83,11 @@ public class ReliableMulticast { laddr, acks.toArray(new MessageID[0]), nacks.toArray(new MessageID[0])); - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); acks.clear(); (new Thread(new Timeout(msg, acks, retransmissions))).start(); - log.info("Sent " + msg + " from " + outSock + " to " + group); + log.info("Sent " + msg + " from " + sock.getLocalAddress() + " to " + group); } /** Receive a message from the group, blocking if necessary until a message arrives. */ diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java index 9d5e10b..0d2a604 100644 --- a/src/main/java/derms/net/rmulticast/Retransmit.java +++ b/src/main/java/derms/net/rmulticast/Retransmit.java @@ -1,43 +1,44 @@ package derms.net.rmulticast; -import derms.net.ConcurrentMulticastSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; -import java.net.DatagramPacket; +import java.io.IOException; import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; /** Retransmit dropped messages. */ class Retransmit implements Runnable { private final BlockingQueue> retransmissions; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final SocketAddress group; private final Logger log; - Retransmit(BlockingQueue> retransmissions, ConcurrentMulticastSocket outSock, SocketAddress group) { + Retransmit(BlockingQueue> retransmissions, DatagramChannel sock, SocketAddress group) { this.retransmissions = retransmissions; - this.outSock = outSock; + this.sock = sock; this.group = group; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { - try { - for (; ; ) { + for (;;) { + try { Message msg = retransmissions.take(); - try { - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); - log.info("Retransmitted " + msg); - } catch (Exception e) { - log.warning(e.getMessage()); - } + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); + log.info("Retransmitted " + msg); + } catch (InterruptedException | ClosedChannelException e) { + log.info("Shutting down."); + return; + } catch (IOException e) { + log.warning(e.getMessage()); } - } catch (InterruptedException e) { - log.info("Interrupted. Shutting down."); } } } diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java index aad0a0b..34c29bc 100644 --- a/src/main/java/derms/net/tomulticast/Receive.java +++ b/src/main/java/derms/net/tomulticast/Receive.java @@ -5,6 +5,7 @@ import derms.net.MessagePayload; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -32,7 +33,7 @@ class Receive extends TotalOrderMulticast implement tryDeliver(); } } catch (InterruptedException e) { - close(); + log.info("Shutting down."); } } diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java index d2f7e61..0d8b690 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java @@ -37,7 +37,7 @@ public abstract class TotalOrderMulticast { } /** Close the underlying socket. */ - public void close() { + public void close() throws IOException { sock.close(); } diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java index 83f2da5..549af16 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -1,6 +1,7 @@ package derms.net.tomulticast; import derms.net.MessagePayload; +import derms.util.ThreadPool; import java.io.IOException; import java.net.InetAddress; @@ -16,6 +17,7 @@ public class TotalOrderMulticastReceiver { private final BlockingQueue> deliver; private final Logger log; private final ExecutorService pool; + private final Receive receiver; /** * Join the specified totally-ordered multicast group as a receiver. @@ -28,20 +30,14 @@ public class TotalOrderMulticastReceiver { this.log = Logger.getLogger(getClass().getName()); this.pool = Executors.newSingleThreadExecutor(); - pool.execute(new Receive(group, laddr, deliver)); + this.receiver = new Receive(group, laddr, deliver); + pool.execute(receiver); } /** Close the underlying socket. */ - public void close() { - pool.shutdownNow(); - try { - if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) - log.warning("Thread pool did not terminate after " + terminationTimeout); - } catch (InterruptedException e) { - log.warning("Interrupted while terminating thread pool: " + e.getMessage()); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } + public void close() throws IOException { + receiver.close(); + ThreadPool.shutdown(pool, log); } /** Receive a message from the group, blocking if necessary until one arrives. */ -- cgit v1.2.3