From d5a1ec8b54c1c3c516d07f1916276cd6e5a937e4 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 23 Nov 2024 11:34:42 -0500 Subject: runicast: use DatagramChannel --- .../derms/net/runicast/ReliableUnicastSender.java | 36 +++++++++++----------- 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'src/main/java/derms/net/runicast/ReliableUnicastSender.java') diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java index 83408d5..1f3c5d4 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastSender.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java @@ -1,14 +1,13 @@ package derms.net.runicast; -import derms.net.ConcurrentDatagramSocket; +import derms.io.Serial; import derms.net.MessagePayload; -import derms.net.Packet; import derms.util.ThreadPool; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.time.Duration; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -18,12 +17,10 @@ import java.util.logging.Logger; /** The sending end of a reliable unicast connection. */ public class ReliableUnicastSender { - private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. - private final AtomicLong next; // Next sequence number. private final AtomicLong unacked; // Sequence number of first unacknowledged message. private final Queue> sent; - private final ConcurrentDatagramSocket sock; + private final DatagramChannel sock; private final Logger log; private final ExecutorService pool; @@ -32,13 +29,12 @@ public class ReliableUnicastSender { * * @param raddr The remote IP address to connect to. */ - public ReliableUnicastSender(InetSocketAddress raddr) throws IOException { + public ReliableUnicastSender(SocketAddress raddr) throws IOException { this.next = new AtomicLong(0); this.unacked = new AtomicLong(0); this.sent = new LinkedBlockingQueue>(); - this.sock = new ConcurrentDatagramSocket(); - this.sock.connect(raddr); - this.sock.setSoTimeout(soTimeout); + this.sock = DatagramChannel.open(); + sock.connect(raddr); this.log = Logger.getLogger(getClass().getName()); this.pool = Executors.newCachedThreadPool(); pool.execute(new ReceiveAcks(unacked, sent, sock)); @@ -47,28 +43,32 @@ public class ReliableUnicastSender { public void send(T payload) throws IOException { Message msg = new Message(next.get(), payload); - DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); - sock.send(pkt); + ByteBuffer buf = Serial.encode(msg); + sock.send(buf, sock.getRemoteAddress()); sent.add(msg); next.incrementAndGet(); + log.info("Sent " + msg); } /** Wait for all messages to be acknowledged and close the connection. */ - public void close() throws InterruptedException { + public void close() throws InterruptedException, IOException { // Wait for receiver to acknowledge all sent messages. + log.info("Waiting for acknowledgements..."); while (unacked.get() < next.get()) { Thread.yield(); if (Thread.interrupted()) throw new InterruptedException(); } - closeNow(); + log.info("Shutting down."); + sock.close(); + ThreadPool.shutdown(pool, log); } /** Close the connection immediately, without waiting for acknowledgements. */ - public void closeNow() { + public void closeNow() throws IOException { log.info("Shutting down."); - ThreadPool.shutDown(pool, log); sock.close(); + ThreadPool.shutdownNow(pool, log); } } -- cgit v1.2.3