From 43549dee0a8214c1bb02c874dcfd7ba37a8ef310 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Fri, 22 Nov 2024 12:02:08 -0500 Subject: ReliableUnicastSender --- .../derms/net/runicast/ReliableUnicastSender.java | 82 ++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 src/main/java/derms/net/runicast/ReliableUnicastSender.java (limited to 'src/main/java/derms/net/runicast/ReliableUnicastSender.java') diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java new file mode 100644 index 0000000..2a0943a --- /dev/null +++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java @@ -0,0 +1,82 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +public class ReliableUnicastSender { + private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + private final AtomicLong next; // Next sequence number. + private final AtomicLong unacked; // Sequence number of first unacknowledged message. + private final Queue> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + private final ExecutorService pool; + + /** + * @param raddr Remote IP address to connect to. + */ + ReliableUnicastSender(InetSocketAddress raddr) throws IOException { + this.next = new AtomicLong(0); + this.unacked = new AtomicLong(0); + this.sent = new LinkedBlockingQueue>(); + this.sock = new ConcurrentDatagramSocket(); + this.sock.connect(raddr); + this.sock.setSoTimeout(soTimeout); + this.log = Logger.getLogger(getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new ReceiveAcks(unacked, sent, sock)); + pool.execute(new Retransmit(unacked, sent, sock)); + } + + public void send(T payload) throws IOException { + Message msg = new Message(next.get(), payload); + DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); + sock.send(pkt); + sent.add(msg); + next.incrementAndGet(); + } + + /** Wait for all messages to be acknowledged and close the connection. */ + public void close() throws InterruptedException { + // Wait for receiver to acknowledge all sent messages. + while (unacked.get() < next.get()) { + Thread.yield(); + if (Thread.interrupted()) + throw new InterruptedException(); + } + + closeNow(); + } + + /** Close the connection immediately, without waiting for acknowledgements. */ + public void closeNow() { + log.info("Shutting down."); + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + + sock.close(); + } +} -- cgit v1.2.3