diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-14 11:30:35 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-14 11:30:35 -0500 |
| commit | c6a6f1d4eb96d6d4854ee66fcf6156b638d80248 (patch) | |
| tree | b22247f5f524451f9f9a587a8ee3a4425238bbe0 /src | |
| parent | f71a12bfda65c28dd03fa61163795bdd53e60914 (diff) | |
| download | soen423-c6a6f1d4eb96d6d4854ee66fcf6156b638d80248.zip | |
ReliableMulticast: send()
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java new file mode 100644 index 0000000..464cbb5 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -0,0 +1,84 @@ +package derms.net.rmulticast; + +import derms.net.Packet; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.time.Instant; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** TODO */ +public class ReliableMulticast<T extends Serializable & Hashable> { + private final Set<MessageID> positiveAcks; // Positively acknowledged messages. + private final Set<MessageID> negativeAcks; // Negatively acknowledged messages. + private final Set<Message<T>> received; + private final Set<MessageID> retransmissions; // Messages pending retransmission. + private final AtomicReference<Instant> lastSend; + private final SocketAddress group; + private final MulticastSocket inSock, outSock; + private final InetAddress laddr; // Local address. + private final BlockingQueue<Message<T>> delivered; + private final Logger log; + + public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { + this.positiveAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.negativeAcks = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.received = new ConcurrentHashMap<Message<T>, Void>().keySet(); + this.retransmissions = new ConcurrentHashMap<MessageID, Void>().keySet(); + this.lastSend = new AtomicReference<Instant>(Instant.now()); + + this.group = group; + + this.inSock = new MulticastSocket(); + this.inSock.joinGroup(group.getAddress()); + + this.outSock = new MulticastSocket(group.getPort()); + this.outSock.joinGroup(group.getAddress()); + + this.laddr = laddr; + + this.delivered = new LinkedBlockingQueue<Message<T>>(); + (new Thread(new Receiver())).start(); + + this.log = Logger.getLogger(this.getClass().getName()); + } + + public void send(T payload) throws IOException { + Message<T> msg = new Message<T>( + payload, + laddr, + positiveAcks.toArray(new MessageID[0]), + negativeAcks.toArray(new MessageID[0])); + DatagramPacket pkt = Packet.encode(msg, group); + outSock.send(pkt); + positiveAcks.clear(); + (new Thread(new Timeout(msg.id()))).start(); + lastSend.set(Instant.now()); + } + + private class Receiver implements Runnable { + @Override + public void run() { + // TODO + } + } + + private class Timeout implements Runnable { + MessageID mid; + + private Timeout(MessageID mid) { + this.mid = mid; + } + + @Override + public void run() { + // TODO + } + } +} |