diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Message.java | 14 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Prune.java | 4 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Receive.java | 26 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 18 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Timeout.java | 12 |
5 files changed, 40 insertions, 34 deletions
diff --git a/src/main/java/derms/net/rmulticast/Message.java b/src/main/java/derms/net/rmulticast/Message.java index ab2d9ac..180b245 100644 --- a/src/main/java/derms/net/rmulticast/Message.java +++ b/src/main/java/derms/net/rmulticast/Message.java @@ -6,14 +6,18 @@ import java.net.InetAddress; class Message<T extends Serializable & Hashable> implements Serializable { T payload; InetAddress sender; - MessageID[] positiveAcks; // IDs of messages that this message positively acknowledges. - MessageID[] negativeAcks; // IDs of messages that this message negatively acknowledges. + MessageID[] acks; // IDs of messages that this message positively acknowledges. + MessageID[] nacks; // IDs of messages that this message negatively acknowledges. - Message(T payload, InetAddress sender, MessageID[] positiveAcks, MessageID[] negativeAcks) { + /** + * @param acks IDs of messages that this message positively acknowledges. + * @param nacks IDs of messages that this message negatively acknowledges. + */ + Message(T payload, InetAddress sender, MessageID[] acks, MessageID[] nacks) { this.payload = payload; this.sender = sender; - this.positiveAcks = positiveAcks; - this.negativeAcks = negativeAcks; + this.acks = acks; + this.nacks = nacks; } MessageID id() { diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java index d70f2e6..e748248 100644 --- a/src/main/java/derms/net/rmulticast/Prune.java +++ b/src/main/java/derms/net/rmulticast/Prune.java @@ -91,7 +91,7 @@ class Prune<T extends Serializable & Hashable> implements Runnable { potentialPredecessors.removeAll(seq); // Add messages that b positively acknowledged. - for (MessageID mid : b.positiveAcks) { + for (MessageID mid : b.acks) { try { Message<T> msg = received.getByID(mid); potentialPredecessors.add(msg); @@ -101,7 +101,7 @@ class Prune<T extends Serializable & Hashable> implements Runnable { } // Remove messages that c negatively acknowledged. - for (MessageID mid : c.negativeAcks) { + for (MessageID mid : c.nacks) { for (int i = 0; i < potentialPredecessors.size(); i++) { Message<T> msg = potentialPredecessors.get(i); if (msg.id().equals(mid)) { diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 93a56a8..f20f833 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -15,18 +15,18 @@ class Receive<T extends Serializable & Hashable> implements Runnable { private static final int bufSize = 8192; private final ConcurrentMulticastSocket inSock; - private final Set<MessageID> positiveAcks; - private final Set<MessageID> negativeAcks; - private final ReceivedSet<T> received; + private final Set<MessageID> acks; + private final Set<MessageID> nacks; // Positively acknowledged messages. + private final ReceivedSet<T> received; // Negatively acknowledged messages. private final BlockingQueue<Message<T>> retransmissions; private final Set<InetAddress> groupMembers; private final BlockingQueue<Message<T>> delivered; private final Logger log; - Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) { + Receive(ConcurrentMulticastSocket inSock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) { this.inSock = inSock; - this.positiveAcks = positiveAcks; - this.negativeAcks = negativeAcks; + this.acks = acks; + this.nacks = nacks; this.received = received; this.retransmissions = retransmissions; this.groupMembers = groupMembers; @@ -50,26 +50,26 @@ class Receive<T extends Serializable & Hashable> implements Runnable { } private void receive(Message<T> msg) { - positiveAcks.add(msg.id()); + acks.add(msg.id()); received.add(msg); delivered.add(msg); groupMembers.add(msg.sender); - negativeAcks.remove(msg.id()); + nacks.remove(msg.id()); retransmissions.remove(msg); - for (MessageID mid : msg.positiveAcks) { - positiveAcks.remove(mid); + for (MessageID mid : msg.acks) { + acks.remove(mid); if (!received.contains(mid)) - negativeAcks.add(mid); + nacks.add(mid); } - for (MessageID mid : msg.negativeAcks) { + for (MessageID mid : msg.nacks) { if (received.contains(mid)) { retransmissions.add(msg); } else { - negativeAcks.add(mid); + nacks.add(mid); } } } diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 0798e68..528bb41 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -18,8 +18,8 @@ import java.util.logging.Logger; public class ReliableMulticast<T extends Serializable & Hashable> { private final SocketAddress group; private final InetAddress laddr; // Local address. - private final Set<MessageID> positiveAcks; // Positively acknowledged messages. - private final Set<MessageID> negativeAcks; // Negatively acknowledged messages. + private final Set<MessageID> acks; // Positively acknowledged messages. + private final Set<MessageID> nacks; // Negatively acknowledged messages. private final ReceivedSet<T> received; private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final Set<InetAddress> groupMembers; @@ -32,8 +32,8 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.group = group; this.laddr = laddr; - this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); - this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.acks = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.nacks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.received = new ReceivedSet<T>(); this.retransmissions = new LinkedBlockingQueue<Message<T>>(); this.groupMembers = new ConcurrentHashMap<InetAddress, Void>().keySet(); @@ -48,7 +48,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket(); inSock.joinGroup(group.getAddress()); - (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions, groupMembers, delivered))).start(); + (new Thread(new Receive<T>(inSock, acks, nacks, received, retransmissions, groupMembers, delivered))).start(); (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start(); @@ -59,12 +59,12 @@ public class ReliableMulticast<T extends Serializable & Hashable> { Message<T> msg = new Message<T>( payload, laddr, - positiveAcks.toArray(new MessageID[0]), - negativeAcks.toArray(new MessageID[0])); + acks.toArray(new MessageID[0]), + nacks.toArray(new MessageID[0])); DatagramPacket pkt = Packet.encode(msg, group); outSock.send(pkt); - positiveAcks.clear(); - (new Thread(new Timeout<T>(msg, positiveAcks, retransmissions))).start(); + acks.clear(); + (new Thread(new Timeout<T>(msg, acks, retransmissions))).start(); lastSend.set(Instant.now()); } diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java index 2ccccd1..7b16f67 100644 --- a/src/main/java/derms/net/rmulticast/Timeout.java +++ b/src/main/java/derms/net/rmulticast/Timeout.java @@ -2,7 +2,6 @@ package derms.net.rmulticast; import java.io.Serializable; import java.time.Duration; -import java.time.Instant; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; @@ -12,13 +11,16 @@ class Timeout<T extends Serializable & Hashable> implements Runnable { private static final Duration timeout = Duration.ofSeconds(1); private final Message<T> msg; - private final Set<MessageID> positiveAcks; + private final Set<MessageID> acks; // Positively acknowledged messages. private final BlockingQueue<Message<T>> retransmissions; private final Logger log; - Timeout(Message<T> msg, Set<MessageID> positiveAcks, BlockingQueue<Message<T>> retransmissions) { + /** + * @param acks Positively acknowledged messages. + */ + Timeout(Message<T> msg, Set<MessageID> acks, BlockingQueue<Message<T>> retransmissions) { this.msg = msg; - this.positiveAcks = positiveAcks; + this.acks = acks; this.retransmissions = retransmissions; this.log = Logger.getLogger(this.getClass().getName()); } @@ -28,7 +30,7 @@ class Timeout<T extends Serializable & Hashable> implements Runnable { try { for (;;) { Wait.forDuration(timeout); - if (positiveAcks.contains(msg.id())) { + if (acks.contains(msg.id())) { log.info("Message " + msg.id() + "positively ack'ed."); return; } else { |