diff options
Diffstat (limited to 'src/main/java/derms/net/runicast/Receive.java')
| -rw-r--r-- | src/main/java/derms/net/runicast/Receive.java | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/src/main/java/derms/net/runicast/Receive.java b/src/main/java/derms/net/runicast/Receive.java index 584861b..8620ebd 100644 --- a/src/main/java/derms/net/runicast/Receive.java +++ b/src/main/java/derms/net/runicast/Receive.java @@ -1,11 +1,13 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.logging.Logger; @@ -13,11 +15,11 @@ class Receive<T extends MessagePayload> implements Runnable { private static final int bufSize = 8192; private long seq; // Sequence number. - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final Queue<T> delivered; private final Logger log; - Receive(ConcurrentDatagramSocket sock, Queue<T> delivered) { + Receive(DatagramChannel sock, Queue<T> delivered) { this.seq = 0; this.sock = sock; this.delivered = delivered; @@ -26,18 +28,15 @@ class Receive<T extends MessagePayload> implements Runnable { @Override public void run() { - DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); for (;;) { + ByteBuffer buf = ByteBuffer.allocate(bufSize); try { - sock.receive(pkt); - Message<T> msg = (Message<T>) Packet.decode(pkt, Message.class); - SocketAddress sender = pkt.getSocketAddress(); + SocketAddress sender = sock.receive(buf); + Message<T> msg = (Message<T>) Serial.decode(buf, Message.class); recv(msg, sender); - } catch (SocketTimeoutException e) { - if (Thread.interrupted()) { - log.info("Interrupted"); - return; - } + } catch (ClosedChannelException e) { + log.info("Shutting down."); + return; } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } @@ -45,16 +44,19 @@ class Receive<T extends MessagePayload> implements Runnable { } private void recv(Message<T> msg, SocketAddress sender) throws IOException { + log.info("Received " + msg); if (msg.seq == seq) { delivered.add(msg.payload); + log.info("Delivered " + msg); ack(msg, sender); + log.info("Acked " + msg); seq++; } } private void ack(Message<T> msg, SocketAddress sender) throws IOException { Ack ack = new Ack(msg.seq); - DatagramPacket pkt = Packet.encode(ack, sender); - sock.send(pkt); + ByteBuffer buf = Serial.encode(ack); + sock.send(buf, sender); } } |