summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-16 10:39:06 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-16 10:39:06 -0500
commite0196b370e8da38d366d327da57186dd3a44c2b2 (patch)
tree673384668db4a6bf39433224196c673d5ebfc9f5 /src/main/java/derms/net
parentabb23793ff85515237b8bd5d4b0e7e8e3be0af9a (diff)
downloadsoen423-e0196b370e8da38d366d327da57186dd3a44c2b2.zip
reliable multicast: periodically send acks
Diffstat (limited to 'src/main/java/derms/net')
-rw-r--r--src/main/java/derms/net/rmulticast/Announce.java48
-rw-r--r--src/main/java/derms/net/rmulticast/AnnounceMessage.java10
-rw-r--r--src/main/java/derms/net/rmulticast/Heartbeat.java63
-rw-r--r--src/main/java/derms/net/rmulticast/HeartbeatMessage.java10
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java35
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java4
6 files changed, 94 insertions, 76 deletions
diff --git a/src/main/java/derms/net/rmulticast/Announce.java b/src/main/java/derms/net/rmulticast/Announce.java
deleted file mode 100644
index 41d72ed..0000000
--- a/src/main/java/derms/net/rmulticast/Announce.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package derms.net.rmulticast;
-
-import derms.net.ConcurrentMulticastSocket;
-import derms.net.Packet;
-import derms.util.Wait;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.time.Duration;
-import java.util.logging.Logger;
-
-/**
- * Periodically announce the presence of the local process to the group.
- * Allows processes to populate their set of group members, even if not
- * all members would normally send messages to the group.
- */
-class Announce implements Runnable {
- private static final Duration period = Duration.ofSeconds(30);
-
- private final DatagramPacket pkt;
- private final ConcurrentMulticastSocket outSock;
- private final Logger log;
-
- Announce(InetSocketAddress group, InetAddress laddr, ConcurrentMulticastSocket outSock) throws IOException {
- AnnounceMessage msg = new AnnounceMessage(laddr);
- this.pkt = Packet.encode(msg, group);
- this.outSock = outSock;
- this.log = Logger.getLogger(this.getClass().getName());
- }
-
- @Override
- public void run() {
- try {
- for (;;) {
- try {
- Wait.forDuration(period);
- outSock.send(pkt);
- } catch (IOException e) {
- log.warning(e.getMessage());
- }
- }
- } catch (InterruptedException e) {
- log.info("Announce thread interrupted: " + e.getMessage());
- }
- }
-}
diff --git a/src/main/java/derms/net/rmulticast/AnnounceMessage.java b/src/main/java/derms/net/rmulticast/AnnounceMessage.java
deleted file mode 100644
index bfe4b40..0000000
--- a/src/main/java/derms/net/rmulticast/AnnounceMessage.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package derms.net.rmulticast;
-
-import java.net.InetAddress;
-
-/** A message whose only purpose is to announce the presence of the sender in the group. */
-class AnnounceMessage extends Message<NullPayload> {
- AnnounceMessage(InetAddress laddr) {
- super(new NullPayload(), laddr, new MessageID[0], new MessageID[0]);
- }
-}
diff --git a/src/main/java/derms/net/rmulticast/Heartbeat.java b/src/main/java/derms/net/rmulticast/Heartbeat.java
new file mode 100644
index 0000000..43eaaf5
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Heartbeat.java
@@ -0,0 +1,63 @@
+package derms.net.rmulticast;
+
+import derms.net.ConcurrentMulticastSocket;
+import derms.net.Packet;
+import derms.util.Wait;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/**
+ * Periodically send acknowledgements to the group. Allows acks to propagate even when some
+ * processes wouldn't normally send messages. Also allows processes to populate their set of
+ * group members.
+ */
+class Heartbeat implements Runnable {
+ private static final Duration period = Duration.ofSeconds(30);
+
+ private final InetSocketAddress group;
+ private final InetAddress laddr;
+ private final Set<MessageID> acks, nacks;
+ private final ConcurrentMulticastSocket outSock;
+ private final Logger log;
+
+ Heartbeat(InetSocketAddress group, InetAddress laddr, Set<MessageID> acks, Set<MessageID> nacks, ConcurrentMulticastSocket outSock) throws IOException {
+ this.group = group;
+ this.laddr = laddr;
+ this.acks = acks;
+ this.nacks = nacks;
+ this.outSock = outSock;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ try {
+ Wait.forDuration(period);
+ send();
+ } catch (IOException e) {
+ log.warning(e.getMessage());
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("Heartbeat thread interrupted: " + e.getMessage());
+ }
+ }
+
+ private void send() throws IOException {
+ HeartbeatMessage msg = new HeartbeatMessage(
+ laddr,
+ acks.toArray(new MessageID[0]),
+ nacks.toArray(new MessageID[0]));
+ DatagramPacket pkt = Packet.encode(msg, group);
+ outSock.send(pkt);
+ acks.clear();
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/HeartbeatMessage.java b/src/main/java/derms/net/rmulticast/HeartbeatMessage.java
new file mode 100644
index 0000000..a5a3c61
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/HeartbeatMessage.java
@@ -0,0 +1,10 @@
+package derms.net.rmulticast;
+
+import java.net.InetAddress;
+
+/** A message whose only purpose is to propagate acknowledgements and process IDs to the group. */
+class HeartbeatMessage extends Message<NullPayload> {
+ HeartbeatMessage(InetAddress sender, MessageID[] acks, MessageID[] nacks) {
+ super(new NullPayload(), sender, acks, nacks);
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index 0856a71..ea11c67 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -6,6 +6,7 @@ import derms.net.Packet;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
@@ -36,11 +37,10 @@ class Receive<T extends MessagePayload> implements Runnable {
@Override
public void run() {
DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
- Message<T> msg;
for (;;) {
try {
inSock.receive(pkt);
- msg = Packet.decode(pkt, Message.class);
+ Message<?> msg = Packet.decode(pkt, Message.class);
receive(msg);
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
@@ -48,19 +48,9 @@ class Receive<T extends MessagePayload> implements Runnable {
}
}
- private void receive(Message<T> msg) {
+ private void receive(Message<?> msg) throws ClassCastException {
groupMembers.add(msg.sender);
- if (msg instanceof AnnounceMessage)
- return;
-
- acks.add(msg.id());
- received.add(msg);
- delivered.add(msg);
-
- nacks.remove(msg.id());
- retransmissions.remove(msg);
-
for (MessageID mid : msg.acks) {
acks.remove(mid);
if (!received.contains(mid))
@@ -68,11 +58,24 @@ class Receive<T extends MessagePayload> implements Runnable {
}
for (MessageID mid : msg.nacks) {
- if (received.contains(mid)) {
- retransmissions.add(msg);
- } else {
+ try {
+ Message<T> rmsg = received.getByID(mid);
+ retransmissions.add(rmsg);
+ } catch (NoSuchElementException e) {
nacks.add(mid);
}
}
+
+ if (msg instanceof HeartbeatMessage)
+ return; // Only deliver in-band messages.
+ deliver((Message<T>) msg);
+ }
+
+ private void deliver(Message<T> msg) {
+ acks.add(msg.id());
+ nacks.remove(msg.id());
+ received.add(msg);
+ retransmissions.remove(msg);
+ delivered.add(msg);
}
}
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index 2ead37e..72b3019 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -61,9 +61,9 @@ public class ReliableMulticast<T extends MessagePayload> {
(new Thread(new Prune<T>(received, groupMembers))).start();
try {
- (new Thread(new Announce(group, laddr, outSock))).start();
+ (new Thread(new Heartbeat(group, laddr, acks, nacks, outSock))).start();
} catch (IOException e) {
- log.severe("Failed to start announce thread: " + e.getMessage());
+ log.severe("Failed to start heartbeat thread: " + e.getMessage());
throw e;
}
}