diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 16:47:24 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 16:47:24 -0500 |
| commit | 50f0035fd980305faf666fdcd2d3afe2411ce56e (patch) | |
| tree | 15999b160578e5502c4def46bf8ad106db40742f /src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java | |
| parent | cf8b9db4af665ac116f1030b9acb99fd136781c6 (diff) | |
| download | soen423-50f0035fd980305faf666fdcd2d3afe2411ce56e.zip | |
TotalOrderMulticastReceiver
Diffstat (limited to 'src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java')
| -rw-r--r-- | src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java new file mode 100644 index 0000000..16154aa --- /dev/null +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -0,0 +1,65 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.logging.Logger; + +public class TotalOrderMulticastReceiver<T extends MessagePayload> { + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + private final BlockingQueue<Message<T>> deliver; + private final Logger log; + private final ExecutorService pool; + + public TotalOrderMulticastReceiver(InetSocketAddress group, InetAddress laddr) throws IOException { + this.deliver = new LinkedBlockingQueue<Message<T>>(); + this.log = Logger.getLogger(getClass().getName()); + + this.pool = Executors.newSingleThreadExecutor(); + pool.execute(new Receive<T>(group, laddr, deliver)); + } + + public void close() { + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + } + + /** Receive a message from the group, blocking if necessary until one arrives. */ + public T receive() throws InterruptedException { + Message<T> msg = deliver.take(); + return msg.payload; + } + + /** Receive a message from the group, or return null if none are available. */ + public T tryReceive() { + Message<T> msg = deliver.poll(); + if (msg == null) + return null; + return msg.payload; + } + + /** + * Receive a message from the group, waiting up to the specified wait time if necessary. + * + * @return A message received from the group, or null if the specified wait time elapsed + * before a message arrived. + */ + public T tryReceive(Duration waitTime) throws InterruptedException { + Message<T> msg = deliver.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS); + if (msg == null) + return null; + return msg.payload; + } +} |