summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net')
-rw-r--r--src/main/java/derms/net/tomulticast/Message.java40
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticast.java53
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java31
3 files changed, 124 insertions, 0 deletions
diff --git a/src/main/java/derms/net/tomulticast/Message.java b/src/main/java/derms/net/tomulticast/Message.java
new file mode 100644
index 0000000..035e968
--- /dev/null
+++ b/src/main/java/derms/net/tomulticast/Message.java
@@ -0,0 +1,40 @@
+package derms.net.tomulticast;
+
+import derms.net.rmulticast.MessagePayload;
+
+class Message<T extends MessagePayload> implements MessagePayload {
+ long seq; // Sequence number.
+ T payload;
+
+ Message(long seq, T payload) {
+ this.seq = seq;
+ this.payload = payload;
+ }
+
+ @Override
+ public int hash() {
+ return (int) seq * payload.hash();
+ }
+
+ @Override
+ public int hashCode() {
+ return hash();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+ if (obj.getClass() != this.getClass())
+ return false;
+ Message<?> other = (Message<?>) obj;
+ if (other.payload.getClass() != this.payload.getClass())
+ return false;
+ return other.seq == this.seq && other.hash() == this.hash();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "{" + seq + ", " + payload + "}";
+ }
+}
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
new file mode 100644
index 0000000..d83a34d
--- /dev/null
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
@@ -0,0 +1,53 @@
+package derms.net.tomulticast;
+
+import derms.net.rmulticast.MessagePayload;
+import derms.net.rmulticast.ReliableMulticast;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.logging.Logger;
+
+/**
+ * <p>A single-sender, multiple-receiver multicast group that enforces a total order on
+ * received messages. If a process delivers message <i>A</i> before message <i>B</i>,
+ * then all other processes will deliver <i>A</i> and <i>B</i> in the same order
+ * (<i>A</i> before <i>B</i>).</p>
+ *
+ * <p>Additionally, the protocol guarantees reliable delivery of messages. If the sender
+ * crashes while sending a message, either all processes will receive the message, or
+ * none of them will.</p>
+ *
+ * <p><b>Only one sender is allowed per multicast group</b>, but a group may have multiple
+ * receivers. Use {@link derms.net.tomulticast.TotalOrderMulticastSender} to send
+ * messages, and {@link derms.net.tomulticast.TotalOrderMulticastReceiver} to receive
+ * them.</p>
+ */
+public abstract class TotalOrderMulticast<T extends MessagePayload> {
+ final ReliableMulticast<Message<T>> sock;
+ protected final InetSocketAddress group;
+ protected Long seq; // Sequence number.
+ protected final Logger log;
+
+ protected TotalOrderMulticast(InetSocketAddress group, InetAddress laddr) throws IOException {
+ this.sock = new ReliableMulticast<Message<T>>(group, laddr);
+ this.group = group;
+ this.seq = (long) 0;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ /** Close the underlying socket. */
+ public void close() {
+ sock.close();
+ }
+
+ /** Increment the sequence number. */
+ protected void incSeq() {
+ if (seq < seq.MAX_VALUE) {
+ seq++;
+ } else {
+ log.warning("Sequence number overflow. Wrapping to 0.");
+ seq = (long) 0;
+ }
+ }
+}
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java
new file mode 100644
index 0000000..63498a6
--- /dev/null
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java
@@ -0,0 +1,31 @@
+package derms.net.tomulticast;
+
+import derms.net.rmulticast.MessagePayload;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * The single sending process in a {@link TotalOrderMulticast} group. <b>Only one sender is
+ * allowed per group.</b>
+ */
+public class TotalOrderMulticastSender<T extends MessagePayload> extends TotalOrderMulticast<T> {
+ /**
+ * Join the specified totally-ordered multicast group as its lone sender.
+ *
+ * @param group The IP address and port of the multicast group to join.
+ * @param laddr The IP address of the local process.
+ */
+ public TotalOrderMulticastSender(InetSocketAddress group, InetAddress laddr) throws IOException {
+ super(group, laddr);
+ }
+
+ /** Send a message to the group. */
+ public void send(T payload) throws IOException {
+ Message<T> msg = new Message<T>(seq, payload);
+ sock.send(msg);
+ incSeq();
+ log.info("Sent " + msg + " from " + sock + " to " + group);
+ }
+}