summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/derms/net/tomulticast/Message.java7
-rw-r--r--src/main/java/derms/net/tomulticast/Receive.java53
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticast.java2
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java65
4 files changed, 125 insertions, 2 deletions
diff --git a/src/main/java/derms/net/tomulticast/Message.java b/src/main/java/derms/net/tomulticast/Message.java
index 035e968..29d1ff7 100644
--- a/src/main/java/derms/net/tomulticast/Message.java
+++ b/src/main/java/derms/net/tomulticast/Message.java
@@ -2,7 +2,7 @@ package derms.net.tomulticast;
import derms.net.rmulticast.MessagePayload;
-class Message<T extends MessagePayload> implements MessagePayload {
+class Message<T extends MessagePayload> implements MessagePayload, Comparable<Message<T>> {
long seq; // Sequence number.
T payload;
@@ -34,6 +34,11 @@ class Message<T extends MessagePayload> implements MessagePayload {
}
@Override
+ public int compareTo(Message<T> other) {
+ return Long.compare(this.seq, other.seq);
+ }
+
+ @Override
public String toString() {
return getClass().getSimpleName() + "{" + seq + ", " + payload + "}";
}
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java
new file mode 100644
index 0000000..f95ee5f
--- /dev/null
+++ b/src/main/java/derms/net/tomulticast/Receive.java
@@ -0,0 +1,53 @@
+package derms.net.tomulticast;
+
+import derms.net.rmulticast.MessagePayload;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Receive messages from the multicast group and place them in the holdback queue.
+ * Once previous messages are received, they are transferred from the holdback queue
+ * to the delivery queue in total order.
+ */
+class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implements Runnable {
+ private final PriorityBlockingQueue<Message<T>> holdback;
+ private final BlockingQueue<Message<T>> deliver;
+
+ Receive(InetSocketAddress group, InetAddress laddr, BlockingQueue<Message<T>> deliver) throws IOException {
+ super(group, laddr);
+ this.holdback = new PriorityBlockingQueue<Message<T>>();
+ this.deliver = deliver;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ Message<T> msg = sock.receive();
+ holdback.put(msg);
+ tryDeliver();
+ }
+ } catch (InterruptedException e) {
+ close();
+ }
+ }
+
+ // Try to move messages from the holdback queue to the delivery queue.
+ private void tryDeliver() {
+ while (!holdback.isEmpty()) {
+ // Messages with lower sequence numbers are removed from the priority queue first.
+ Message<T> msg = holdback.peek();
+ if (msg == null || msg.seq != seq+1)
+ return;
+
+ // This is the next element in the sequence; deliver it.
+ msg = holdback.remove();
+ deliver.add(msg);
+ incSeq();
+ }
+ }
+}
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
index a6dc2a9..146f706 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
@@ -43,7 +43,7 @@ public abstract class TotalOrderMulticast<T extends MessagePayload> {
/** Increment the sequence number. */
protected void incSeq() {
- if (seq < seq.MAX_VALUE) {
+ if (seq < seq.MAX_VALUE-1) {
seq++;
} else {
log.warning("Sequence number overflow. Wrapping to 0.");
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
new file mode 100644
index 0000000..16154aa
--- /dev/null
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
@@ -0,0 +1,65 @@
+package derms.net.tomulticast;
+
+import derms.net.rmulticast.MessagePayload;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.*;
+import java.util.logging.Logger;
+
+public class TotalOrderMulticastReceiver<T extends MessagePayload> {
+ private static final Duration terminationTimeout = Duration.ofSeconds(1);
+
+ private final BlockingQueue<Message<T>> deliver;
+ private final Logger log;
+ private final ExecutorService pool;
+
+ public TotalOrderMulticastReceiver(InetSocketAddress group, InetAddress laddr) throws IOException {
+ this.deliver = new LinkedBlockingQueue<Message<T>>();
+ this.log = Logger.getLogger(getClass().getName());
+
+ this.pool = Executors.newSingleThreadExecutor();
+ pool.execute(new Receive<T>(group, laddr, deliver));
+ }
+
+ public void close() {
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.warning("Thread pool did not terminate after " + terminationTimeout);
+ } catch (InterruptedException e) {
+ log.warning("Interrupted while terminating thread pool: " + e.getMessage());
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /** Receive a message from the group, blocking if necessary until one arrives. */
+ public T receive() throws InterruptedException {
+ Message<T> msg = deliver.take();
+ return msg.payload;
+ }
+
+ /** Receive a message from the group, or return null if none are available. */
+ public T tryReceive() {
+ Message<T> msg = deliver.poll();
+ if (msg == null)
+ return null;
+ return msg.payload;
+ }
+
+ /**
+ * Receive a message from the group, waiting up to the specified wait time if necessary.
+ *
+ * @return A message received from the group, or null if the specified wait time elapsed
+ * before a message arrived.
+ */
+ public T tryReceive(Duration waitTime) throws InterruptedException {
+ Message<T> msg = deliver.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS);
+ if (msg == null)
+ return null;
+ return msg.payload;
+ }
+}