From 50f0035fd980305faf666fdcd2d3afe2411ce56e Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 16 Nov 2024 16:47:24 -0500 Subject: TotalOrderMulticastReceiver --- src/main/java/derms/net/tomulticast/Message.java | 7 ++- src/main/java/derms/net/tomulticast/Receive.java | 53 ++++++++++++++++++ .../derms/net/tomulticast/TotalOrderMulticast.java | 2 +- .../tomulticast/TotalOrderMulticastReceiver.java | 65 ++++++++++++++++++++++ 4 files changed, 125 insertions(+), 2 deletions(-) create mode 100644 src/main/java/derms/net/tomulticast/Receive.java create mode 100644 src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java diff --git a/src/main/java/derms/net/tomulticast/Message.java b/src/main/java/derms/net/tomulticast/Message.java index 035e968..29d1ff7 100644 --- a/src/main/java/derms/net/tomulticast/Message.java +++ b/src/main/java/derms/net/tomulticast/Message.java @@ -2,7 +2,7 @@ package derms.net.tomulticast; import derms.net.rmulticast.MessagePayload; -class Message implements MessagePayload { +class Message implements MessagePayload, Comparable> { long seq; // Sequence number. T payload; @@ -33,6 +33,11 @@ class Message implements MessagePayload { return other.seq == this.seq && other.hash() == this.hash(); } + @Override + public int compareTo(Message other) { + return Long.compare(this.seq, other.seq); + } + @Override public String toString() { return getClass().getSimpleName() + "{" + seq + ", " + payload + "}"; diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java new file mode 100644 index 0000000..f95ee5f --- /dev/null +++ b/src/main/java/derms/net/tomulticast/Receive.java @@ -0,0 +1,53 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Receive messages from the multicast group and place them in the holdback queue. + * Once previous messages are received, they are transferred from the holdback queue + * to the delivery queue in total order. + */ +class Receive extends TotalOrderMulticast implements Runnable { + private final PriorityBlockingQueue> holdback; + private final BlockingQueue> deliver; + + Receive(InetSocketAddress group, InetAddress laddr, BlockingQueue> deliver) throws IOException { + super(group, laddr); + this.holdback = new PriorityBlockingQueue>(); + this.deliver = deliver; + } + + @Override + public void run() { + try { + for (;;) { + Message msg = sock.receive(); + holdback.put(msg); + tryDeliver(); + } + } catch (InterruptedException e) { + close(); + } + } + + // Try to move messages from the holdback queue to the delivery queue. + private void tryDeliver() { + while (!holdback.isEmpty()) { + // Messages with lower sequence numbers are removed from the priority queue first. + Message msg = holdback.peek(); + if (msg == null || msg.seq != seq+1) + return; + + // This is the next element in the sequence; deliver it. + msg = holdback.remove(); + deliver.add(msg); + incSeq(); + } + } +} diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java index a6dc2a9..146f706 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java @@ -43,7 +43,7 @@ public abstract class TotalOrderMulticast { /** Increment the sequence number. */ protected void incSeq() { - if (seq < seq.MAX_VALUE) { + if (seq < seq.MAX_VALUE-1) { seq++; } else { log.warning("Sequence number overflow. Wrapping to 0."); diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java new file mode 100644 index 0000000..16154aa --- /dev/null +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -0,0 +1,65 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.logging.Logger; + +public class TotalOrderMulticastReceiver { + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + private final BlockingQueue> deliver; + private final Logger log; + private final ExecutorService pool; + + public TotalOrderMulticastReceiver(InetSocketAddress group, InetAddress laddr) throws IOException { + this.deliver = new LinkedBlockingQueue>(); + this.log = Logger.getLogger(getClass().getName()); + + this.pool = Executors.newSingleThreadExecutor(); + pool.execute(new Receive(group, laddr, deliver)); + } + + public void close() { + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + } + + /** Receive a message from the group, blocking if necessary until one arrives. */ + public T receive() throws InterruptedException { + Message msg = deliver.take(); + return msg.payload; + } + + /** Receive a message from the group, or return null if none are available. */ + public T tryReceive() { + Message msg = deliver.poll(); + if (msg == null) + return null; + return msg.payload; + } + + /** + * Receive a message from the group, waiting up to the specified wait time if necessary. + * + * @return A message received from the group, or null if the specified wait time elapsed + * before a message arrived. + */ + public T tryReceive(Duration waitTime) throws InterruptedException { + Message msg = deliver.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS); + if (msg == null) + return null; + return msg.payload; + } +} -- cgit v1.2.3