diff options
Diffstat (limited to 'src/main/java/derms')
6 files changed, 94 insertions, 76 deletions
diff --git a/src/main/java/derms/net/rmulticast/Announce.java b/src/main/java/derms/net/rmulticast/Announce.java deleted file mode 100644 index 41d72ed..0000000 --- a/src/main/java/derms/net/rmulticast/Announce.java +++ /dev/null @@ -1,48 +0,0 @@ -package derms.net.rmulticast; - -import derms.net.ConcurrentMulticastSocket; -import derms.net.Packet; -import derms.util.Wait; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.time.Duration; -import java.util.logging.Logger; - -/** - * Periodically announce the presence of the local process to the group. - * Allows processes to populate their set of group members, even if not - * all members would normally send messages to the group. - */ -class Announce implements Runnable { - private static final Duration period = Duration.ofSeconds(30); - - private final DatagramPacket pkt; - private final ConcurrentMulticastSocket outSock; - private final Logger log; - - Announce(InetSocketAddress group, InetAddress laddr, ConcurrentMulticastSocket outSock) throws IOException { - AnnounceMessage msg = new AnnounceMessage(laddr); - this.pkt = Packet.encode(msg, group); - this.outSock = outSock; - this.log = Logger.getLogger(this.getClass().getName()); - } - - @Override - public void run() { - try { - for (;;) { - try { - Wait.forDuration(period); - outSock.send(pkt); - } catch (IOException e) { - log.warning(e.getMessage()); - } - } - } catch (InterruptedException e) { - log.info("Announce thread interrupted: " + e.getMessage()); - } - } -} diff --git a/src/main/java/derms/net/rmulticast/AnnounceMessage.java b/src/main/java/derms/net/rmulticast/AnnounceMessage.java deleted file mode 100644 index bfe4b40..0000000 --- a/src/main/java/derms/net/rmulticast/AnnounceMessage.java +++ /dev/null @@ -1,10 +0,0 @@ -package derms.net.rmulticast; - -import java.net.InetAddress; - -/** A message whose only purpose is to announce the presence of the sender in the group. */ -class AnnounceMessage extends Message<NullPayload> { - AnnounceMessage(InetAddress laddr) { - super(new NullPayload(), laddr, new MessageID[0], new MessageID[0]); - } -} diff --git a/src/main/java/derms/net/rmulticast/Heartbeat.java b/src/main/java/derms/net/rmulticast/Heartbeat.java new file mode 100644 index 0000000..43eaaf5 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Heartbeat.java @@ -0,0 +1,63 @@ +package derms.net.rmulticast; + +import derms.net.ConcurrentMulticastSocket; +import derms.net.Packet; +import derms.util.Wait; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Set; +import java.util.logging.Logger; + +/** + * Periodically send acknowledgements to the group. Allows acks to propagate even when some + * processes wouldn't normally send messages. Also allows processes to populate their set of + * group members. + */ +class Heartbeat implements Runnable { + private static final Duration period = Duration.ofSeconds(30); + + private final InetSocketAddress group; + private final InetAddress laddr; + private final Set<MessageID> acks, nacks; + private final ConcurrentMulticastSocket outSock; + private final Logger log; + + Heartbeat(InetSocketAddress group, InetAddress laddr, Set<MessageID> acks, Set<MessageID> nacks, ConcurrentMulticastSocket outSock) throws IOException { + this.group = group; + this.laddr = laddr; + this.acks = acks; + this.nacks = nacks; + this.outSock = outSock; + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + try { + for (;;) { + try { + Wait.forDuration(period); + send(); + } catch (IOException e) { + log.warning(e.getMessage()); + } + } + } catch (InterruptedException e) { + log.info("Heartbeat thread interrupted: " + e.getMessage()); + } + } + + private void send() throws IOException { + HeartbeatMessage msg = new HeartbeatMessage( + laddr, + acks.toArray(new MessageID[0]), + nacks.toArray(new MessageID[0])); + DatagramPacket pkt = Packet.encode(msg, group); + outSock.send(pkt); + acks.clear(); + } +} diff --git a/src/main/java/derms/net/rmulticast/HeartbeatMessage.java b/src/main/java/derms/net/rmulticast/HeartbeatMessage.java new file mode 100644 index 0000000..a5a3c61 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/HeartbeatMessage.java @@ -0,0 +1,10 @@ +package derms.net.rmulticast; + +import java.net.InetAddress; + +/** A message whose only purpose is to propagate acknowledgements and process IDs to the group. */ +class HeartbeatMessage extends Message<NullPayload> { + HeartbeatMessage(InetAddress sender, MessageID[] acks, MessageID[] nacks) { + super(new NullPayload(), sender, acks, nacks); + } +} diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 0856a71..ea11c67 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -6,6 +6,7 @@ import derms.net.Packet; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; @@ -36,11 +37,10 @@ class Receive<T extends MessagePayload> implements Runnable { @Override public void run() { DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); - Message<T> msg; for (;;) { try { inSock.receive(pkt); - msg = Packet.decode(pkt, Message.class); + Message<?> msg = Packet.decode(pkt, Message.class); receive(msg); } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); @@ -48,19 +48,9 @@ class Receive<T extends MessagePayload> implements Runnable { } } - private void receive(Message<T> msg) { + private void receive(Message<?> msg) throws ClassCastException { groupMembers.add(msg.sender); - if (msg instanceof AnnounceMessage) - return; - - acks.add(msg.id()); - received.add(msg); - delivered.add(msg); - - nacks.remove(msg.id()); - retransmissions.remove(msg); - for (MessageID mid : msg.acks) { acks.remove(mid); if (!received.contains(mid)) @@ -68,11 +58,24 @@ class Receive<T extends MessagePayload> implements Runnable { } for (MessageID mid : msg.nacks) { - if (received.contains(mid)) { - retransmissions.add(msg); - } else { + try { + Message<T> rmsg = received.getByID(mid); + retransmissions.add(rmsg); + } catch (NoSuchElementException e) { nacks.add(mid); } } + + if (msg instanceof HeartbeatMessage) + return; // Only deliver in-band messages. + deliver((Message<T>) msg); + } + + private void deliver(Message<T> msg) { + acks.add(msg.id()); + nacks.remove(msg.id()); + received.add(msg); + retransmissions.remove(msg); + delivered.add(msg); } } diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 2ead37e..72b3019 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -61,9 +61,9 @@ public class ReliableMulticast<T extends MessagePayload> { (new Thread(new Prune<T>(received, groupMembers))).start(); try { - (new Thread(new Announce(group, laddr, outSock))).start(); + (new Thread(new Heartbeat(group, laddr, acks, nacks, outSock))).start(); } catch (IOException e) { - log.severe("Failed to start announce thread: " + e.getMessage()); + log.severe("Failed to start heartbeat thread: " + e.getMessage()); throw e; } } |