From 50f0035fd980305faf666fdcd2d3afe2411ce56e Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 16 Nov 2024 16:47:24 -0500 Subject: TotalOrderMulticastReceiver --- .../tomulticast/TotalOrderMulticastReceiver.java | 65 ++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java (limited to 'src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java') diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java new file mode 100644 index 0000000..16154aa --- /dev/null +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -0,0 +1,65 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.logging.Logger; + +public class TotalOrderMulticastReceiver { + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + private final BlockingQueue> deliver; + private final Logger log; + private final ExecutorService pool; + + public TotalOrderMulticastReceiver(InetSocketAddress group, InetAddress laddr) throws IOException { + this.deliver = new LinkedBlockingQueue>(); + this.log = Logger.getLogger(getClass().getName()); + + this.pool = Executors.newSingleThreadExecutor(); + pool.execute(new Receive(group, laddr, deliver)); + } + + public void close() { + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + } + + /** Receive a message from the group, blocking if necessary until one arrives. */ + public T receive() throws InterruptedException { + Message msg = deliver.take(); + return msg.payload; + } + + /** Receive a message from the group, or return null if none are available. */ + public T tryReceive() { + Message msg = deliver.poll(); + if (msg == null) + return null; + return msg.payload; + } + + /** + * Receive a message from the group, waiting up to the specified wait time if necessary. + * + * @return A message received from the group, or null if the specified wait time elapsed + * before a message arrived. + */ + public T tryReceive(Duration waitTime) throws InterruptedException { + Message msg = deliver.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS); + if (msg == null) + return null; + return msg.payload; + } +} -- cgit v1.2.3