summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/runicast/ReliableUnicastSender.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-23 11:34:42 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-23 11:34:42 -0500
commitd5a1ec8b54c1c3c516d07f1916276cd6e5a937e4 (patch)
tree34f4ed7975803f573d16a7215ae39a9b2791a9b9 /src/main/java/derms/net/runicast/ReliableUnicastSender.java
parente3df4a078afd37314d330daa2de0883f8dd1811b (diff)
downloadsoen423-d5a1ec8b54c1c3c516d07f1916276cd6e5a937e4.zip
runicast: use DatagramChannel
Diffstat (limited to 'src/main/java/derms/net/runicast/ReliableUnicastSender.java')
-rw-r--r--src/main/java/derms/net/runicast/ReliableUnicastSender.java36
1 files changed, 18 insertions, 18 deletions
diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java
index 83408d5..1f3c5d4 100644
--- a/src/main/java/derms/net/runicast/ReliableUnicastSender.java
+++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java
@@ -1,14 +1,13 @@
package derms.net.runicast;
-import derms.net.ConcurrentDatagramSocket;
+import derms.io.Serial;
import derms.net.MessagePayload;
-import derms.net.Packet;
import derms.util.ThreadPool;
import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetSocketAddress;
-import java.time.Duration;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -18,12 +17,10 @@ import java.util.logging.Logger;
/** The sending end of a reliable unicast connection. */
public class ReliableUnicastSender<T extends MessagePayload> {
- private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout.
-
private final AtomicLong next; // Next sequence number.
private final AtomicLong unacked; // Sequence number of first unacknowledged message.
private final Queue<Message<T>> sent;
- private final ConcurrentDatagramSocket sock;
+ private final DatagramChannel sock;
private final Logger log;
private final ExecutorService pool;
@@ -32,13 +29,12 @@ public class ReliableUnicastSender<T extends MessagePayload> {
*
* @param raddr The remote IP address to connect to.
*/
- public ReliableUnicastSender(InetSocketAddress raddr) throws IOException {
+ public ReliableUnicastSender(SocketAddress raddr) throws IOException {
this.next = new AtomicLong(0);
this.unacked = new AtomicLong(0);
this.sent = new LinkedBlockingQueue<Message<T>>();
- this.sock = new ConcurrentDatagramSocket();
- this.sock.connect(raddr);
- this.sock.setSoTimeout(soTimeout);
+ this.sock = DatagramChannel.open();
+ sock.connect(raddr);
this.log = Logger.getLogger(getClass().getName());
this.pool = Executors.newCachedThreadPool();
pool.execute(new ReceiveAcks<T>(unacked, sent, sock));
@@ -47,28 +43,32 @@ public class ReliableUnicastSender<T extends MessagePayload> {
public void send(T payload) throws IOException {
Message<T> msg = new Message<T>(next.get(), payload);
- DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress());
- sock.send(pkt);
+ ByteBuffer buf = Serial.encode(msg);
+ sock.send(buf, sock.getRemoteAddress());
sent.add(msg);
next.incrementAndGet();
+ log.info("Sent " + msg);
}
/** Wait for all messages to be acknowledged and close the connection. */
- public void close() throws InterruptedException {
+ public void close() throws InterruptedException, IOException {
// Wait for receiver to acknowledge all sent messages.
+ log.info("Waiting for acknowledgements...");
while (unacked.get() < next.get()) {
Thread.yield();
if (Thread.interrupted())
throw new InterruptedException();
}
- closeNow();
+ log.info("Shutting down.");
+ sock.close();
+ ThreadPool.shutdown(pool, log);
}
/** Close the connection immediately, without waiting for acknowledgements. */
- public void closeNow() {
+ public void closeNow() throws IOException {
log.info("Shutting down.");
- ThreadPool.shutDown(pool, log);
sock.close();
+ ThreadPool.shutdownNow(pool, log);
}
}