diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 10:39:06 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 10:39:06 -0500 |
| commit | e0196b370e8da38d366d327da57186dd3a44c2b2 (patch) | |
| tree | 673384668db4a6bf39433224196c673d5ebfc9f5 /src/main/java/derms/net/rmulticast/Receive.java | |
| parent | abb23793ff85515237b8bd5d4b0e7e8e3be0af9a (diff) | |
| download | soen423-e0196b370e8da38d366d327da57186dd3a44c2b2.zip | |
reliable multicast: periodically send acks
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Receive.java')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Receive.java | 35 |
1 files changed, 19 insertions, 16 deletions
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 0856a71..ea11c67 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -6,6 +6,7 @@ import derms.net.Packet; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetAddress; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; @@ -36,11 +37,10 @@ class Receive<T extends MessagePayload> implements Runnable { @Override public void run() { DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); - Message<T> msg; for (;;) { try { inSock.receive(pkt); - msg = Packet.decode(pkt, Message.class); + Message<?> msg = Packet.decode(pkt, Message.class); receive(msg); } catch (IOException | ClassNotFoundException | ClassCastException e) { log.warning(e.getMessage()); @@ -48,19 +48,9 @@ class Receive<T extends MessagePayload> implements Runnable { } } - private void receive(Message<T> msg) { + private void receive(Message<?> msg) throws ClassCastException { groupMembers.add(msg.sender); - if (msg instanceof AnnounceMessage) - return; - - acks.add(msg.id()); - received.add(msg); - delivered.add(msg); - - nacks.remove(msg.id()); - retransmissions.remove(msg); - for (MessageID mid : msg.acks) { acks.remove(mid); if (!received.contains(mid)) @@ -68,11 +58,24 @@ class Receive<T extends MessagePayload> implements Runnable { } for (MessageID mid : msg.nacks) { - if (received.contains(mid)) { - retransmissions.add(msg); - } else { + try { + Message<T> rmsg = received.getByID(mid); + retransmissions.add(rmsg); + } catch (NoSuchElementException e) { nacks.add(mid); } } + + if (msg instanceof HeartbeatMessage) + return; // Only deliver in-band messages. + deliver((Message<T>) msg); + } + + private void deliver(Message<T> msg) { + acks.add(msg.id()); + nacks.remove(msg.id()); + received.add(msg); + retransmissions.remove(msg); + delivered.add(msg); } } |