From 3ebcee7b335d0e23915f3ec8e15c9995cd8d3004 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Fri, 15 Nov 2024 11:39:46 -0500 Subject: reliable multicast: Receive --- src/main/java/derms/net/rmulticast/Receive.java | 68 ++++++++++++++++++++++ .../java/derms/net/rmulticast/ReceivedSet.java | 25 ++++++++ .../derms/net/rmulticast/ReliableMulticast.java | 14 +---- 3 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 src/main/java/derms/net/rmulticast/Receive.java create mode 100644 src/main/java/derms/net/rmulticast/ReceivedSet.java (limited to 'src/main/java/derms/net/rmulticast') diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java new file mode 100644 index 0000000..c8467c2 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -0,0 +1,68 @@ +package derms.net.rmulticast; + +import derms.net.ConcurrentMulticastSocket; +import derms.net.Packet; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Logger; + +class Receive implements Runnable { + private static final int bufSize = 8192; + + private final ConcurrentMulticastSocket inSock; + private final Set positiveAcks; + private final Set negativeAcks; + private final ReceivedSet received; + private final BlockingQueue> retransmissions; + private final Logger log; + + Receive(ConcurrentMulticastSocket inSock, Set positiveAcks, Set negativeAcks, ReceivedSet received, BlockingQueue> retransmissions) { + this.inSock = inSock; + this.positiveAcks = positiveAcks; + this.negativeAcks = negativeAcks; + this.received = received; + this.retransmissions = retransmissions; + this.log = Logger.getLogger(this.getClass().getName()); + } + + @Override + public void run() { + DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + Message msg; + for (;;) { + try { + inSock.receive(pkt); + msg = Packet.decode(pkt, Message.class); + receive(msg); + } catch (IOException | ClassNotFoundException | ClassCastException e) { + log.warning(e.getMessage()); + } + } + } + + private void receive(Message msg) { + positiveAcks.add(msg.id()); + received.add(msg); + + negativeAcks.remove(msg.id()); + retransmissions.remove(msg); + + for (MessageID mid : msg.positiveAcks) { + positiveAcks.remove(mid); + if (!received.contains(mid)) + negativeAcks.add(mid); + } + + for (MessageID mid : msg.negativeAcks) { + if (received.contains(mid)) { + retransmissions.add(msg); + } else { + negativeAcks.add(mid); + } + } + } +} diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java new file mode 100644 index 0000000..339a714 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java @@ -0,0 +1,25 @@ +package derms.net.rmulticast; + +import java.io.Serializable; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +class ReceivedSet { + private final Set> received; + + ReceivedSet() { + this.received = new ConcurrentHashMap, Void>().keySet(); + } + + void add(Message e) { + received.add(e); + } + + // TODO: faster search. + boolean contains(MessageID mid) { + for (Message msg : received) + if (msg.id().equals(mid)) + return true; + return false; + } +} diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 85d4f31..44fc10c 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -18,7 +18,7 @@ import java.util.logging.Logger; public class ReliableMulticast { private final Set positiveAcks; // Positively acknowledged messages. private final Set negativeAcks; // Negatively acknowledged messages. - private final Set> received; + private final ReceivedSet received; private final BlockingQueue> retransmissions; // Messages pending retransmission. private final AtomicReference lastSend; private final SocketAddress group; @@ -30,7 +30,7 @@ public class ReliableMulticast { public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { this.positiveAcks = new ConcurrentHashMap().keySet(); this.negativeAcks = new ConcurrentHashMap().keySet(); - this.received = new ConcurrentHashMap, Void>().keySet(); + this.received = new ReceivedSet(); this.retransmissions = new LinkedBlockingQueue>(); this.lastSend = new AtomicReference(Instant.now()); @@ -48,7 +48,7 @@ public class ReliableMulticast { this.log = Logger.getLogger(this.getClass().getName()); - (new Thread(new Receive())).start(); + (new Thread(new Receive(inSock, positiveAcks, negativeAcks, received, retransmissions))).start(); (new Thread(new Retransmit(retransmissions, outSock, group))).start(); } @@ -64,12 +64,4 @@ public class ReliableMulticast { (new Thread(new Timeout(msg, positiveAcks, retransmissions))).start(); lastSend.set(Instant.now()); } - - private class Receive implements Runnable { - @Override - public void run() { - // TODO - } - } - } -- cgit v1.2.3