diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-22 12:39:53 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-22 12:39:53 -0500 |
| commit | 0912e30fb375f60bade22fd8cf7331e1b8d77659 (patch) | |
| tree | a2a99ac1a095f65b37a92c4ec1acf05bf1353a3d /src/main/java/derms/net/runicast/ReliableUnicastReceiver.java | |
| parent | 43549dee0a8214c1bb02c874dcfd7ba37a8ef310 (diff) | |
| download | soen423-0912e30fb375f60bade22fd8cf7331e1b8d77659.zip | |
ReliableUnicastReceiver
Diffstat (limited to 'src/main/java/derms/net/runicast/ReliableUnicastReceiver.java')
| -rw-r--r-- | src/main/java/derms/net/runicast/ReliableUnicastReceiver.java | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java index 3d63a6f..dc69834 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java @@ -1,4 +1,58 @@ package derms.net.runicast; -public class ReliableUnicastReceiver { +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.util.ThreadPool; + +import java.io.IOException; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.logging.Logger; + +public class ReliableUnicastReceiver<T extends MessagePayload> { + private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. + + private final ConcurrentDatagramSocket sock; + private final BlockingQueue<T> delivered; + private final Logger log; + private final ExecutorService pool; + + /** + * @param laddr The local IP address and port to listen on. + */ + ReliableUnicastReceiver(SocketAddress laddr) throws IOException { + this.sock = new ConcurrentDatagramSocket(laddr); + this.sock.setSoTimeout(soTimeout); + this.delivered = new LinkedBlockingQueue<T>(); + this.log = Logger.getLogger(getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new Receive<T>(sock, delivered)); + } + + public void close() { + log.info("Shutting down"); + ThreadPool.shutDown(pool, log); + sock.close(); + } + + /** Receive a message, blocking if necessary until one arrives. */ + public T receive() throws InterruptedException { + return delivered.take(); + } + + /** Receive a message, or return null if none are available. */ + public T tryReceive() { + return delivered.poll(); + } + + /** + * Receive a message, waiting up to the specified wait time if necessary. + * + * @return A message, or null if the specified waiting time elapses before + * a message arrives. + */ + public T tryReceive(Duration waitTime) throws InterruptedException { + return delivered.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS); + } } |