diff options
Diffstat (limited to 'src/main/java/derms/net')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 17 |
1 files changed, 15 insertions, 2 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 2b9d9aa..1632351 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -13,7 +13,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; -/** TODO */ +/** + * A reliable multicast protocol that guarantees delivery of messages in the event of a fail-stop. + * + * An implementation of the Trans protocol over IP multicast. Melliar-Smith, et al. "Broadcast Protocols for + * Distributed Systems" in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, 1990. + */ public class ReliableMulticast<T extends MessagePayload> { private final SocketAddress group; private final InetAddress laddr; // Local address. @@ -27,6 +32,12 @@ public class ReliableMulticast<T extends MessagePayload> { private final BlockingQueue<Message<T>> delivered; private final Logger log; + /** + * Join the specified multicast group. + * + * @param group The IP address and port of the multicast group. + * @param laddr The IP address of the local process. + */ public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { this.group = group; this.laddr = laddr; @@ -61,6 +72,7 @@ public class ReliableMulticast<T extends MessagePayload> { } } + /** Send a message to the group. */ public void send(T payload) throws IOException { Message<T> msg = new Message<T>( payload, @@ -74,8 +86,9 @@ public class ReliableMulticast<T extends MessagePayload> { lastSend.set(Instant.now()); } + /** Receive a message from the group, blocking if necessary until a message arrives. */ public T receive() throws InterruptedException { - Message<T> msg = delivered.take(); + Message<T> msg = delivered.take(); // Blocks until a message becomes available. return msg.payload; } } |