summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/Heartbeat.java
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-16 10:39:06 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-16 10:39:06 -0500
commite0196b370e8da38d366d327da57186dd3a44c2b2 (patch)
tree673384668db4a6bf39433224196c673d5ebfc9f5 /src/main/java/derms/net/rmulticast/Heartbeat.java
parentabb23793ff85515237b8bd5d4b0e7e8e3be0af9a (diff)
downloadsoen423-e0196b370e8da38d366d327da57186dd3a44c2b2.zip
reliable multicast: periodically send acks
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Heartbeat.java')
-rw-r--r--src/main/java/derms/net/rmulticast/Heartbeat.java63
1 files changed, 63 insertions, 0 deletions
diff --git a/src/main/java/derms/net/rmulticast/Heartbeat.java b/src/main/java/derms/net/rmulticast/Heartbeat.java
new file mode 100644
index 0000000..43eaaf5
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Heartbeat.java
@@ -0,0 +1,63 @@
+package derms.net.rmulticast;
+
+import derms.net.ConcurrentMulticastSocket;
+import derms.net.Packet;
+import derms.util.Wait;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/**
+ * Periodically send acknowledgements to the group. Allows acks to propagate even when some
+ * processes wouldn't normally send messages. Also allows processes to populate their set of
+ * group members.
+ */
+class Heartbeat implements Runnable {
+ private static final Duration period = Duration.ofSeconds(30);
+
+ private final InetSocketAddress group;
+ private final InetAddress laddr;
+ private final Set<MessageID> acks, nacks;
+ private final ConcurrentMulticastSocket outSock;
+ private final Logger log;
+
+ Heartbeat(InetSocketAddress group, InetAddress laddr, Set<MessageID> acks, Set<MessageID> nacks, ConcurrentMulticastSocket outSock) throws IOException {
+ this.group = group;
+ this.laddr = laddr;
+ this.acks = acks;
+ this.nacks = nacks;
+ this.outSock = outSock;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ try {
+ Wait.forDuration(period);
+ send();
+ } catch (IOException e) {
+ log.warning(e.getMessage());
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("Heartbeat thread interrupted: " + e.getMessage());
+ }
+ }
+
+ private void send() throws IOException {
+ HeartbeatMessage msg = new HeartbeatMessage(
+ laddr,
+ acks.toArray(new MessageID[0]),
+ nacks.toArray(new MessageID[0]));
+ DatagramPacket pkt = Packet.encode(msg, group);
+ outSock.send(pkt);
+ acks.clear();
+ }
+}