summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/runicast/ReceiveAcks.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-23 11:34:42 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-23 11:34:42 -0500
commitd5a1ec8b54c1c3c516d07f1916276cd6e5a937e4 (patch)
tree34f4ed7975803f573d16a7215ae39a9b2791a9b9 /src/main/java/derms/net/runicast/ReceiveAcks.java
parente3df4a078afd37314d330daa2de0883f8dd1811b (diff)
downloadsoen423-d5a1ec8b54c1c3c516d07f1916276cd6e5a937e4.zip
runicast: use DatagramChannel
Diffstat (limited to 'src/main/java/derms/net/runicast/ReceiveAcks.java')
-rw-r--r--src/main/java/derms/net/runicast/ReceiveAcks.java27
1 files changed, 13 insertions, 14 deletions
diff --git a/src/main/java/derms/net/runicast/ReceiveAcks.java b/src/main/java/derms/net/runicast/ReceiveAcks.java
index 0f585ff..9d7b7de 100644
--- a/src/main/java/derms/net/runicast/ReceiveAcks.java
+++ b/src/main/java/derms/net/runicast/ReceiveAcks.java
@@ -1,12 +1,12 @@
package derms.net.runicast;
-import derms.net.ConcurrentDatagramSocket;
+import derms.io.Serial;
import derms.net.MessagePayload;
-import derms.net.Packet;
import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -17,10 +17,10 @@ class ReceiveAcks<T extends MessagePayload> implements Runnable {
private final AtomicLong unacked;
private final Queue<Message<T>> sent;
- private final ConcurrentDatagramSocket sock;
+ private final DatagramChannel sock;
private final Logger log;
- ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) {
+ ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, DatagramChannel sock) {
this.unacked = unacked;
this.sent = sent;
this.sock = sock;
@@ -29,17 +29,15 @@ class ReceiveAcks<T extends MessagePayload> implements Runnable {
@Override
public void run() {
- DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
for (;;) {
+ ByteBuffer buf = ByteBuffer.allocate(bufSize);
try {
- sock.receive(pkt);
- Ack ack = Packet.decode(pkt, Ack.class);
+ sock.receive(buf);
+ Ack ack = Serial.decode(buf, Ack.class);
recvAck(ack.seq);
- } catch (SocketTimeoutException e) {
- if (Thread.interrupted()) {
- log.info("Interrupted.");
- return;
- }
+ } catch (ClosedChannelException e) {
+ log.info("Shutting down.");
+ return;
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
}
@@ -47,6 +45,7 @@ class ReceiveAcks<T extends MessagePayload> implements Runnable {
}
private void recvAck(long ack) {
+ log.info("Received ack: " + ack);
unacked.updateAndGet((unacked) -> {
if (ack >= unacked)
return ack+1;