diff options
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index c23baaa..b194c96 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -1,12 +1,16 @@ package derms.net.rmulticast; +import derms.io.Serial; import derms.net.ConcurrentMulticastSocket; import derms.net.MessagePayload; +import derms.net.Net; import derms.net.Packet; import derms.util.ThreadPool; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.Set; import java.util.concurrent.*; @@ -26,7 +30,7 @@ public class ReliableMulticast<T extends MessagePayload> { private final ReceivedSet<T> received; private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final Set<InetAddress> groupMembers; - private final ConcurrentMulticastSocket outSock; + private final DatagramChannel sock; private final BlockingQueue<Message<T>> delivered; private final Logger log; private final ExecutorService pool; @@ -47,24 +51,28 @@ public class ReliableMulticast<T extends MessagePayload> { this.retransmissions = new LinkedBlockingQueue<Message<T>>(); this.groupMembers = ConcurrentHashMap.newKeySet(); - this.outSock = new ConcurrentMulticastSocket(); - this.outSock.joinGroup(group.getAddress()); + NetworkInterface ifs = Net.getMulticastInterface(); + this.sock = DatagramChannel.open(StandardProtocolFamily.INET) + .setOption(StandardSocketOptions.SO_REUSEADDR, true) + .bind(new InetSocketAddress(group.getAddress(), group.getPort())) + .setOption(StandardSocketOptions.IP_MULTICAST_IF, ifs); + sock.join(group.getAddress(), ifs); this.delivered = new LinkedBlockingQueue<Message<T>>(); this.log = Logger.getLogger(this.getClass().getName()); this.pool = Executors.newCachedThreadPool(); - pool.execute(new Receive<T>(group, acks, nacks, received, retransmissions, groupMembers, delivered)); - pool.execute(new Retransmit<T>(retransmissions, outSock, group)); + pool.execute(new Receive<T>(sock, acks, nacks, received, retransmissions, groupMembers, delivered)); + pool.execute(new Retransmit<T>(retransmissions, sock, group)); pool.execute(new Prune<T>(received, groupMembers)); - pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock)); + pool.execute(new Heartbeat(group, laddr, acks, nacks, sock)); } - public void close() { + public void close() throws IOException { log.info("Shutting down..."); + sock.close(); ThreadPool.shutdownNow(pool, log); - outSock.close(); log.info("Finished shutting down."); } @@ -75,11 +83,11 @@ public class ReliableMulticast<T extends MessagePayload> { laddr, acks.toArray(new MessageID[0]), nacks.toArray(new MessageID[0])); - DatagramPacket pkt = Packet.encode(msg, group); - outSock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, group); acks.clear(); (new Thread(new Timeout<T>(msg, acks, retransmissions))).start(); - log.info("Sent " + msg + " from " + outSock + " to " + group); + log.info("Sent " + msg + " from " + sock.getLocalAddress() + " to " + group); } /** Receive a message from the group, blocking if necessary until a message arrives. */ |