summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-23 13:12:29 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-23 13:12:29 -0500
commit1305d223fafc79be7fad07aee1f4348b2eaeaae5 (patch)
treedcd9b2377048573fa092488a5a7612f712183753
parent171c58d8ffb29c08ce55d789f0cc1b593c7f5e86 (diff)
downloadsoen423-1305d223fafc79be7fad07aee1f4348b2eaeaae5.zip
rmulticast: use DatagramChannel
-rw-r--r--src/main/java/derms/net/Net.java19
-rw-r--r--src/main/java/derms/net/rmulticast/Heartbeat.java36
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java44
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java30
-rw-r--r--src/main/java/derms/net/rmulticast/Retransmit.java35
-rw-r--r--src/main/java/derms/net/tomulticast/Receive.java3
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticast.java2
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java18
8 files changed, 105 insertions, 82 deletions
diff --git a/src/main/java/derms/net/Net.java b/src/main/java/derms/net/Net.java
new file mode 100644
index 0000000..f2eb1fd
--- /dev/null
+++ b/src/main/java/derms/net/Net.java
@@ -0,0 +1,19 @@
+package derms.net;
+
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+
+public class Net {
+ /** Return the first non-loopback multicast interface in the system, or throw exception if no such interface exists. */
+ public static NetworkInterface getMulticastInterface() throws SocketException, NoSuchElementException {
+ Enumeration<NetworkInterface> ifss = NetworkInterface.getNetworkInterfaces();
+ while (ifss.hasMoreElements()) {
+ NetworkInterface ifs = ifss.nextElement();
+ if (ifs.supportsMulticast() && !ifs.isLoopback() && ifs.isUp())
+ return ifs;
+ }
+ throw new NoSuchElementException("no multicast interface available");
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/Heartbeat.java b/src/main/java/derms/net/rmulticast/Heartbeat.java
index 5dc4be2..166d10e 100644
--- a/src/main/java/derms/net/rmulticast/Heartbeat.java
+++ b/src/main/java/derms/net/rmulticast/Heartbeat.java
@@ -1,13 +1,14 @@
package derms.net.rmulticast;
-import derms.net.ConcurrentMulticastSocket;
-import derms.net.Packet;
+import derms.io.Serial;
import derms.util.Wait;
import java.io.IOException;
-import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
import java.time.Duration;
import java.util.Set;
import java.util.logging.Logger;
@@ -23,31 +24,30 @@ class Heartbeat implements Runnable {
private final InetSocketAddress group;
private final InetAddress laddr;
private final Set<MessageID> acks, nacks;
- private final ConcurrentMulticastSocket outSock;
+ private final DatagramChannel sock;
private final Logger log;
- Heartbeat(InetSocketAddress group, InetAddress laddr, Set<MessageID> acks, Set<MessageID> nacks, ConcurrentMulticastSocket outSock) throws IOException {
+ Heartbeat(InetSocketAddress group, InetAddress laddr, Set<MessageID> acks, Set<MessageID> nacks, DatagramChannel sock) throws IOException {
this.group = group;
this.laddr = laddr;
this.acks = acks;
this.nacks = nacks;
- this.outSock = outSock;
+ this.sock = sock;
this.log = Logger.getLogger(this.getClass().getName());
}
@Override
public void run() {
- try {
- for (;;) {
- try {
- Wait.forDuration(period);
- send();
- } catch (IOException e) {
- log.warning(e.getMessage());
- }
+ for (;;) {
+ try {
+ Wait.forDuration(period);
+ send();
+ } catch (InterruptedException | ClosedChannelException e) {
+ log.info("Shutting down.");
+ return;
+ } catch (IOException e) {
+ log.warning(e.getMessage());
}
- } catch (InterruptedException e) {
- log.info("Interrupted. Shutting down.");
}
}
@@ -56,8 +56,8 @@ class Heartbeat implements Runnable {
laddr,
acks.toArray(new MessageID[0]),
nacks.toArray(new MessageID[0]));
- DatagramPacket pkt = Packet.encode(msg, group);
- outSock.send(pkt);
+ ByteBuffer buf = Serial.encode(msg);
+ sock.send(buf, group);
acks.clear();
}
}
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index c9e1acf..d39ccf1 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -1,15 +1,14 @@
package derms.net.rmulticast;
-import derms.net.ConcurrentMulticastSocket;
+import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;
+import derms.io.Serial;
import derms.net.MessagePayload;
-import derms.net.Packet;
import java.io.IOException;
-import java.net.DatagramPacket;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.time.Duration;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -17,9 +16,8 @@ import java.util.logging.Logger;
class Receive<T extends MessagePayload> implements Runnable {
private static final int bufSize = 8192;
- private static final Duration sockTimeout = Duration.ofMillis(500);
- private final ConcurrentMulticastSocket inSock;
+ private final DatagramChannel sock;
private final Set<MessageID> acks; // Positively acknowledged messages.
private final Set<MessageID> nacks; // Negatively acknowledged messages.
private final ReceivedSet<T> received;
@@ -28,36 +26,36 @@ class Receive<T extends MessagePayload> implements Runnable {
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
- Receive(InetSocketAddress group, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException {
- this.inSock = new ConcurrentMulticastSocket(group.getPort());
- this.inSock.joinGroup(group.getAddress());
- this.inSock.setSoTimeout(sockTimeout);
-
+ Receive(DatagramChannel sock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException {
this.acks = acks;
this.nacks = nacks;
this.received = received;
this.retransmissions = retransmissions;
this.groupMembers = groupMembers;
+ this.sock = sock;
this.delivered = delivered;
this.log = Logger.getLogger(this.getClass().getName());
}
@Override
public void run() {
- log.info("Listening on " + inSock);
- DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
+ try {
+ log.info("Listening on " + sock.getLocalAddress());
+ } catch (IOException e) {
+ log.warning(e.getMessage());
+ }
+
+ ByteBuffer buf = ByteBuffer.allocate(bufSize);
for (;;) {
+ buf.clear();
try {
- inSock.receive(pkt);
- Message<?> msg = Packet.decode(pkt, Message.class);
+ sock.receive(buf);
+ Message<?> msg = Serial.decode(buf, Message.class);
receive(msg);
log.info("Received " + msg);
- } catch (SocketTimeoutException e) {
- if (Thread.interrupted()) {
- log.info("Interrupted. Shutting down.");
- inSock.close();
- return;
- }
+ } catch (ClosedChannelException e) {
+ log.info("Shutting down.");
+ return;
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
}
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index c23baaa..b194c96 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -1,12 +1,16 @@
package derms.net.rmulticast;
+import derms.io.Serial;
import derms.net.ConcurrentMulticastSocket;
import derms.net.MessagePayload;
+import derms.net.Net;
import derms.net.Packet;
import derms.util.ThreadPool;
import java.io.IOException;
import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.*;
@@ -26,7 +30,7 @@ public class ReliableMulticast<T extends MessagePayload> {
private final ReceivedSet<T> received;
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
private final Set<InetAddress> groupMembers;
- private final ConcurrentMulticastSocket outSock;
+ private final DatagramChannel sock;
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
private final ExecutorService pool;
@@ -47,24 +51,28 @@ public class ReliableMulticast<T extends MessagePayload> {
this.retransmissions = new LinkedBlockingQueue<Message<T>>();
this.groupMembers = ConcurrentHashMap.newKeySet();
- this.outSock = new ConcurrentMulticastSocket();
- this.outSock.joinGroup(group.getAddress());
+ NetworkInterface ifs = Net.getMulticastInterface();
+ this.sock = DatagramChannel.open(StandardProtocolFamily.INET)
+ .setOption(StandardSocketOptions.SO_REUSEADDR, true)
+ .bind(new InetSocketAddress(group.getAddress(), group.getPort()))
+ .setOption(StandardSocketOptions.IP_MULTICAST_IF, ifs);
+ sock.join(group.getAddress(), ifs);
this.delivered = new LinkedBlockingQueue<Message<T>>();
this.log = Logger.getLogger(this.getClass().getName());
this.pool = Executors.newCachedThreadPool();
- pool.execute(new Receive<T>(group, acks, nacks, received, retransmissions, groupMembers, delivered));
- pool.execute(new Retransmit<T>(retransmissions, outSock, group));
+ pool.execute(new Receive<T>(sock, acks, nacks, received, retransmissions, groupMembers, delivered));
+ pool.execute(new Retransmit<T>(retransmissions, sock, group));
pool.execute(new Prune<T>(received, groupMembers));
- pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock));
+ pool.execute(new Heartbeat(group, laddr, acks, nacks, sock));
}
- public void close() {
+ public void close() throws IOException {
log.info("Shutting down...");
+ sock.close();
ThreadPool.shutdownNow(pool, log);
- outSock.close();
log.info("Finished shutting down.");
}
@@ -75,11 +83,11 @@ public class ReliableMulticast<T extends MessagePayload> {
laddr,
acks.toArray(new MessageID[0]),
nacks.toArray(new MessageID[0]));
- DatagramPacket pkt = Packet.encode(msg, group);
- outSock.send(pkt);
+ ByteBuffer buf = Serial.encode(msg);
+ sock.send(buf, group);
acks.clear();
(new Thread(new Timeout<T>(msg, acks, retransmissions))).start();
- log.info("Sent " + msg + " from " + outSock + " to " + group);
+ log.info("Sent " + msg + " from " + sock.getLocalAddress() + " to " + group);
}
/** Receive a message from the group, blocking if necessary until a message arrives. */
diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java
index 9d5e10b..0d2a604 100644
--- a/src/main/java/derms/net/rmulticast/Retransmit.java
+++ b/src/main/java/derms/net/rmulticast/Retransmit.java
@@ -1,43 +1,44 @@
package derms.net.rmulticast;
-import derms.net.ConcurrentMulticastSocket;
+import derms.io.Serial;
import derms.net.MessagePayload;
-import derms.net.Packet;
-import java.net.DatagramPacket;
+import java.io.IOException;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
/** Retransmit dropped messages. */
class Retransmit<T extends MessagePayload> implements Runnable {
private final BlockingQueue<Message<T>> retransmissions;
- private final ConcurrentMulticastSocket outSock;
+ private final DatagramChannel sock;
private final SocketAddress group;
private final Logger log;
- Retransmit(BlockingQueue<Message<T>> retransmissions, ConcurrentMulticastSocket outSock, SocketAddress group) {
+ Retransmit(BlockingQueue<Message<T>> retransmissions, DatagramChannel sock, SocketAddress group) {
this.retransmissions = retransmissions;
- this.outSock = outSock;
+ this.sock = sock;
this.group = group;
this.log = Logger.getLogger(this.getClass().getName());
}
@Override
public void run() {
- try {
- for (; ; ) {
+ for (;;) {
+ try {
Message<T> msg = retransmissions.take();
- try {
- DatagramPacket pkt = Packet.encode(msg, group);
- outSock.send(pkt);
- log.info("Retransmitted " + msg);
- } catch (Exception e) {
- log.warning(e.getMessage());
- }
+ ByteBuffer buf = Serial.encode(msg);
+ sock.send(buf, group);
+ log.info("Retransmitted " + msg);
+ } catch (InterruptedException | ClosedChannelException e) {
+ log.info("Shutting down.");
+ return;
+ } catch (IOException e) {
+ log.warning(e.getMessage());
}
- } catch (InterruptedException e) {
- log.info("Interrupted. Shutting down.");
}
}
}
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java
index aad0a0b..34c29bc 100644
--- a/src/main/java/derms/net/tomulticast/Receive.java
+++ b/src/main/java/derms/net/tomulticast/Receive.java
@@ -5,6 +5,7 @@ import derms.net.MessagePayload;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -32,7 +33,7 @@ class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implement
tryDeliver();
}
} catch (InterruptedException e) {
- close();
+ log.info("Shutting down.");
}
}
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
index d2f7e61..0d8b690 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
@@ -37,7 +37,7 @@ public abstract class TotalOrderMulticast<T extends MessagePayload> {
}
/** Close the underlying socket. */
- public void close() {
+ public void close() throws IOException {
sock.close();
}
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
index 83f2da5..549af16 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
@@ -1,6 +1,7 @@
package derms.net.tomulticast;
import derms.net.MessagePayload;
+import derms.util.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
@@ -16,6 +17,7 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> {
private final BlockingQueue<Message<T>> deliver;
private final Logger log;
private final ExecutorService pool;
+ private final Receive<T> receiver;
/**
* Join the specified totally-ordered multicast group as a receiver.
@@ -28,20 +30,14 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> {
this.log = Logger.getLogger(getClass().getName());
this.pool = Executors.newSingleThreadExecutor();
- pool.execute(new Receive<T>(group, laddr, deliver));
+ this.receiver = new Receive<T>(group, laddr, deliver);
+ pool.execute(receiver);
}
/** Close the underlying socket. */
- public void close() {
- pool.shutdownNow();
- try {
- if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
- log.warning("Thread pool did not terminate after " + terminationTimeout);
- } catch (InterruptedException e) {
- log.warning("Interrupted while terminating thread pool: " + e.getMessage());
- // Preserve interrupt status.
- Thread.currentThread().interrupt();
- }
+ public void close() throws IOException {
+ receiver.close();
+ ThreadPool.shutdown(pool, log);
}
/** Receive a message from the group, blocking if necessary until one arrives. */