From d5a1ec8b54c1c3c516d07f1916276cd6e5a937e4 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 23 Nov 2024 11:34:42 -0500 Subject: runicast: use DatagramChannel --- src/main/java/derms/net/runicast/Receive.java | 32 ++++++++++--------- src/main/java/derms/net/runicast/ReceiveAcks.java | 27 ++++++++-------- .../net/runicast/ReliableUnicastReceiver.java | 14 ++++----- .../derms/net/runicast/ReliableUnicastSender.java | 36 +++++++++++----------- src/main/java/derms/net/runicast/Retransmit.java | 21 +++++++------ 5 files changed, 65 insertions(+), 65 deletions(-) (limited to 'src/main/java/derms/net/runicast') diff --git a/src/main/java/derms/net/runicast/Receive.java b/src/main/java/derms/net/runicast/Receive.java index 584861b..8620ebd 100644 --- a/src/main/java/derms/net/runicast/Receive.java +++ b/src/main/java/derms/net/runicast/Receive.java @@ -1,11 +1,13 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import java.io.IOException; import java.net.*; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.logging.Logger; @@ -13,11 +15,11 @@ class Receive implements Runnable { private static final int bufSize = 8192; private long seq; // Sequence number. - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final Queue delivered; private final Logger log; - Receive(ConcurrentDatagramSocket sock, Queue delivered) { + Receive(DatagramChannel sock, Queue delivered) { this.seq = 0; this.sock = sock; this.delivered = delivered; @@ -26,18 +28,15 @@ class Receive implements Runnable { @Override public void run() { - DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); for (;;) { + ByteBuffer buf = ByteBuffer.allocate(bufSize); try { - sock.receive(pkt); - Message msg = (Message) Packet.decode(pkt, Message.class); - SocketAddress sender = pkt.getSocketAddress(); + SocketAddress sender = sock.receive(buf); + Message msg = (Message) Serial.decode(buf, Message.class); recv(msg, sender); - } catch (SocketTimeoutException e) { - if (Thread.interrupted()) { - log.info("Interrupted"); - return; - } + } catch (ClosedChannelException e) { + log.info("Shutting down."); + return; } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } @@ -45,16 +44,19 @@ class Receive implements Runnable { } private void recv(Message msg, SocketAddress sender) throws IOException { + log.info("Received " + msg); if (msg.seq == seq) { delivered.add(msg.payload); + log.info("Delivered " + msg); ack(msg, sender); + log.info("Acked " + msg); seq++; } } private void ack(Message msg, SocketAddress sender) throws IOException { Ack ack = new Ack(msg.seq); - DatagramPacket pkt = Packet.encode(ack, sender); - sock.send(pkt); + ByteBuffer buf = Serial.encode(ack); + sock.send(buf, sender); } } diff --git a/src/main/java/derms/net/runicast/ReceiveAcks.java b/src/main/java/derms/net/runicast/ReceiveAcks.java index 0f585ff..9d7b7de 100644 --- a/src/main/java/derms/net/runicast/ReceiveAcks.java +++ b/src/main/java/derms/net/runicast/ReceiveAcks.java @@ -1,12 +1,12 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -17,10 +17,10 @@ class ReceiveAcks implements Runnable { private final AtomicLong unacked; private final Queue> sent; - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final Logger log; - ReceiveAcks(AtomicLong unacked, Queue> sent, ConcurrentDatagramSocket sock) { + ReceiveAcks(AtomicLong unacked, Queue> sent, DatagramChannel sock) { this.unacked = unacked; this.sent = sent; this.sock = sock; @@ -29,17 +29,15 @@ class ReceiveAcks implements Runnable { @Override public void run() { - DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); for (;;) { + ByteBuffer buf = ByteBuffer.allocate(bufSize); try { - sock.receive(pkt); - Ack ack = Packet.decode(pkt, Ack.class); + sock.receive(buf); + Ack ack = Serial.decode(buf, Ack.class); recvAck(ack.seq); - } catch (SocketTimeoutException e) { - if (Thread.interrupted()) { - log.info("Interrupted."); - return; - } + } catch (ClosedChannelException e) { + log.info("Shutting down."); + return; } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); } @@ -47,6 +45,7 @@ class ReceiveAcks implements Runnable { } private void recvAck(long ack) { + log.info("Received ack: " + ack); unacked.updateAndGet((unacked) -> { if (ack >= unacked) return ack+1; diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java index 81e3502..ff0b72a 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java @@ -1,20 +1,18 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; import derms.net.MessagePayload; import derms.util.ThreadPool; import java.io.IOException; import java.net.SocketAddress; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.concurrent.*; import java.util.logging.Logger; /** The receiving end of a reliable unicast connection. */ public class ReliableUnicastReceiver { - private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. - - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final BlockingQueue delivered; private final Logger log; private final ExecutorService pool; @@ -25,18 +23,18 @@ public class ReliableUnicastReceiver { * @param laddr The local IP address and port to listen on. */ public ReliableUnicastReceiver(SocketAddress laddr) throws IOException { - this.sock = new ConcurrentDatagramSocket(laddr); - this.sock.setSoTimeout(soTimeout); + this.sock = DatagramChannel.open(); + sock.bind(laddr); this.delivered = new LinkedBlockingQueue(); this.log = Logger.getLogger(getClass().getName()); this.pool = Executors.newCachedThreadPool(); pool.execute(new Receive(sock, delivered)); } - public void close() { + public void close() throws IOException { log.info("Shutting down"); - ThreadPool.shutDown(pool, log); sock.close(); + ThreadPool.shutdown(pool, log); } /** Receive a message, blocking if necessary until one arrives. */ diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java index 83408d5..1f3c5d4 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastSender.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java @@ -1,14 +1,13 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import derms.util.ThreadPool; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.time.Duration; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -18,12 +17,10 @@ import java.util.logging.Logger; /** The sending end of a reliable unicast connection. */ public class ReliableUnicastSender { - private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. - private final AtomicLong next; // Next sequence number. private final AtomicLong unacked; // Sequence number of first unacknowledged message. private final Queue> sent; - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final Logger log; private final ExecutorService pool; @@ -32,13 +29,12 @@ public class ReliableUnicastSender { * * @param raddr The remote IP address to connect to. */ - public ReliableUnicastSender(InetSocketAddress raddr) throws IOException { + public ReliableUnicastSender(SocketAddress raddr) throws IOException { this.next = new AtomicLong(0); this.unacked = new AtomicLong(0); this.sent = new LinkedBlockingQueue>(); - this.sock = new ConcurrentDatagramSocket(); - this.sock.connect(raddr); - this.sock.setSoTimeout(soTimeout); + this.sock = DatagramChannel.open(); + sock.connect(raddr); this.log = Logger.getLogger(getClass().getName()); this.pool = Executors.newCachedThreadPool(); pool.execute(new ReceiveAcks(unacked, sent, sock)); @@ -47,28 +43,32 @@ public class ReliableUnicastSender { public void send(T payload) throws IOException { Message msg = new Message(next.get(), payload); - DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); - sock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, sock.getRemoteAddress()); sent.add(msg); next.incrementAndGet(); + log.info("Sent " + msg); } /** Wait for all messages to be acknowledged and close the connection. */ - public void close() throws InterruptedException { + public void close() throws InterruptedException, IOException { // Wait for receiver to acknowledge all sent messages. + log.info("Waiting for acknowledgements..."); while (unacked.get() < next.get()) { Thread.yield(); if (Thread.interrupted()) throw new InterruptedException(); } - closeNow(); + log.info("Shutting down."); + sock.close(); + ThreadPool.shutdown(pool, log); } /** Close the connection immediately, without waiting for acknowledgements. */ - public void closeNow() { + public void closeNow() throws IOException { log.info("Shutting down."); - ThreadPool.shutDown(pool, log); sock.close(); + ThreadPool.shutdownNow(pool, log); } } diff --git a/src/main/java/derms/net/runicast/Retransmit.java b/src/main/java/derms/net/runicast/Retransmit.java index affd00c..16f8859 100644 --- a/src/main/java/derms/net/runicast/Retransmit.java +++ b/src/main/java/derms/net/runicast/Retransmit.java @@ -1,12 +1,13 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import derms.util.Wait; import java.io.IOException; -import java.net.DatagramPacket; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.DatagramChannel; import java.time.Duration; import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; @@ -18,10 +19,10 @@ class Retransmit implements Runnable { private final AtomicLong unacked; private final Queue> sent; - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final Logger log; - Retransmit(AtomicLong unacked, Queue> sent, ConcurrentDatagramSocket sock) { + Retransmit(AtomicLong unacked, Queue> sent, DatagramChannel sock) { this.unacked = unacked; this.sent = sent; this.sock = sock; @@ -40,15 +41,15 @@ class Retransmit implements Runnable { } } } - } catch (InterruptedException e) { - log.info("Interrupted."); + } catch (InterruptedException | ClosedChannelException e) { + log.info("Shutting down."); } } - private void retransmit(Message msg) { + private void retransmit(Message msg) throws ClosedChannelException { try { - DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); - sock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, sock.getRemoteAddress()); log.info("Retransmitted " + msg); } catch (IOException e) { log.warning("Failed to retransmit " + msg + ": " + e.getMessage()); -- cgit v1.2.3