diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 16:47:24 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 16:47:24 -0500 |
| commit | 50f0035fd980305faf666fdcd2d3afe2411ce56e (patch) | |
| tree | 15999b160578e5502c4def46bf8ad106db40742f /src/main/java/derms/net/tomulticast/Receive.java | |
| parent | cf8b9db4af665ac116f1030b9acb99fd136781c6 (diff) | |
| download | soen423-50f0035fd980305faf666fdcd2d3afe2411ce56e.zip | |
TotalOrderMulticastReceiver
Diffstat (limited to 'src/main/java/derms/net/tomulticast/Receive.java')
| -rw-r--r-- | src/main/java/derms/net/tomulticast/Receive.java | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java new file mode 100644 index 0000000..f95ee5f --- /dev/null +++ b/src/main/java/derms/net/tomulticast/Receive.java @@ -0,0 +1,53 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Receive messages from the multicast group and place them in the holdback queue. + * Once previous messages are received, they are transferred from the holdback queue + * to the delivery queue in total order. + */ +class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implements Runnable { + private final PriorityBlockingQueue<Message<T>> holdback; + private final BlockingQueue<Message<T>> deliver; + + Receive(InetSocketAddress group, InetAddress laddr, BlockingQueue<Message<T>> deliver) throws IOException { + super(group, laddr); + this.holdback = new PriorityBlockingQueue<Message<T>>(); + this.deliver = deliver; + } + + @Override + public void run() { + try { + for (;;) { + Message<T> msg = sock.receive(); + holdback.put(msg); + tryDeliver(); + } + } catch (InterruptedException e) { + close(); + } + } + + // Try to move messages from the holdback queue to the delivery queue. + private void tryDeliver() { + while (!holdback.isEmpty()) { + // Messages with lower sequence numbers are removed from the priority queue first. + Message<T> msg = holdback.peek(); + if (msg == null || msg.seq != seq+1) + return; + + // This is the next element in the sequence; deliver it. + msg = holdback.remove(); + deliver.add(msg); + incSeq(); + } + } +} |