summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/Receive.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-16 14:28:13 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-16 14:28:13 -0500
commit05c4e2b5770133228daff7c262945f078a4e4456 (patch)
treedb2252c3f7209baafc5a43edc7d2eced7b294b2f /src/main/java/derms/net/rmulticast/Receive.java
parentd1406ab917339aa1531060da2c91043790f66d16 (diff)
downloadsoen423-05c4e2b5770133228daff7c262945f078a4e4456.zip
ReliableMulticast.close()
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Receive.java')
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java17
1 files changed, 15 insertions, 2 deletions
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index 922f357..bf583dd 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -6,6 +6,9 @@ import derms.net.Packet;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -13,6 +16,7 @@ import java.util.logging.Logger;
class Receive<T extends MessagePayload> implements Runnable {
private static final int bufSize = 8192;
+ private static final Duration sockTimeout = Duration.ofMillis(500);
private final ConcurrentMulticastSocket inSock;
private final Set<MessageID> acks;
@@ -23,8 +27,11 @@ class Receive<T extends MessagePayload> implements Runnable {
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
- Receive(ConcurrentMulticastSocket inSock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) {
- this.inSock = inSock;
+ Receive(InetSocketAddress group, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException {
+ this.inSock = new ConcurrentMulticastSocket(group.getPort());
+ this.inSock.joinGroup(group.getAddress());
+ this.inSock.setSoTimeout(sockTimeout);
+
this.acks = acks;
this.nacks = nacks;
this.received = received;
@@ -44,6 +51,12 @@ class Receive<T extends MessagePayload> implements Runnable {
Message<?> msg = Packet.decode(pkt, Message.class);
receive(msg);
log.info("Received " + msg);
+ } catch (SocketTimeoutException e) {
+ if (Thread.interrupted()) {
+ log.info("Interrupted. Shutting down.");
+ inSock.close();
+ return;
+ }
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
}