summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-15 11:39:46 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-15 11:39:46 -0500
commit3ebcee7b335d0e23915f3ec8e15c9995cd8d3004 (patch)
tree21de4fb61fc7adbf1cea7c8f4705f08132787d38 /src/main/java/derms/net/rmulticast
parented304eca556ff4780cbcc8b2615ebc6d945d46ae (diff)
downloadsoen423-3ebcee7b335d0e23915f3ec8e15c9995cd8d3004.zip
reliable multicast: Receive
Diffstat (limited to 'src/main/java/derms/net/rmulticast')
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java68
-rw-r--r--src/main/java/derms/net/rmulticast/ReceivedSet.java25
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java14
3 files changed, 96 insertions, 11 deletions
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
new file mode 100644
index 0000000..c8467c2
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -0,0 +1,68 @@
+package derms.net.rmulticast;
+
+import derms.net.ConcurrentMulticastSocket;
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.logging.Logger;
+
+class Receive<T extends Serializable & Hashable> implements Runnable {
+ private static final int bufSize = 8192;
+
+ private final ConcurrentMulticastSocket inSock;
+ private final Set<MessageID> positiveAcks;
+ private final Set<MessageID> negativeAcks;
+ private final ReceivedSet<T> received;
+ private final BlockingQueue<Message<T>> retransmissions;
+ private final Logger log;
+
+ Receive(ConcurrentMulticastSocket inSock, Set<MessageID> positiveAcks, Set<MessageID> negativeAcks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions) {
+ this.inSock = inSock;
+ this.positiveAcks = positiveAcks;
+ this.negativeAcks = negativeAcks;
+ this.received = received;
+ this.retransmissions = retransmissions;
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
+ Message<T> msg;
+ for (;;) {
+ try {
+ inSock.receive(pkt);
+ msg = Packet.decode(pkt, Message.class);
+ receive(msg);
+ } catch (IOException | ClassNotFoundException | ClassCastException e) {
+ log.warning(e.getMessage());
+ }
+ }
+ }
+
+ private void receive(Message<T> msg) {
+ positiveAcks.add(msg.id());
+ received.add(msg);
+
+ negativeAcks.remove(msg.id());
+ retransmissions.remove(msg);
+
+ for (MessageID mid : msg.positiveAcks) {
+ positiveAcks.remove(mid);
+ if (!received.contains(mid))
+ negativeAcks.add(mid);
+ }
+
+ for (MessageID mid : msg.negativeAcks) {
+ if (received.contains(mid)) {
+ retransmissions.add(msg);
+ } else {
+ negativeAcks.add(mid);
+ }
+ }
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java
new file mode 100644
index 0000000..339a714
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java
@@ -0,0 +1,25 @@
+package derms.net.rmulticast;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+class ReceivedSet<T extends Serializable & Hashable> {
+ private final Set<Message<T>> received;
+
+ ReceivedSet() {
+ this.received = new ConcurrentHashMap<Message<T>, Void>().keySet();
+ }
+
+ void add(Message<T> e) {
+ received.add(e);
+ }
+
+ // TODO: faster search.
+ boolean contains(MessageID mid) {
+ for (Message<T> msg : received)
+ if (msg.id().equals(mid))
+ return true;
+ return false;
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index 85d4f31..44fc10c 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -18,7 +18,7 @@ import java.util.logging.Logger;
public class ReliableMulticast<T extends Serializable & Hashable> {
private final Set<MessageID> positiveAcks; // Positively acknowledged messages.
private final Set<MessageID> negativeAcks; // Negatively acknowledged messages.
- private final Set<Message<T>> received;
+ private final ReceivedSet<T> received;
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
private final AtomicReference<Instant> lastSend;
private final SocketAddress group;
@@ -30,7 +30,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException {
this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
- this.received = new ConcurrentHashMap<Message<T>, Void>().keySet();
+ this.received = new ReceivedSet<T>();
this.retransmissions = new LinkedBlockingQueue<Message<T>>();
this.lastSend = new AtomicReference<Instant>(Instant.now());
@@ -48,7 +48,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.log = Logger.getLogger(this.getClass().getName());
- (new Thread(new Receive())).start();
+ (new Thread(new Receive<T>(inSock, positiveAcks, negativeAcks, received, retransmissions))).start();
(new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
}
@@ -64,12 +64,4 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
(new Thread(new Timeout<T>(msg, positiveAcks, retransmissions))).start();
lastSend.set(Instant.now());
}
-
- private class Receive implements Runnable {
- @Override
- public void run() {
- // TODO
- }
- }
-
}