summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/tomulticast/Receive.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-16 16:47:24 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-16 16:47:24 -0500
commit50f0035fd980305faf666fdcd2d3afe2411ce56e (patch)
tree15999b160578e5502c4def46bf8ad106db40742f /src/main/java/derms/net/tomulticast/Receive.java
parentcf8b9db4af665ac116f1030b9acb99fd136781c6 (diff)
downloadsoen423-50f0035fd980305faf666fdcd2d3afe2411ce56e.zip
TotalOrderMulticastReceiver
Diffstat (limited to 'src/main/java/derms/net/tomulticast/Receive.java')
-rw-r--r--src/main/java/derms/net/tomulticast/Receive.java53
1 files changed, 53 insertions, 0 deletions
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java
new file mode 100644
index 0000000..f95ee5f
--- /dev/null
+++ b/src/main/java/derms/net/tomulticast/Receive.java
@@ -0,0 +1,53 @@
+package derms.net.tomulticast;
+
+import derms.net.rmulticast.MessagePayload;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Receive messages from the multicast group and place them in the holdback queue.
+ * Once previous messages are received, they are transferred from the holdback queue
+ * to the delivery queue in total order.
+ */
+class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implements Runnable {
+ private final PriorityBlockingQueue<Message<T>> holdback;
+ private final BlockingQueue<Message<T>> deliver;
+
+ Receive(InetSocketAddress group, InetAddress laddr, BlockingQueue<Message<T>> deliver) throws IOException {
+ super(group, laddr);
+ this.holdback = new PriorityBlockingQueue<Message<T>>();
+ this.deliver = deliver;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ Message<T> msg = sock.receive();
+ holdback.put(msg);
+ tryDeliver();
+ }
+ } catch (InterruptedException e) {
+ close();
+ }
+ }
+
+ // Try to move messages from the holdback queue to the delivery queue.
+ private void tryDeliver() {
+ while (!holdback.isEmpty()) {
+ // Messages with lower sequence numbers are removed from the priority queue first.
+ Message<T> msg = holdback.peek();
+ if (msg == null || msg.seq != seq+1)
+ return;
+
+ // This is the next element in the sequence; deliver it.
+ msg = holdback.remove();
+ deliver.add(msg);
+ incSeq();
+ }
+ }
+}