diff options
Diffstat (limited to 'src/main/java/derms/net/runicast/ReceiveAcks.java')
| -rw-r--r-- | src/main/java/derms/net/runicast/ReceiveAcks.java | 27 |
1 files changed, 13 insertions, 14 deletions
diff --git a/src/main/java/derms/net/runicast/ReceiveAcks.java b/src/main/java/derms/net/runicast/ReceiveAcks.java index 0f585ff..9d7b7de 100644 --- a/src/main/java/derms/net/runicast/ReceiveAcks.java +++ b/src/main/java/derms/net/runicast/ReceiveAcks.java @@ -1,12 +1,12 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -17,10 +17,10 @@ class ReceiveAcks<T extends MessagePayload> implements Runnable { private final AtomicLong unacked; private final Queue<Message<T>> sent; - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final Logger log; - ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) { + ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, DatagramChannel sock) { this.unacked = unacked; this.sent = sent; this.sock = sock; @@ -29,17 +29,15 @@ class ReceiveAcks<T extends MessagePayload> implements Runnable { @Override public void run() { - DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); for (;;) { + ByteBuffer buf = ByteBuffer.allocate(bufSize); try { - sock.receive(pkt); - Ack ack = Packet.decode(pkt, Ack.class); + sock.receive(buf); + Ack ack = Serial.decode(buf, Ack.class); recvAck(ack.seq); - } catch (SocketTimeoutException e) { - if (Thread.interrupted()) { - log.info("Interrupted."); - return; - } + } catch (ClosedChannelException e) { + log.info("Shutting down."); + return; } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } @@ -47,6 +45,7 @@ class ReceiveAcks<T extends MessagePayload> implements Runnable { } private void recvAck(long ack) { + log.info("Received ack: " + ack); unacked.updateAndGet((unacked) -> { if (ack >= unacked) return ack+1; |