summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/derms/net/rmulticast/Message.java14
-rw-r--r--src/main/java/derms/net/rmulticast/Prune.java4
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java26
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java18
-rw-r--r--src/main/java/derms/net/rmulticast/Timeout.java12
5 files changed, 40 insertions, 34 deletions
diff --git a/src/main/java/derms/net/rmulticast/Message.java b/src/main/java/derms/net/rmulticast/Message.java
index ab2d9ac..180b245 100644
--- a/src/main/java/derms/net/rmulticast/Message.java
+++ b/src/main/java/derms/net/rmulticast/Message.java
@@ -6,14 +6,18 @@ import java.net.InetAddress;
class Message<T extends Serializable & Hashable> implements Serializable {
T payload;
InetAddress sender;
- MessageID[] positiveAcks; // IDs of messages that this message positively acknowledges.
- MessageID[] negativeAcks; // IDs of messages that this message negatively acknowledges.
+ MessageID[] acks; // IDs of messages that this message positively acknowledges.
+ MessageID[] nacks; // IDs of messages that this message negatively acknowledges.
- Message(T payload, InetAddress sender, MessageID[] positiveAcks, MessageID[] negativeAcks) {
+ /**
+ * @param acks IDs of messages that this message positively acknowledges.
+ * @param nacks IDs of messages that this message negatively acknowledges.
+ */
+ Message(T payload, InetAddress sender, MessageID[] acks, MessageID[] nacks) {
this.payload = payload;
this.sender = sender;
- this.positiveAcks = positiveAcks;
- this.negativeAcks = negativeAcks;
+ this.acks = acks;
+ this.nacks = nacks;
}
MessageID id() {
diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java
index d70f2e6..e748248 100644
--- a/src/main/java/derms/net/rmulticast/Prune.java
+++ b/src/main/java/derms/net/rmulticast/Prune.java
@@ -91,7 +91,7 @@ class Prune<T extends Serializable & Hashable> implements Runnable {
potentialPredecessors.removeAll(seq);
// Add messages that b positively acknowledged.
- for (MessageID mid : b.positiveAcks) {
+ for (MessageID mid : b.acks) {
try {
Message<T> msg = received.getByID(mid);
potentialPredecessors.add(msg);
@@ -101,7 +101,7 @@ class Prune<T extends Serializable & Hashable> implements Runnable {
}
// Remove messages that c negatively acknowledged.
- for (MessageID mid : c.negativeAcks) {
+ for (MessageID mid : c.nacks) {
for (int i = 0; i < potentialPredecessors.size(); i++) {
Message<T> msg = potentialPredecessors.get(i);
if (msg.id().equals(mid)) {
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index 93a56a8..f20f833 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -15,18 +15,18 @@ class Receive<T extends Serializable & Hashable> implements Runnable {
private static final int bufSize = 8192;
private final ConcurrentMulticastSocket inSock;
- private final Set<MessageID> positiveAcks;
- private final Set<MessageID> negativeAcks;
- private final ReceivedSet<T> received;
+ private final Set<MessageID> acks;
+ private final Set<MessageID> nacks; // Positively acknowledged messages.
+ private final ReceivedSet<T> received; // Negatively acknowledged messages.
private final BlockingQueue<Message<T>> retransmissions;
private final Set<InetAddress> groupMembers;
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
- Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) {
+ Receive(ConcurrentMulticastSocket inSock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) {
this.inSock = inSock;
- this.positiveAcks = positiveAcks;
- this.negativeAcks = negativeAcks;
+ this.acks = acks;
+ this.nacks = nacks;
this.received = received;
this.retransmissions = retransmissions;
this.groupMembers = groupMembers;
@@ -50,26 +50,26 @@ class Receive<T extends Serializable & Hashable> implements Runnable {
}
private void receive(Message<T> msg) {
- positiveAcks.add(msg.id());
+ acks.add(msg.id());
received.add(msg);
delivered.add(msg);
groupMembers.add(msg.sender);
- negativeAcks.remove(msg.id());
+ nacks.remove(msg.id());
retransmissions.remove(msg);
- for (MessageID mid : msg.positiveAcks) {
- positiveAcks.remove(mid);
+ for (MessageID mid : msg.acks) {
+ acks.remove(mid);
if (!received.contains(mid))
- negativeAcks.add(mid);
+ nacks.add(mid);
}
- for (MessageID mid : msg.negativeAcks) {
+ for (MessageID mid : msg.nacks) {
if (received.contains(mid)) {
retransmissions.add(msg);
} else {
- negativeAcks.add(mid);
+ nacks.add(mid);
}
}
}
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index 0798e68..528bb41 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -18,8 +18,8 @@ import java.util.logging.Logger;
public class ReliableMulticast<T extends Serializable & Hashable> {
private final SocketAddress group;
private final InetAddress laddr; // Local address.
- private final Set<MessageID> positiveAcks; // Positively acknowledged messages.
- private final Set<MessageID> negativeAcks; // Negatively acknowledged messages.
+ private final Set<MessageID> acks; // Positively acknowledged messages.
+ private final Set<MessageID> nacks; // Negatively acknowledged messages.
private final ReceivedSet<T> received;
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
private final Set<InetAddress> groupMembers;
@@ -32,8 +32,8 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.group = group;
this.laddr = laddr;
- this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
- this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.acks = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.nacks = new ConcurrentHashMap<MessageID, Void>().keySet();
this.received = new ReceivedSet<T>();
this.retransmissions = new LinkedBlockingQueue<Message<T>>();
this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet();
@@ -48,7 +48,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket();
inSock.joinGroup(group.getAddress());
- (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions, groupMembers, delivered))).start();
+ (new Thread(new Receive<T>(inSock, acks, nacks, received, retransmissions, groupMembers, delivered))).start();
(new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
@@ -59,12 +59,12 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
Message<T> msg = new Message<T>(
payload,
laddr,
- positiveAcks.toArray(new MessageID[0]),
- negativeAcks.toArray(new MessageID[0]));
+ acks.toArray(new MessageID[0]),
+ nacks.toArray(new MessageID[0]));
DatagramPacket pkt = Packet.encode(msg, group);
outSock.send(pkt);
- positiveAcks.clear();
- (new Thread(new Timeout<T>(msg, positiveAcks, retransmissions))).start();
+ acks.clear();
+ (new Thread(new Timeout<T>(msg, acks, retransmissions))).start();
lastSend.set(Instant.now());
}
diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java
index 2ccccd1..7b16f67 100644
--- a/src/main/java/derms/net/rmulticast/Timeout.java
+++ b/src/main/java/derms/net/rmulticast/Timeout.java
@@ -2,7 +2,6 @@ package derms.net.rmulticast;
import java.io.Serializable;
import java.time.Duration;
-import java.time.Instant;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
@@ -12,13 +11,16 @@ class Timeout<T extends Serializable & Hashable> implements Runnable {
private static final Duration timeout = Duration.ofSeconds(1);
private final Message<T> msg;
- private final Set<MessageID> positiveAcks;
+ private final Set<MessageID> acks; // Positively acknowledged messages.
private final BlockingQueue<Message<T>> retransmissions;
private final Logger log;
- Timeout(Message<T> msg, Set<MessageID> positiveAcks, BlockingQueue<Message<T>> retransmissions) {
+ /**
+ * @param acks Positively acknowledged messages.
+ */
+ Timeout(Message<T> msg, Set<MessageID> acks, BlockingQueue<Message<T>> retransmissions) {
this.msg = msg;
- this.positiveAcks = positiveAcks;
+ this.acks = acks;
this.retransmissions = retransmissions;
this.log = Logger.getLogger(this.getClass().getName());
}
@@ -28,7 +30,7 @@ class Timeout<T extends Serializable & Hashable> implements Runnable {
try {
for (;;) {
Wait.forDuration(timeout);
- if (positiveAcks.contains(msg.id())) {
+ if (acks.contains(msg.id())) {
log.info("Message " + msg.id() + "positively ack'ed.");
return;
} else {