From 0912e30fb375f60bade22fd8cf7331e1b8d77659 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Fri, 22 Nov 2024 12:39:53 -0500 Subject: ReliableUnicastReceiver --- .../net/runicast/ReliableUnicastReceiver.java | 56 +++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) (limited to 'src/main/java/derms/net/runicast/ReliableUnicastReceiver.java') diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java index 3d63a6f..dc69834 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java @@ -1,4 +1,58 @@ package derms.net.runicast; -public class ReliableUnicastReceiver { +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.util.ThreadPool; + +import java.io.IOException; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.logging.Logger; + +public class ReliableUnicastReceiver { + private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. + + private final ConcurrentDatagramSocket sock; + private final BlockingQueue delivered; + private final Logger log; + private final ExecutorService pool; + + /** + * @param laddr The local IP address and port to listen on. + */ + ReliableUnicastReceiver(SocketAddress laddr) throws IOException { + this.sock = new ConcurrentDatagramSocket(laddr); + this.sock.setSoTimeout(soTimeout); + this.delivered = new LinkedBlockingQueue(); + this.log = Logger.getLogger(getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new Receive(sock, delivered)); + } + + public void close() { + log.info("Shutting down"); + ThreadPool.shutDown(pool, log); + sock.close(); + } + + /** Receive a message, blocking if necessary until one arrives. */ + public T receive() throws InterruptedException { + return delivered.take(); + } + + /** Receive a message, or return null if none are available. */ + public T tryReceive() { + return delivered.poll(); + } + + /** + * Receive a message, waiting up to the specified wait time if necessary. + * + * @return A message, or null if the specified waiting time elapses before + * a message arrives. + */ + public T tryReceive(Duration waitTime) throws InterruptedException { + return delivered.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS); + } } -- cgit v1.2.3