From 1305d223fafc79be7fad07aee1f4348b2eaeaae5 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 23 Nov 2024 13:12:29 -0500 Subject: rmulticast: use DatagramChannel --- src/main/java/derms/net/rmulticast/Receive.java | 44 ++++++++++++------------- 1 file changed, 21 insertions(+), 23 deletions(-) (limited to 'src/main/java/derms/net/rmulticast/Receive.java') diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index c9e1acf..d39ccf1 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -1,15 +1,14 @@ package derms.net.rmulticast; -import derms.net.ConcurrentMulticastSocket; +import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import java.io.IOException; -import java.net.DatagramPacket; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import java.time.Duration; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -17,9 +16,8 @@ import java.util.logging.Logger; class Receive implements Runnable { private static final int bufSize = 8192; - private static final Duration sockTimeout = Duration.ofMillis(500); - private final ConcurrentMulticastSocket inSock; + private final DatagramChannel sock; private final Set acks; // Positively acknowledged messages. private final Set nacks; // Negatively acknowledged messages. private final ReceivedSet received; @@ -28,36 +26,36 @@ class Receive implements Runnable { private final BlockingQueue> delivered; private final Logger log; - Receive(InetSocketAddress group, Set acks, Set nacks, ReceivedSet received, BlockingQueue> retransmissions, Set groupMembers, BlockingQueue> delivered) throws IOException { - this.inSock = new ConcurrentMulticastSocket(group.getPort()); - this.inSock.joinGroup(group.getAddress()); - this.inSock.setSoTimeout(sockTimeout); - + Receive(DatagramChannel sock, Set acks, Set nacks, ReceivedSet received, BlockingQueue> retransmissions, Set groupMembers, BlockingQueue> delivered) throws IOException { this.acks = acks; this.nacks = nacks; this.received = received; this.retransmissions = retransmissions; this.groupMembers = groupMembers; + this.sock = sock; this.delivered = delivered; this.log = Logger.getLogger(this.getClass().getName()); } @Override public void run() { - log.info("Listening on " + inSock); - DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + try { + log.info("Listening on " + sock.getLocalAddress()); + } catch (IOException e) { + log.warning(e.getMessage()); + } + + ByteBuffer buf = ByteBuffer.allocate(bufSize); for (;;) { + buf.clear(); try { - inSock.receive(pkt); - Message msg = Packet.decode(pkt, Message.class); + sock.receive(buf); + Message msg = Serial.decode(buf, Message.class); receive(msg); log.info("Received " + msg); - } catch (SocketTimeoutException e) { - if (Thread.interrupted()) { - log.info("Interrupted. Shutting down."); - inSock.close(); - return; - } + } catch (ClosedChannelException e) { + log.info("Shutting down."); + return; } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } -- cgit v1.2.3