diff options
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Receive.java')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Receive.java | 17 |
1 files changed, 15 insertions, 2 deletions
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 922f357..bf583dd 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -6,6 +6,9 @@ import derms.net.Packet; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.time.Duration; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -13,6 +16,7 @@ import java.util.logging.Logger; class Receive<T extends MessagePayload> implements Runnable { private static final int bufSize = 8192; + private static final Duration sockTimeout = Duration.ofMillis(500); private final ConcurrentMulticastSocket inSock; private final Set<MessageID> acks; @@ -23,8 +27,11 @@ class Receive<T extends MessagePayload> implements Runnable { private final BlockingQueue<Message<T>> delivered; private final Logger log; - Receive(ConcurrentMulticastSocket inSock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) { - this.inSock = inSock; + Receive(InetSocketAddress group, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException { + this.inSock = new ConcurrentMulticastSocket(group.getPort()); + this.inSock.joinGroup(group.getAddress()); + this.inSock.setSoTimeout(sockTimeout); + this.acks = acks; this.nacks = nacks; this.received = received; @@ -44,6 +51,12 @@ class Receive<T extends MessagePayload> implements Runnable { Message<?> msg = Packet.decode(pkt, Message.class); receive(msg); log.info("Received " + msg); + } catch (SocketTimeoutException e) { + if (Thread.interrupted()) { + log.info("Interrupted. Shutting down."); + inSock.close(); + return; + } } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } |