summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-15 17:03:21 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-15 17:03:21 -0500
commit050942608e70d01e5833fe5d0e5019071f95c68b (patch)
tree0e4c285c4290def09877a9e5378861a03d0ee088 /src/main/java/derms
parent5a7cc1873ade99634726042d97a9d91d382867ad (diff)
downloadsoen423-050942608e70d01e5833fe5d0e5019071f95c68b.zip
reliable multicast: announce presence to group
Diffstat (limited to 'src/main/java/derms')
-rw-r--r--src/main/java/derms/net/rmulticast/Announce.java47
-rw-r--r--src/main/java/derms/net/rmulticast/AnnounceMessage.java10
-rw-r--r--src/main/java/derms/net/rmulticast/NullPayload.java10
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java7
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java7
5 files changed, 79 insertions, 2 deletions
diff --git a/src/main/java/derms/net/rmulticast/Announce.java b/src/main/java/derms/net/rmulticast/Announce.java
new file mode 100644
index 0000000..08d817d
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Announce.java
@@ -0,0 +1,47 @@
+package derms.net.rmulticast;
+
+import derms.net.ConcurrentMulticastSocket;
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.logging.Logger;
+
+/**
+ * Periodically announce the presence of the local process to the group.
+ * Allows processes to populate their set of group members, even if not
+ * all members would normally send messages to the group.
+ */
+class Announce implements Runnable {
+ private static final Duration period = Duration.ofSeconds(30);
+
+ private final DatagramPacket pkt;
+ private final ConcurrentMulticastSocket outSock;
+ private final Logger log;
+
+ Announce(InetSocketAddress group, InetAddress laddr, ConcurrentMulticastSocket outSock) throws IOException {
+ AnnounceMessage msg = new AnnounceMessage(laddr);
+ this.pkt = Packet.encode(msg, group);
+ this.outSock = outSock;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ try {
+ Wait.forDuration(period);
+ outSock.send(pkt);
+ } catch (IOException e) {
+ log.warning(e.getMessage());
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("Announce thread interrupted: " + e.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/AnnounceMessage.java b/src/main/java/derms/net/rmulticast/AnnounceMessage.java
new file mode 100644
index 0000000..bfe4b40
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/AnnounceMessage.java
@@ -0,0 +1,10 @@
+package derms.net.rmulticast;
+
+import java.net.InetAddress;
+
+/** A message whose only purpose is to announce the presence of the sender in the group. */
+class AnnounceMessage extends Message<NullPayload> {
+ AnnounceMessage(InetAddress laddr) {
+ super(new NullPayload(), laddr, new MessageID[0], new MessageID[0]);
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/NullPayload.java b/src/main/java/derms/net/rmulticast/NullPayload.java
new file mode 100644
index 0000000..1eee5d6
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/NullPayload.java
@@ -0,0 +1,10 @@
+package derms.net.rmulticast;
+
+import java.io.Serializable;
+
+class NullPayload implements Serializable, Hashable {
+ @Override
+ public int hash() {
+ return -1;
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index f20f833..29ee305 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -50,12 +50,15 @@ class Receive<T extends Serializable & Hashable> implements Runnable {
}
private void receive(Message<T> msg) {
+ groupMembers.add(msg.sender);
+
+ if (msg instanceof AnnounceMessage)
+ return;
+
acks.add(msg.id());
received.add(msg);
delivered.add(msg);
- groupMembers.add(msg.sender);
-
nacks.remove(msg.id());
retransmissions.remove(msg);
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index 528bb41..a9e1fa4 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -53,6 +53,13 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
(new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
(new Thread(new Prune<T>(received, groupMembers))).start();
+
+ try {
+ (new Thread(new Announce(group, laddr, outSock))).start();
+ } catch (IOException e) {
+ log.severe("Failed to start announce thread: " + e.getMessage());
+ throw e;
+ }
}
public void send(T payload) throws IOException {