summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java')
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java65
1 files changed, 65 insertions, 0 deletions
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
new file mode 100644
index 0000000..16154aa
--- /dev/null
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
@@ -0,0 +1,65 @@
+package derms.net.tomulticast;
+
+import derms.net.rmulticast.MessagePayload;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.*;
+import java.util.logging.Logger;
+
+public class TotalOrderMulticastReceiver<T extends MessagePayload> {
+ private static final Duration terminationTimeout = Duration.ofSeconds(1);
+
+ private final BlockingQueue<Message<T>> deliver;
+ private final Logger log;
+ private final ExecutorService pool;
+
+ public TotalOrderMulticastReceiver(InetSocketAddress group, InetAddress laddr) throws IOException {
+ this.deliver = new LinkedBlockingQueue<Message<T>>();
+ this.log = Logger.getLogger(getClass().getName());
+
+ this.pool = Executors.newSingleThreadExecutor();
+ pool.execute(new Receive<T>(group, laddr, deliver));
+ }
+
+ public void close() {
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.warning("Thread pool did not terminate after " + terminationTimeout);
+ } catch (InterruptedException e) {
+ log.warning("Interrupted while terminating thread pool: " + e.getMessage());
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /** Receive a message from the group, blocking if necessary until one arrives. */
+ public T receive() throws InterruptedException {
+ Message<T> msg = deliver.take();
+ return msg.payload;
+ }
+
+ /** Receive a message from the group, or return null if none are available. */
+ public T tryReceive() {
+ Message<T> msg = deliver.poll();
+ if (msg == null)
+ return null;
+ return msg.payload;
+ }
+
+ /**
+ * Receive a message from the group, waiting up to the specified wait time if necessary.
+ *
+ * @return A message received from the group, or null if the specified wait time elapsed
+ * before a message arrived.
+ */
+ public T tryReceive(Duration waitTime) throws InterruptedException {
+ Message<T> msg = deliver.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS);
+ if (msg == null)
+ return null;
+ return msg.payload;
+ }
+}