package derms.net.rmulticast; import derms.net.ConcurrentMulticastSocket; import derms.net.Packet; import java.io.IOException; import java.io.Serializable; import java.net.*; import java.time.Instant; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; /** TODO */ public class ReliableMulticast { private final Set positiveAcks; // Positively acknowledged messages. private final Set negativeAcks; // Negatively acknowledged messages. private final ReceivedSet received; private final BlockingQueue> retransmissions; // Messages pending retransmission. private final AtomicReference lastSend; private final SocketAddress group; private final ConcurrentMulticastSocket inSock, outSock; private final InetAddress laddr; // Local address. private final BlockingQueue> delivered; private final Logger log; public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException { this.positiveAcks = new ConcurrentHashMap().keySet(); this.negativeAcks = new ConcurrentHashMap().keySet(); this.received = new ReceivedSet(); this.retransmissions = new LinkedBlockingQueue>(); this.lastSend = new AtomicReference(Instant.now()); this.group = group; this.inSock = new ConcurrentMulticastSocket(); this.inSock.joinGroup(group.getAddress()); this.outSock = new ConcurrentMulticastSocket(group.getPort()); this.outSock.joinGroup(group.getAddress()); this.laddr = laddr; this.delivered = new LinkedBlockingQueue>(); this.log = Logger.getLogger(this.getClass().getName()); (new Thread(new Receive(inSock, positiveAcks, negativeAcks, received, retransmissions, delivered))).start(); (new Thread(new Retransmit(retransmissions, outSock, group))).start(); } public void send(T payload) throws IOException { Message msg = new Message( payload, laddr, positiveAcks.toArray(new MessageID[0]), negativeAcks.toArray(new MessageID[0])); DatagramPacket pkt = Packet.encode(msg, group); outSock.send(pkt); positiveAcks.clear(); (new Thread(new Timeout(msg, positiveAcks, retransmissions))).start(); lastSend.set(Instant.now()); } public T receive() throws InterruptedException { Message msg = delivered.take(); return msg.payload; } }