summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/ReliableMulticast.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-23 13:12:29 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-23 13:12:29 -0500
commit1305d223fafc79be7fad07aee1f4348b2eaeaae5 (patch)
treedcd9b2377048573fa092488a5a7612f712183753 /src/main/java/derms/net/rmulticast/ReliableMulticast.java
parent171c58d8ffb29c08ce55d789f0cc1b593c7f5e86 (diff)
downloadsoen423-1305d223fafc79be7fad07aee1f4348b2eaeaae5.zip
rmulticast: use DatagramChannel
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java30
1 files changed, 19 insertions, 11 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index c23baaa..b194c96 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -1,12 +1,16 @@
package derms.net.rmulticast;
+import derms.io.Serial;
import derms.net.ConcurrentMulticastSocket;
import derms.net.MessagePayload;
+import derms.net.Net;
import derms.net.Packet;
import derms.util.ThreadPool;
import java.io.IOException;
import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.*;
@@ -26,7 +30,7 @@ public class ReliableMulticast<T extends MessagePayload> {
private final ReceivedSet<T> received;
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
private final Set<InetAddress> groupMembers;
- private final ConcurrentMulticastSocket outSock;
+ private final DatagramChannel sock;
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
private final ExecutorService pool;
@@ -47,24 +51,28 @@ public class ReliableMulticast<T extends MessagePayload> {
this.retransmissions = new LinkedBlockingQueue<Message<T>>();
this.groupMembers = ConcurrentHashMap.newKeySet();
- this.outSock = new ConcurrentMulticastSocket();
- this.outSock.joinGroup(group.getAddress());
+ NetworkInterface ifs = Net.getMulticastInterface();
+ this.sock = DatagramChannel.open(StandardProtocolFamily.INET)
+ .setOption(StandardSocketOptions.SO_REUSEADDR, true)
+ .bind(new InetSocketAddress(group.getAddress(), group.getPort()))
+ .setOption(StandardSocketOptions.IP_MULTICAST_IF, ifs);
+ sock.join(group.getAddress(), ifs);
this.delivered = new LinkedBlockingQueue<Message<T>>();
this.log = Logger.getLogger(this.getClass().getName());
this.pool = Executors.newCachedThreadPool();
- pool.execute(new Receive<T>(group, acks, nacks, received, retransmissions, groupMembers, delivered));
- pool.execute(new Retransmit<T>(retransmissions, outSock, group));
+ pool.execute(new Receive<T>(sock, acks, nacks, received, retransmissions, groupMembers, delivered));
+ pool.execute(new Retransmit<T>(retransmissions, sock, group));
pool.execute(new Prune<T>(received, groupMembers));
- pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock));
+ pool.execute(new Heartbeat(group, laddr, acks, nacks, sock));
}
- public void close() {
+ public void close() throws IOException {
log.info("Shutting down...");
+ sock.close();
ThreadPool.shutdownNow(pool, log);
- outSock.close();
log.info("Finished shutting down.");
}
@@ -75,11 +83,11 @@ public class ReliableMulticast<T extends MessagePayload> {
laddr,
acks.toArray(new MessageID[0]),
nacks.toArray(new MessageID[0]));
- DatagramPacket pkt = Packet.encode(msg, group);
- outSock.send(pkt);
+ ByteBuffer buf = Serial.encode(msg);
+ sock.send(buf, group);
acks.clear();
(new Thread(new Timeout<T>(msg, acks, retransmissions))).start();
- log.info("Sent " + msg + " from " + outSock + " to " + group);
+ log.info("Sent " + msg + " from " + sock.getLocalAddress() + " to " + group);
}
/** Receive a message from the group, blocking if necessary until a message arrives. */