From c6a6f1d4eb96d6d4854ee66fcf6156b638d80248 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Thu, 14 Nov 2024 11:30:35 -0500 Subject: ReliableMulticast: send() --- .../derms/net/rmulticast/ReliableMulticast.java | 84 ++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/main/java/derms/net/rmulticast/ReliableMulticast.java diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java new file mode 100644 index 0000000..464cbb5 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -0,0 +1,84 @@ +package derms.net.rmulticast; + +import derms.net.Packet; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.time.Instant; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** TODO */ +public class ReliableMulticast { + private final Set positiveAcks; // Positively acknowledged messages. + private final Set negativeAcks; // Negatively acknowledged messages. + private final Set> received; + private final Set retransmissions; // Messages pending retransmission. + private final AtomicReference lastSend; + private final SocketAddress group; + private final MulticastSocket inSock, outSock; + private final InetAddress laddr; // Local address. + private final BlockingQueue> delivered; + private final Logger log; + + public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { + this.positiveAcks = new ConcurrentHashMap().keySet(); + this.negativeAcks = new ConcurrentHashMap().keySet(); + this.received = new ConcurrentHashMap, Void>().keySet(); + this.retransmissions = new ConcurrentHashMap().keySet(); + this.lastSend = new AtomicReference(Instant.now()); + + this.group = group; + + this.inSock = new MulticastSocket(); + this.inSock.joinGroup(group.getAddress()); + + this.outSock = new MulticastSocket(group.getPort()); + this.outSock.joinGroup(group.getAddress()); + + this.laddr = laddr; + + this.delivered = new LinkedBlockingQueue>(); + (new Thread(new Receiver())).start(); + + this.log = Logger.getLogger(this.getClass().getName()); + } + + public void send(T payload) throws IOException { + Message msg = new Message( + payload, + laddr, + positiveAcks.toArray(new MessageID[0]), + negativeAcks.toArray(new MessageID[0])); + DatagramPacket pkt = Packet.encode(msg, group); + outSock.send(pkt); + positiveAcks.clear(); + (new Thread(new Timeout(msg.id()))).start(); + lastSend.set(Instant.now()); + } + + private class Receiver implements Runnable { + @Override + public void run() { + // TODO + } + } + + private class Timeout implements Runnable { + MessageID mid; + + private Timeout(MessageID mid) { + this.mid = mid; + } + + @Override + public void run() { + // TODO + } + } +} -- cgit v1.2.3