summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-14 11:30:35 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-14 11:30:35 -0500
commitc6a6f1d4eb96d6d4854ee66fcf6156b638d80248 (patch)
treeb22247f5f524451f9f9a587a8ee3a4425238bbe0 /src/main/java/derms/net/rmulticast
parentf71a12bfda65c28dd03fa61163795bdd53e60914 (diff)
downloadsoen423-c6a6f1d4eb96d6d4854ee66fcf6156b638d80248.zip
ReliableMulticast: send()
Diffstat (limited to 'src/main/java/derms/net/rmulticast')
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java84
1 files changed, 84 insertions, 0 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
new file mode 100644
index 0000000..464cbb5
--- /dev/null
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -0,0 +1,84 @@
+package derms.net.rmulticast;
+
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.*;
+import java.time.Instant;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/** TODO */
+public class ReliableMulticast<T extends Serializable & Hashable> {
+ private final Set<MessageID> positiveAcks; // Positively acknowledged messages.
+ private final Set<MessageID> negativeAcks; // Negatively acknowledged messages.
+ private final Set<Message<T>> received;
+ private final Set<MessageID> retransmissions; // Messages pending retransmission.
+ private final AtomicReference<Instant> lastSend;
+ private final SocketAddress group;
+ private final MulticastSocket inSock, outSock;
+ private final InetAddress laddr; // Local address.
+ private final BlockingQueue<Message<T>> delivered;
+ private final Logger log;
+
+ public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException {
+ this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.received = new ConcurrentHashMap<Message<T>, Void>().keySet();
+ this.retransmissions = new ConcurrentHashMap<MessageID, Void>().keySet();
+ this.lastSend = new AtomicReference<Instant>(Instant.now());
+
+ this.group = group;
+
+ this.inSock = new MulticastSocket();
+ this.inSock.joinGroup(group.getAddress());
+
+ this.outSock = new MulticastSocket(group.getPort());
+ this.outSock.joinGroup(group.getAddress());
+
+ this.laddr = laddr;
+
+ this.delivered = new LinkedBlockingQueue<Message<T>>();
+ (new Thread(new Receiver())).start();
+
+ this.log = Logger.getLogger(this.getClass().getName());
+ }
+
+ public void send(T payload) throws IOException {
+ Message<T> msg = new Message<T>(
+ payload,
+ laddr,
+ positiveAcks.toArray(new MessageID[0]),
+ negativeAcks.toArray(new MessageID[0]));
+ DatagramPacket pkt = Packet.encode(msg, group);
+ outSock.send(pkt);
+ positiveAcks.clear();
+ (new Thread(new Timeout(msg.id()))).start();
+ lastSend.set(Instant.now());
+ }
+
+ private class Receiver implements Runnable {
+ @Override
+ public void run() {
+ // TODO
+ }
+ }
+
+ private class Timeout implements Runnable {
+ MessageID mid;
+
+ private Timeout(MessageID mid) {
+ this.mid = mid;
+ }
+
+ @Override
+ public void run() {
+ // TODO
+ }
+ }
+}