summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net')
-rw-r--r--src/main/java/derms/net/tomulticast/Receive.java7
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java18
2 files changed, 25 insertions, 0 deletions
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java
index e89ae1b..6bc2df5 100644
--- a/src/main/java/derms/net/tomulticast/Receive.java
+++ b/src/main/java/derms/net/tomulticast/Receive.java
@@ -5,6 +5,7 @@ import derms.net.MessagePayload;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -17,6 +18,12 @@ class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implement
private final PriorityBlockingQueue<Message<T>> holdback;
private final BlockingQueue<Message<T>> deliver;
+ Receive(InetSocketAddress group, InetAddress laddr, BlockingQueue<Message<T>> deliver, NetworkInterface ifs) throws IOException {
+ super(group, laddr, ifs);
+ this.holdback = new PriorityBlockingQueue<Message<T>>();
+ this.deliver = deliver;
+ }
+
Receive(InetSocketAddress group, InetAddress laddr, BlockingQueue<Message<T>> deliver) throws IOException {
super(group, laddr);
this.holdback = new PriorityBlockingQueue<Message<T>>();
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
index 549af16..bb0f2d8 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
@@ -6,6 +6,7 @@ import derms.util.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.logging.Logger;
@@ -24,6 +25,23 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> {
*
* @param group The IP address and port of the multicast group.
* @param laddr The IP address of the local process.
+ * @param ifs The network interface to use.
+ */
+ public TotalOrderMulticastReceiver(InetSocketAddress group, InetAddress laddr, NetworkInterface ifs) throws IOException {
+ this.deliver = new LinkedBlockingQueue<Message<T>>();
+ this.log = Logger.getLogger(getClass().getName());
+
+ this.pool = Executors.newSingleThreadExecutor();
+ this.receiver = new Receive<T>(group, laddr, deliver, ifs);
+ pool.execute(receiver);
+ }
+
+ /**
+ * Join the specified totally-ordered multicast group as a receiver using the
+ * default network interface of the machine.
+ *
+ * @param group The IP address and port of the multicast group.
+ * @param laddr The IP address of the local process.
*/
public TotalOrderMulticastReceiver(InetSocketAddress group, InetAddress laddr) throws IOException {
this.deliver = new LinkedBlockingQueue<Message<T>>();