summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/Receive.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/rmulticast/Receive.java')
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java35
1 files changed, 19 insertions, 16 deletions
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index 0856a71..ea11c67 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -6,6 +6,7 @@ import derms.net.Packet;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
@@ -36,11 +37,10 @@ class Receive<T extends MessagePayload> implements Runnable {
@Override
public void run() {
DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
- Message<T> msg;
for (;;) {
try {
inSock.receive(pkt);
- msg = Packet.decode(pkt, Message.class);
+ Message<?> msg = Packet.decode(pkt, Message.class);
receive(msg);
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
@@ -48,19 +48,9 @@ class Receive<T extends MessagePayload> implements Runnable {
}
}
- private void receive(Message<T> msg) {
+ private void receive(Message<?> msg) throws ClassCastException {
groupMembers.add(msg.sender);
- if (msg instanceof AnnounceMessage)
- return;
-
- acks.add(msg.id());
- received.add(msg);
- delivered.add(msg);
-
- nacks.remove(msg.id());
- retransmissions.remove(msg);
-
for (MessageID mid : msg.acks) {
acks.remove(mid);
if (!received.contains(mid))
@@ -68,11 +58,24 @@ class Receive<T extends MessagePayload> implements Runnable {
}
for (MessageID mid : msg.nacks) {
- if (received.contains(mid)) {
- retransmissions.add(msg);
- } else {
+ try {
+ Message<T> rmsg = received.getByID(mid);
+ retransmissions.add(rmsg);
+ } catch (NoSuchElementException e) {
nacks.add(mid);
}
}
+
+ if (msg instanceof HeartbeatMessage)
+ return; // Only deliver in-band messages.
+ deliver((Message<T>) msg);
+ }
+
+ private void deliver(Message<T> msg) {
+ acks.add(msg.id());
+ nacks.remove(msg.id());
+ received.add(msg);
+ retransmissions.remove(msg);
+ delivered.add(msg);
}
}