diff options
22 files changed, 274 insertions, 11 deletions
diff --git a/src/main/java/derms/net/ConcurrentDatagramSocket.java b/src/main/java/derms/net/ConcurrentDatagramSocket.java index ee5dbb2..44d9186 100644 --- a/src/main/java/derms/net/ConcurrentDatagramSocket.java +++ b/src/main/java/derms/net/ConcurrentDatagramSocket.java @@ -1,10 +1,7 @@ package derms.net; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.SocketException; +import java.net.*; import java.time.Duration; public class ConcurrentDatagramSocket { @@ -22,6 +19,10 @@ public class ConcurrentDatagramSocket { sock.connect(address, port); } + public synchronized void connect(InetSocketAddress addr) { + connect(addr.getAddress(), addr.getPort()); + } + public synchronized InetAddress getLocalAddress() { return sock.getLocalAddress(); } @@ -30,6 +31,10 @@ public class ConcurrentDatagramSocket { return sock.getLocalPort(); } + public synchronized SocketAddress getRemoteSocketAddress() { + return sock.getRemoteSocketAddress(); + } + public synchronized void receive(DatagramPacket p) throws IOException { sock.receive(p); } diff --git a/src/main/java/derms/net/rmulticast/Hashable.java b/src/main/java/derms/net/Hashable.java index f00f62b..addfcb4 100644 --- a/src/main/java/derms/net/rmulticast/Hashable.java +++ b/src/main/java/derms/net/Hashable.java @@ -1,4 +1,4 @@ -package derms.net.rmulticast; +package derms.net; public interface Hashable { int hash(); diff --git a/src/main/java/derms/net/rmulticast/MessagePayload.java b/src/main/java/derms/net/MessagePayload.java index ca36831..54f1966 100644 --- a/src/main/java/derms/net/rmulticast/MessagePayload.java +++ b/src/main/java/derms/net/MessagePayload.java @@ -1,4 +1,4 @@ -package derms.net.rmulticast; +package derms.net; import java.io.Serializable; diff --git a/src/main/java/derms/net/rmulticast/Message.java b/src/main/java/derms/net/rmulticast/Message.java index 21f8d19..281eb4f 100644 --- a/src/main/java/derms/net/rmulticast/Message.java +++ b/src/main/java/derms/net/rmulticast/Message.java @@ -1,5 +1,7 @@ package derms.net.rmulticast; +import derms.net.MessagePayload; + import java.io.Serializable; import java.net.InetAddress; diff --git a/src/main/java/derms/net/rmulticast/NullPayload.java b/src/main/java/derms/net/rmulticast/NullPayload.java index 4c818e4..e051d2f 100644 --- a/src/main/java/derms/net/rmulticast/NullPayload.java +++ b/src/main/java/derms/net/rmulticast/NullPayload.java @@ -1,5 +1,7 @@ package derms.net.rmulticast; +import derms.net.MessagePayload; + class NullPayload implements MessagePayload { @Override public int hash() { diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java index 04f8423..77ab6a7 100644 --- a/src/main/java/derms/net/rmulticast/Prune.java +++ b/src/main/java/derms/net/rmulticast/Prune.java @@ -1,5 +1,6 @@ package derms.net.rmulticast; +import derms.net.MessagePayload; import derms.util.Wait; import java.net.InetAddress; diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 3d25e75..c9e1acf 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -1,6 +1,7 @@ package derms.net.rmulticast; import derms.net.ConcurrentMulticastSocket; +import derms.net.MessagePayload; import derms.net.Packet; import java.io.IOException; diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java index 5bd3d85..47a3791 100644 --- a/src/main/java/derms/net/rmulticast/ReceivedSet.java +++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java @@ -1,5 +1,7 @@ package derms.net.rmulticast; +import derms.net.MessagePayload; + import java.net.InetAddress; import java.time.Instant; import java.util.*; diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index 6edab5d..c201e3a 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -1,6 +1,7 @@ package derms.net.rmulticast; import derms.net.ConcurrentMulticastSocket; +import derms.net.MessagePayload; import derms.net.Packet; import java.io.IOException; diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java index 6205697..9d5e10b 100644 --- a/src/main/java/derms/net/rmulticast/Retransmit.java +++ b/src/main/java/derms/net/rmulticast/Retransmit.java @@ -1,6 +1,7 @@ package derms.net.rmulticast; import derms.net.ConcurrentMulticastSocket; +import derms.net.MessagePayload; import derms.net.Packet; import java.net.DatagramPacket; diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java index bfae630..5c47e0c 100644 --- a/src/main/java/derms/net/rmulticast/Timeout.java +++ b/src/main/java/derms/net/rmulticast/Timeout.java @@ -1,5 +1,6 @@ package derms.net.rmulticast; +import derms.net.MessagePayload; import derms.util.Wait; import java.time.Duration; diff --git a/src/main/java/derms/net/runicast/Ack.java b/src/main/java/derms/net/runicast/Ack.java new file mode 100644 index 0000000..df2211c --- /dev/null +++ b/src/main/java/derms/net/runicast/Ack.java @@ -0,0 +1,12 @@ +package derms.net.runicast; + +import java.io.Serializable; + +/** A message acknowledgement. */ +class Ack implements Serializable { + final long seq; // The sequence number of the acknowledged message. + + Ack(long seq) { + this.seq = seq; + } +} diff --git a/src/main/java/derms/net/runicast/Message.java b/src/main/java/derms/net/runicast/Message.java new file mode 100644 index 0000000..802cdb5 --- /dev/null +++ b/src/main/java/derms/net/runicast/Message.java @@ -0,0 +1,33 @@ +package derms.net.runicast; + +import derms.net.MessagePayload; + +import java.io.Serializable; + +class Message<T extends MessagePayload> implements Serializable { + final long seq; // Sequence number. + final T payload; + + Message(long seq, T payload) { + this.seq = seq; + this.payload = payload; + } + + @Override + public int hashCode() { return (int) seq * payload.hash(); } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + if (obj.getClass() != this.getClass()) + return false; + Message<?> other = (Message<?>) obj; + if (other.payload.getClass() != this.payload.getClass()) + return false; + return other.seq == this.seq && other.payload.equals(this.payload); + } + + @Override + public String toString() { return getClass().getSimpleName() + "{" + seq + ", " + payload + "}"; } +} diff --git a/src/main/java/derms/net/runicast/ReceiveAcks.java b/src/main/java/derms/net/runicast/ReceiveAcks.java new file mode 100644 index 0000000..0f585ff --- /dev/null +++ b/src/main/java/derms/net/runicast/ReceiveAcks.java @@ -0,0 +1,59 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.SocketTimeoutException; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** Receive acknowledgements. Remove messages from the sent queue once they are acknowledged. */ +class ReceiveAcks<T extends MessagePayload> implements Runnable { + private static final int bufSize = 8192; + + private final AtomicLong unacked; + private final Queue<Message<T>> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + + ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) { + this.unacked = unacked; + this.sent = sent; + this.sock = sock; + this.log = Logger.getLogger(getClass().getName()); + } + + @Override + public void run() { + DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + for (;;) { + try { + sock.receive(pkt); + Ack ack = Packet.decode(pkt, Ack.class); + recvAck(ack.seq); + } catch (SocketTimeoutException e) { + if (Thread.interrupted()) { + log.info("Interrupted."); + return; + } + } catch (IOException | ClassNotFoundException | ClassCastException e) { + log.warning(e.getMessage()); + } + } + } + + private void recvAck(long ack) { + unacked.updateAndGet((unacked) -> { + if (ack >= unacked) + return ack+1; + return unacked; + }); + + while (!sent.isEmpty() && sent.peek().seq <= ack) + sent.remove(); + } +} diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java new file mode 100644 index 0000000..3d63a6f --- /dev/null +++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java @@ -0,0 +1,4 @@ +package derms.net.runicast; + +public class ReliableUnicastReceiver { +} diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java new file mode 100644 index 0000000..2a0943a --- /dev/null +++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java @@ -0,0 +1,82 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +public class ReliableUnicastSender<T extends MessagePayload> { + private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + private final AtomicLong next; // Next sequence number. + private final AtomicLong unacked; // Sequence number of first unacknowledged message. + private final Queue<Message<T>> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + private final ExecutorService pool; + + /** + * @param raddr Remote IP address to connect to. + */ + ReliableUnicastSender(InetSocketAddress raddr) throws IOException { + this.next = new AtomicLong(0); + this.unacked = new AtomicLong(0); + this.sent = new LinkedBlockingQueue<Message<T>>(); + this.sock = new ConcurrentDatagramSocket(); + this.sock.connect(raddr); + this.sock.setSoTimeout(soTimeout); + this.log = Logger.getLogger(getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new ReceiveAcks<T>(unacked, sent, sock)); + pool.execute(new Retransmit<T>(unacked, sent, sock)); + } + + public void send(T payload) throws IOException { + Message<T> msg = new Message<T>(next.get(), payload); + DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); + sock.send(pkt); + sent.add(msg); + next.incrementAndGet(); + } + + /** Wait for all messages to be acknowledged and close the connection. */ + public void close() throws InterruptedException { + // Wait for receiver to acknowledge all sent messages. + while (unacked.get() < next.get()) { + Thread.yield(); + if (Thread.interrupted()) + throw new InterruptedException(); + } + + closeNow(); + } + + /** Close the connection immediately, without waiting for acknowledgements. */ + public void closeNow() { + log.info("Shutting down."); + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + + sock.close(); + } +} diff --git a/src/main/java/derms/net/runicast/Retransmit.java b/src/main/java/derms/net/runicast/Retransmit.java new file mode 100644 index 0000000..affd00c --- /dev/null +++ b/src/main/java/derms/net/runicast/Retransmit.java @@ -0,0 +1,57 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; +import derms.util.Wait; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** Retransmit unacknowledged messages. */ +class Retransmit<T extends MessagePayload> implements Runnable { + private static final Duration timeout = Duration.ofMillis(500); + + private final AtomicLong unacked; + private final Queue<Message<T>> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + + Retransmit(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) { + this.unacked = unacked; + this.sent = sent; + this.sock = sock; + this.log = Logger.getLogger(getClass().getName()); + } + + @Override + public void run() { + try { + for (;;) { + Wait.forDuration(timeout); + + for (Message<T> msg : sent) { + if (msg.seq >= unacked.get()) { + retransmit(msg); + } + } + } + } catch (InterruptedException e) { + log.info("Interrupted."); + } + } + + private void retransmit(Message<T> msg) { + try { + DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); + sock.send(pkt); + log.info("Retransmitted " + msg); + } catch (IOException e) { + log.warning("Failed to retransmit " + msg + ": " + e.getMessage()); + } + } +} diff --git a/src/main/java/derms/net/tomulticast/Message.java b/src/main/java/derms/net/tomulticast/Message.java index 29d1ff7..a85e827 100644 --- a/src/main/java/derms/net/tomulticast/Message.java +++ b/src/main/java/derms/net/tomulticast/Message.java @@ -1,6 +1,6 @@ package derms.net.tomulticast; -import derms.net.rmulticast.MessagePayload; +import derms.net.MessagePayload; class Message<T extends MessagePayload> implements MessagePayload, Comparable<Message<T>> { long seq; // Sequence number. diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java index 06db127..aad0a0b 100644 --- a/src/main/java/derms/net/tomulticast/Receive.java +++ b/src/main/java/derms/net/tomulticast/Receive.java @@ -1,6 +1,6 @@ package derms.net.tomulticast; -import derms.net.rmulticast.MessagePayload; +import derms.net.MessagePayload; import java.io.IOException; import java.net.InetAddress; diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java index a6dc2a9..d2f7e61 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java @@ -1,6 +1,6 @@ package derms.net.tomulticast; -import derms.net.rmulticast.MessagePayload; +import derms.net.MessagePayload; import derms.net.rmulticast.ReliableMulticast; import java.io.IOException; diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java index 9690c38..83f2da5 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -1,6 +1,6 @@ package derms.net.tomulticast; -import derms.net.rmulticast.MessagePayload; +import derms.net.MessagePayload; import java.io.IOException; import java.net.InetAddress; diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java index 63498a6..0de0ad0 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java @@ -1,6 +1,6 @@ package derms.net.tomulticast; -import derms.net.rmulticast.MessagePayload; +import derms.net.MessagePayload; import java.io.IOException; import java.net.InetAddress; |