diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-23 13:12:29 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-23 13:12:29 -0500 |
| commit | 1305d223fafc79be7fad07aee1f4348b2eaeaae5 (patch) | |
| tree | dcd9b2377048573fa092488a5a7612f712183753 | |
| parent | 171c58d8ffb29c08ce55d789f0cc1b593c7f5e86 (diff) | |
| download | soen423-1305d223fafc79be7fad07aee1f4348b2eaeaae5.zip | |
rmulticast: use DatagramChannel
8 files changed, 105 insertions, 82 deletions
diff --git a/src/main/java/derms/net/Net.java b/src/main/java/derms/net/Net.java new file mode 100644 index 0000000..f2eb1fd --- /dev/null +++ b/src/main/java/derms/net/Net.java @@ -0,0 +1,19 @@ +package derms.net; + +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.NoSuchElementException; + +public class Net { + /** Return the first non-loopback multicast interface in the system, or throw exception if no such interface exists. */ + public static NetworkInterface getMulticastInterface() throws SocketException, NoSuchElementException { + Enumeration<NetworkInterface> ifss = NetworkInterface.getNetworkInterfaces(); + while (ifss.hasMoreElements()) { + NetworkInterface ifs = ifss.nextElement(); + if (ifs.supportsMulticast() && !ifs.isLoopback() && ifs.isUp()) + return ifs; + } + throw new NoSuchElementException("no multicast interface available"); + } +} diff --git a/src/main/java/derms/net/rmulticast/Heartbeat.java b/src/main/java/derms/net/rmulticast/Heartbeat.java index 5dc4be2..166d10e 100644 --- a/src/main/java/derms/net/rmulticast/Heartbeat.java +++ b/src/main/java/derms/net/rmulticast/Heartbeat.java @@ -1,13 +1,14 @@ package derms.net.rmulticast; -import derms.net.ConcurrentMulticastSocket; -import derms.net.Packet; +import derms.io.Serial; import derms.util.Wait; import java.io.IOException; -import java.net.DatagramPacket; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.Set; import java.util.logging.Logger; @@ -23,31 +24,30 @@ class Heartbeat implements Runnable { private final InetSocketAddress group; private final InetAddress laddr; private final Set<MessageID> acks, nacks; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final Logger log; - Heartbeat(InetSocketAddress group, InetAddress laddr, Set<MessageID> acks, Set<MessageID> nacks, ConcurrentMulticastSocket outSock) throws IOException { + Heartbeat(InetSocketAddress group, InetAddress laddr, Set<MessageID> acks, Set<MessageID> nacks, DatagramChannel sock) throws IOException { this.group = group; this.laddr = laddr; this.acks = acks; this.nacks = nacks; - this.outSock = outSock; + this.sock = sock; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { - try { - for (;;) { - try { - Wait.forDuration(period); - send(); - } catch (IOException e) { - log.warning(e.getMessage()); - } + for (;;) { + try { + Wait.forDuration(period); + send(); + } catch (InterruptedException | ClosedChannelException e) { + log.info("Shutting down."); + return; + } catch (IOException e) { + log.warning(e.getMessage()); } - } catch (InterruptedException e) { - log.info("Interrupted. Shutting down."); } } @@ -56,8 +56,8 @@ class Heartbeat implements Runnable { laddr, acks.toArray(new MessageID[0]), nacks.toArray(new MessageID[0])); - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); acks.clear(); } } diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index c9e1acf..d39ccf1 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -1,15 +1,14 @@ package derms.net.rmulticast; -import derms.net.ConcurrentMulticastSocket; +import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import java.io.IOException; -import java.net.DatagramPacket; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.time.Duration; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -17,9 +16,8 @@ import java.util.logging.Logger; class Receive<T extends MessagePayload> implements Runnable { private static final int bufSize = 8192; - private static final Duration sockTimeout = Duration.ofMillis(500); - private final ConcurrentMulticastSocket inSock; + private final DatagramChannel sock; private final Set<MessageID> acks; // Positively acknowledged messages. private final Set<MessageID> nacks; // Negatively acknowledged messages. private final ReceivedSet<T> received; @@ -28,36 +26,36 @@ class Receive<T extends MessagePayload> implements Runnable { private final BlockingQueue<Message<T>> delivered; private final Logger log; - Receive(InetSocketAddress group, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException { - this.inSock = new ConcurrentMulticastSocket(group.getPort()); - this.inSock.joinGroup(group.getAddress()); - this.inSock.setSoTimeout(sockTimeout); - + Receive(DatagramChannel sock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException { this.acks = acks; this.nacks = nacks; this.received = received; this.retransmissions = retransmissions; this.groupMembers = groupMembers; + this.sock = sock; this.delivered = delivered; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { - log.info("Listening on " + inSock); - DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + try { + log.info("Listening on " + sock.getLocalAddress()); + } catch (IOException e) { + log.warning(e.getMessage()); + } + + ByteBuffer buf = ByteBuffer.allocate(bufSize); for (;;) { + buf.clear(); try { - inSock.receive(pkt); - Message<?> msg = Packet.decode(pkt, Message.class); + sock.receive(buf); + Message<?> msg = Serial.decode(buf, Message.class); receive(msg); log.info("Received " + msg); - } catch (SocketTimeoutException e) { - if (Thread.interrupted()) { - log.info("Interrupted. Shutting down."); - inSock.close(); - return; - } + } catch (ClosedChannelException e) { + log.info("Shutting down."); + return; } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index c23baaa..b194c96 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -1,12 +1,16 @@ package derms.net.rmulticast; +import derms.io.Serial; import derms.net.ConcurrentMulticastSocket; import derms.net.MessagePayload; +import derms.net.Net; import derms.net.Packet; import derms.util.ThreadPool; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.Set; import java.util.concurrent.*; @@ -26,7 +30,7 @@ public class ReliableMulticast<T extends MessagePayload> { private final ReceivedSet<T> received; private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final Set<InetAddress> groupMembers; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final BlockingQueue<Message<T>> delivered; private final Logger log; private final ExecutorService pool; @@ -47,24 +51,28 @@ public class ReliableMulticast<T extends MessagePayload> { this.retransmissions = new LinkedBlockingQueue<Message<T>>(); this.groupMembers = ConcurrentHashMap.newKeySet(); - this.outSock = new ConcurrentMulticastSocket(); - this.outSock.joinGroup(group.getAddress()); + NetworkInterface ifs = Net.getMulticastInterface(); + this.sock = DatagramChannel.open(StandardProtocolFamily.INET) + .setOption(StandardSocketOptions.SO_REUSEADDR, true) + .bind(new InetSocketAddress(group.getAddress(), group.getPort())) + .setOption(StandardSocketOptions.IP_MULTICAST_IF, ifs); + sock.join(group.getAddress(), ifs); this.delivered = new LinkedBlockingQueue<Message<T>>(); this.log = Logger.getLogger(this.getClass().getName()); this.pool = Executors.newCachedThreadPool(); - pool.execute(new Receive<T>(group, acks, nacks, received, retransmissions, groupMembers, delivered)); - pool.execute(new Retransmit<T>(retransmissions, outSock, group)); + pool.execute(new Receive<T>(sock, acks, nacks, received, retransmissions, groupMembers, delivered)); + pool.execute(new Retransmit<T>(retransmissions, sock, group)); pool.execute(new Prune<T>(received, groupMembers)); - pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock)); + pool.execute(new Heartbeat(group, laddr, acks, nacks, sock)); } - public void close() { + public void close() throws IOException { log.info("Shutting down..."); + sock.close(); ThreadPool.shutdownNow(pool, log); - outSock.close(); log.info("Finished shutting down."); } @@ -75,11 +83,11 @@ public class ReliableMulticast<T extends MessagePayload> { laddr, acks.toArray(new MessageID[0]), nacks.toArray(new MessageID[0])); - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); acks.clear(); (new Thread(new Timeout<T>(msg, acks, retransmissions))).start(); - log.info("Sent " + msg + " from " + outSock + " to " + group); + log.info("Sent " + msg + " from " + sock.getLocalAddress() + " to " + group); } /** Receive a message from the group, blocking if necessary until a message arrives. */ diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java index 9d5e10b..0d2a604 100644 --- a/src/main/java/derms/net/rmulticast/Retransmit.java +++ b/src/main/java/derms/net/rmulticast/Retransmit.java @@ -1,43 +1,44 @@ package derms.net.rmulticast; -import derms.net.ConcurrentMulticastSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; -import java.net.DatagramPacket; +import java.io.IOException; import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; /** Retransmit dropped messages. */ class Retransmit<T extends MessagePayload> implements Runnable { private final BlockingQueue<Message<T>> retransmissions; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final SocketAddress group; private final Logger log; - Retransmit(BlockingQueue<Message<T>> retransmissions, ConcurrentMulticastSocket outSock, SocketAddress group) { + Retransmit(BlockingQueue<Message<T>> retransmissions, DatagramChannel sock, SocketAddress group) { this.retransmissions = retransmissions; - this.outSock = outSock; + this.sock = sock; this.group = group; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { - try { - for (; ; ) { + for (;;) { + try { Message<T> msg = retransmissions.take(); - try { - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); - log.info("Retransmitted " + msg); - } catch (Exception e) { - log.warning(e.getMessage()); - } + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); + log.info("Retransmitted " + msg); + } catch (InterruptedException | ClosedChannelException e) { + log.info("Shutting down."); + return; + } catch (IOException e) { + log.warning(e.getMessage()); } - } catch (InterruptedException e) { - log.info("Interrupted. Shutting down."); } } } diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java index aad0a0b..34c29bc 100644 --- a/src/main/java/derms/net/tomulticast/Receive.java +++ b/src/main/java/derms/net/tomulticast/Receive.java @@ -5,6 +5,7 @@ import derms.net.MessagePayload; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -32,7 +33,7 @@ class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implement tryDeliver(); } } catch (InterruptedException e) { - close(); + log.info("Shutting down."); } } diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java index d2f7e61..0d8b690 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java @@ -37,7 +37,7 @@ public abstract class TotalOrderMulticast<T extends MessagePayload> { } /** Close the underlying socket. */ - public void close() { + public void close() throws IOException { sock.close(); } diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java index 83f2da5..549af16 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -1,6 +1,7 @@ package derms.net.tomulticast; import derms.net.MessagePayload; +import derms.util.ThreadPool; import java.io.IOException; import java.net.InetAddress; @@ -16,6 +17,7 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> { private final BlockingQueue<Message<T>> deliver; private final Logger log; private final ExecutorService pool; + private final Receive<T> receiver; /** * Join the specified totally-ordered multicast group as a receiver. @@ -28,20 +30,14 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> { this.log = Logger.getLogger(getClass().getName()); this.pool = Executors.newSingleThreadExecutor(); - pool.execute(new Receive<T>(group, laddr, deliver)); + this.receiver = new Receive<T>(group, laddr, deliver); + pool.execute(receiver); } /** Close the underlying socket. */ - public void close() { - pool.shutdownNow(); - try { - if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) - log.warning("Thread pool did not terminate after " + terminationTimeout); - } catch (InterruptedException e) { - log.warning("Interrupted while terminating thread pool: " + e.getMessage()); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } + public void close() throws IOException { + receiver.close(); + ThreadPool.shutdown(pool, log); } /** Receive a message from the group, blocking if necessary until one arrives. */ |