From 35c2f2a917de223c658ecc594719edd09bead7cc Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 16 Nov 2024 15:54:22 -0500 Subject: TotalOrderMulticastSender --- src/main/java/derms/net/tomulticast/Message.java | 40 ++++++++++++++++ .../derms/net/tomulticast/TotalOrderMulticast.java | 53 ++++++++++++++++++++++ .../net/tomulticast/TotalOrderMulticastSender.java | 31 +++++++++++++ 3 files changed, 124 insertions(+) create mode 100644 src/main/java/derms/net/tomulticast/Message.java create mode 100644 src/main/java/derms/net/tomulticast/TotalOrderMulticast.java create mode 100644 src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java (limited to 'src/main') diff --git a/src/main/java/derms/net/tomulticast/Message.java b/src/main/java/derms/net/tomulticast/Message.java new file mode 100644 index 0000000..035e968 --- /dev/null +++ b/src/main/java/derms/net/tomulticast/Message.java @@ -0,0 +1,40 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +class Message implements MessagePayload { + long seq; // Sequence number. + T payload; + + Message(long seq, T payload) { + this.seq = seq; + this.payload = payload; + } + + @Override + public int hash() { + return (int) seq * payload.hash(); + } + + @Override + public int hashCode() { + return hash(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + if (obj.getClass() != this.getClass()) + return false; + Message other = (Message) obj; + if (other.payload.getClass() != this.payload.getClass()) + return false; + return other.seq == this.seq && other.hash() == this.hash(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + seq + ", " + payload + "}"; + } +} diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java new file mode 100644 index 0000000..d83a34d --- /dev/null +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java @@ -0,0 +1,53 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; +import derms.net.rmulticast.ReliableMulticast; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.logging.Logger; + +/** + *

A single-sender, multiple-receiver multicast group that enforces a total order on + * received messages. If a process delivers message A before message B, + * then all other processes will deliver A and B in the same order + * (A before B).

+ * + *

Additionally, the protocol guarantees reliable delivery of messages. If the sender + * crashes while sending a message, either all processes will receive the message, or + * none of them will.

+ * + *

Only one sender is allowed per multicast group, but a group may have multiple + * receivers. Use {@link derms.net.tomulticast.TotalOrderMulticastSender} to send + * messages, and {@link derms.net.tomulticast.TotalOrderMulticastReceiver} to receive + * them.

+ */ +public abstract class TotalOrderMulticast { + final ReliableMulticast> sock; + protected final InetSocketAddress group; + protected Long seq; // Sequence number. + protected final Logger log; + + protected TotalOrderMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { + this.sock = new ReliableMulticast>(group, laddr); + this.group = group; + this.seq = (long) 0; + this.log = Logger.getLogger(this.getClass().getName()); + } + + /** Close the underlying socket. */ + public void close() { + sock.close(); + } + + /** Increment the sequence number. */ + protected void incSeq() { + if (seq < seq.MAX_VALUE) { + seq++; + } else { + log.warning("Sequence number overflow. Wrapping to 0."); + seq = (long) 0; + } + } +} diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java new file mode 100644 index 0000000..63498a6 --- /dev/null +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java @@ -0,0 +1,31 @@ +package derms.net.tomulticast; + +import derms.net.rmulticast.MessagePayload; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +/** + * The single sending process in a {@link TotalOrderMulticast} group. Only one sender is + * allowed per group. + */ +public class TotalOrderMulticastSender extends TotalOrderMulticast { + /** + * Join the specified totally-ordered multicast group as its lone sender. + * + * @param group The IP address and port of the multicast group to join. + * @param laddr The IP address of the local process. + */ + public TotalOrderMulticastSender(InetSocketAddress group, InetAddress laddr) throws IOException { + super(group, laddr); + } + + /** Send a message to the group. */ + public void send(T payload) throws IOException { + Message msg = new Message(seq, payload); + sock.send(msg); + incSeq(); + log.info("Sent " + msg + " from " + sock + " to " + group); + } +} -- cgit v1.2.3