summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/Receive.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Receive.java')
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java44
1 files changed, 21 insertions, 23 deletions
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index c9e1acf..d39ccf1 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -1,15 +1,14 @@
package derms.net.rmulticast;
-import derms.net.ConcurrentMulticastSocket;
+import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;
+import derms.io.Serial;
import derms.net.MessagePayload;
-import derms.net.Packet;
import java.io.IOException;
-import java.net.DatagramPacket;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.time.Duration;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -17,9 +16,8 @@ import java.util.logging.Logger;
class Receive<T extends MessagePayload> implements Runnable {
private static final int bufSize = 8192;
- private static final Duration sockTimeout = Duration.ofMillis(500);
- private final ConcurrentMulticastSocket inSock;
+ private final DatagramChannel sock;
private final Set<MessageID> acks; // Positively acknowledged messages.
private final Set<MessageID> nacks; // Negatively acknowledged messages.
private final ReceivedSet<T> received;
@@ -28,36 +26,36 @@ class Receive<T extends MessagePayload> implements Runnable {
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
- Receive(InetSocketAddress group, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException {
- this.inSock = new ConcurrentMulticastSocket(group.getPort());
- this.inSock.joinGroup(group.getAddress());
- this.inSock.setSoTimeout(sockTimeout);
-
+ Receive(DatagramChannel sock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException {
this.acks = acks;
this.nacks = nacks;
this.received = received;
this.retransmissions = retransmissions;
this.groupMembers = groupMembers;
+ this.sock = sock;
this.delivered = delivered;
this.log = Logger.getLogger(this.getClass().getName());
}
@Override
public void run() {
- log.info("Listening on " + inSock);
- DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
+ try {
+ log.info("Listening on " + sock.getLocalAddress());
+ } catch (IOException e) {
+ log.warning(e.getMessage());
+ }
+
+ ByteBuffer buf = ByteBuffer.allocate(bufSize);
for (;;) {
+ buf.clear();
try {
- inSock.receive(pkt);
- Message<?> msg = Packet.decode(pkt, Message.class);
+ sock.receive(buf);
+ Message<?> msg = Serial.decode(buf, Message.class);
receive(msg);
log.info("Received " + msg);
- } catch (SocketTimeoutException e) {
- if (Thread.interrupted()) {
- log.info("Interrupted. Shutting down.");
- inSock.close();
- return;
- }
+ } catch (ClosedChannelException e) {
+ log.info("Shutting down.");
+ return;
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
}