diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Receive.java | 4 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 4 |
2 files changed, 3 insertions, 5 deletions
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index c1965ed..f0d1909 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -21,11 +21,11 @@ class Receive<T extends Serializable & Hashable> implements Runnable { private final BlockingQueue<Message<T>> delivered; private final Logger log; - Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, BlockingQueue<Message<T>> delivered) { + Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, BlockingQueue<Message<T>> retransmissions, BlockingQueue<Message<T>> delivered) { this.inSock = inSock; this.positiveAcks = positiveAcks; this.negativeAcks = negativeAcks; - this.received = received; + this.received = new ReceivedSet<T>(); this.retransmissions = retransmissions; this.delivered = delivered; this.log = Logger.getLogger(this.getClass().getName()); diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 98c8681..8b39ae6 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -18,7 +18,6 @@ import java.util.logging.Logger; public class ReliableMulticast<T extends Serializable & Hashable> { private final Set<MessageID> positiveAcks; // Positively acknowledged messages. private final Set<MessageID> negativeAcks; // Negatively acknowledged messages. - private final ReceivedSet<T> received; private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final AtomicReference<Instant> lastSend; private final SocketAddress group; @@ -30,7 +29,6 @@ public class ReliableMulticast<T extends Serializable & Hashable> { public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); - this.received = new ReceivedSet<T>(); this.retransmissions = new LinkedBlockingQueue<Message<T>>(); this.lastSend = new AtomicReference<Instant>(Instant.now()); @@ -47,7 +45,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket(); inSock.joinGroup(group.getAddress()); - (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions, delivered))).start(); + (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, retransmissions, delivered))).start(); (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start(); } |