From 05c4e2b5770133228daff7c262945f078a4e4456 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 16 Nov 2024 14:28:13 -0500 Subject: ReliableMulticast.close() --- src/main/java/derms/net/rmulticast/Receive.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'src/main/java/derms/net/rmulticast/Receive.java') diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 922f357..bf583dd 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -6,6 +6,9 @@ import derms.net.Packet; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.time.Duration; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -13,6 +16,7 @@ import java.util.logging.Logger; class Receive implements Runnable { private static final int bufSize = 8192; + private static final Duration sockTimeout = Duration.ofMillis(500); private final ConcurrentMulticastSocket inSock; private final Set acks; @@ -23,8 +27,11 @@ class Receive implements Runnable { private final BlockingQueue> delivered; private final Logger log; - Receive(ConcurrentMulticastSocket inSock, Set acks, Set nacks, ReceivedSet received, BlockingQueue> retransmissions, Set groupMembers, BlockingQueue> delivered) { - this.inSock = inSock; + Receive(InetSocketAddress group, Set acks, Set nacks, ReceivedSet received, BlockingQueue> retransmissions, Set groupMembers, BlockingQueue> delivered) throws IOException { + this.inSock = new ConcurrentMulticastSocket(group.getPort()); + this.inSock.joinGroup(group.getAddress()); + this.inSock.setSoTimeout(sockTimeout); + this.acks = acks; this.nacks = nacks; this.received = received; @@ -44,6 +51,12 @@ class Receive implements Runnable { Message msg = Packet.decode(pkt, Message.class); receive(msg); log.info("Received " + msg); + } catch (SocketTimeoutException e) { + if (Thread.interrupted()) { + log.info("Interrupted. Shutting down."); + inSock.close(); + return; + } } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } -- cgit v1.2.3