summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/runicast/Receive.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-23 11:34:42 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-23 11:34:42 -0500
commitd5a1ec8b54c1c3c516d07f1916276cd6e5a937e4 (patch)
tree34f4ed7975803f573d16a7215ae39a9b2791a9b9 /src/main/java/derms/net/runicast/Receive.java
parente3df4a078afd37314d330daa2de0883f8dd1811b (diff)
downloadsoen423-d5a1ec8b54c1c3c516d07f1916276cd6e5a937e4.zip
runicast: use DatagramChannel
Diffstat (limited to 'src/main/java/derms/net/runicast/Receive.java')
-rw-r--r--src/main/java/derms/net/runicast/Receive.java32
1 files changed, 17 insertions, 15 deletions
diff --git a/src/main/java/derms/net/runicast/Receive.java b/src/main/java/derms/net/runicast/Receive.java
index 584861b..8620ebd 100644
--- a/src/main/java/derms/net/runicast/Receive.java
+++ b/src/main/java/derms/net/runicast/Receive.java
@@ -1,11 +1,13 @@
package derms.net.runicast;
-import derms.net.ConcurrentDatagramSocket;
+import derms.io.Serial;
import derms.net.MessagePayload;
-import derms.net.Packet;
import java.io.IOException;
import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.DatagramChannel;
import java.util.Queue;
import java.util.logging.Logger;
@@ -13,11 +15,11 @@ class Receive<T extends MessagePayload> implements Runnable {
private static final int bufSize = 8192;
private long seq; // Sequence number.
- private final ConcurrentDatagramSocket sock;
+ private final DatagramChannel sock;
private final Queue<T> delivered;
private final Logger log;
- Receive(ConcurrentDatagramSocket sock, Queue<T> delivered) {
+ Receive(DatagramChannel sock, Queue<T> delivered) {
this.seq = 0;
this.sock = sock;
this.delivered = delivered;
@@ -26,18 +28,15 @@ class Receive<T extends MessagePayload> implements Runnable {
@Override
public void run() {
- DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
for (;;) {
+ ByteBuffer buf = ByteBuffer.allocate(bufSize);
try {
- sock.receive(pkt);
- Message<T> msg = (Message<T>) Packet.decode(pkt, Message.class);
- SocketAddress sender = pkt.getSocketAddress();
+ SocketAddress sender = sock.receive(buf);
+ Message<T> msg = (Message<T>) Serial.decode(buf, Message.class);
recv(msg, sender);
- } catch (SocketTimeoutException e) {
- if (Thread.interrupted()) {
- log.info("Interrupted");
- return;
- }
+ } catch (ClosedChannelException e) {
+ log.info("Shutting down.");
+ return;
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
}
@@ -45,16 +44,19 @@ class Receive<T extends MessagePayload> implements Runnable {
}
private void recv(Message<T> msg, SocketAddress sender) throws IOException {
+ log.info("Received " + msg);
if (msg.seq == seq) {
delivered.add(msg.payload);
+ log.info("Delivered " + msg);
ack(msg, sender);
+ log.info("Acked " + msg);
seq++;
}
}
private void ack(Message<T> msg, SocketAddress sender) throws IOException {
Ack ack = new Ack(msg.seq);
- DatagramPacket pkt = Packet.encode(ack, sender);
- sock.send(pkt);
+ ByteBuffer buf = Serial.encode(ack);
+ sock.send(buf, sender);
}
}