From 50f0035fd980305faf666fdcd2d3afe2411ce56e Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 16 Nov 2024 16:47:24 -0500 Subject: TotalOrderMulticastReceiver --- src/main/java/derms/net/tomulticast/Receive.java | 53 ++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 src/main/java/derms/net/tomulticast/Receive.java (limited to 'src/main/java/derms/net/tomulticast/Receive.java') diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java new file mode 100644 index 0000000..f95ee5f --- /dev/null +++ b/src/main/java/derms/net/tomulticast/Receive.java @@ -0,0 +1,53 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Receive messages from the multicast group and place them in the holdback queue. + * Once previous messages are received, they are transferred from the holdback queue + * to the delivery queue in total order. + */ +class Receive extends TotalOrderMulticast implements Runnable { + private final PriorityBlockingQueue> holdback; + private final BlockingQueue> deliver; + + Receive(InetSocketAddress group, InetAddress laddr, BlockingQueue> deliver) throws IOException { + super(group, laddr); + this.holdback = new PriorityBlockingQueue>(); + this.deliver = deliver; + } + + @Override + public void run() { + try { + for (;;) { + Message msg = sock.receive(); + holdback.put(msg); + tryDeliver(); + } + } catch (InterruptedException e) { + close(); + } + } + + // Try to move messages from the holdback queue to the delivery queue. + private void tryDeliver() { + while (!holdback.isEmpty()) { + // Messages with lower sequence numbers are removed from the priority queue first. + Message msg = holdback.peek(); + if (msg == null || msg.seq != seq+1) + return; + + // This is the next element in the sequence; deliver it. + msg = holdback.remove(); + deliver.add(msg); + incSeq(); + } + } +} -- cgit v1.2.3