summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/runicast
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-22 12:02:08 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-22 12:02:08 -0500
commit43549dee0a8214c1bb02c874dcfd7ba37a8ef310 (patch)
tree12cd75a25518c6d0cad495e0c5eed65d996765b2 /src/main/java/derms/net/runicast
parent91967cbd407254358ab768e74ebcfda8d4a30bc8 (diff)
downloadsoen423-43549dee0a8214c1bb02c874dcfd7ba37a8ef310.zip
ReliableUnicastSender
Diffstat (limited to 'src/main/java/derms/net/runicast')
-rw-r--r--src/main/java/derms/net/runicast/Ack.java12
-rw-r--r--src/main/java/derms/net/runicast/Message.java33
-rw-r--r--src/main/java/derms/net/runicast/ReceiveAcks.java59
-rw-r--r--src/main/java/derms/net/runicast/ReliableUnicastReceiver.java4
-rw-r--r--src/main/java/derms/net/runicast/ReliableUnicastSender.java82
-rw-r--r--src/main/java/derms/net/runicast/Retransmit.java57
6 files changed, 247 insertions, 0 deletions
diff --git a/src/main/java/derms/net/runicast/Ack.java b/src/main/java/derms/net/runicast/Ack.java
new file mode 100644
index 0000000..df2211c
--- /dev/null
+++ b/src/main/java/derms/net/runicast/Ack.java
@@ -0,0 +1,12 @@
+package derms.net.runicast;
+
+import java.io.Serializable;
+
+/** A message acknowledgement. */
+class Ack implements Serializable {
+ final long seq; // The sequence number of the acknowledged message.
+
+ Ack(long seq) {
+ this.seq = seq;
+ }
+}
diff --git a/src/main/java/derms/net/runicast/Message.java b/src/main/java/derms/net/runicast/Message.java
new file mode 100644
index 0000000..802cdb5
--- /dev/null
+++ b/src/main/java/derms/net/runicast/Message.java
@@ -0,0 +1,33 @@
+package derms.net.runicast;
+
+import derms.net.MessagePayload;
+
+import java.io.Serializable;
+
+class Message<T extends MessagePayload> implements Serializable {
+ final long seq; // Sequence number.
+ final T payload;
+
+ Message(long seq, T payload) {
+ this.seq = seq;
+ this.payload = payload;
+ }
+
+ @Override
+ public int hashCode() { return (int) seq * payload.hash(); }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+ if (obj.getClass() != this.getClass())
+ return false;
+ Message<?> other = (Message<?>) obj;
+ if (other.payload.getClass() != this.payload.getClass())
+ return false;
+ return other.seq == this.seq && other.payload.equals(this.payload);
+ }
+
+ @Override
+ public String toString() { return getClass().getSimpleName() + "{" + seq + ", " + payload + "}"; }
+}
diff --git a/src/main/java/derms/net/runicast/ReceiveAcks.java b/src/main/java/derms/net/runicast/ReceiveAcks.java
new file mode 100644
index 0000000..0f585ff
--- /dev/null
+++ b/src/main/java/derms/net/runicast/ReceiveAcks.java
@@ -0,0 +1,59 @@
+package derms.net.runicast;
+
+import derms.net.ConcurrentDatagramSocket;
+import derms.net.MessagePayload;
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.SocketTimeoutException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/** Receive acknowledgements. Remove messages from the sent queue once they are acknowledged. */
+class ReceiveAcks<T extends MessagePayload> implements Runnable {
+ private static final int bufSize = 8192;
+
+ private final AtomicLong unacked;
+ private final Queue<Message<T>> sent;
+ private final ConcurrentDatagramSocket sock;
+ private final Logger log;
+
+ ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) {
+ this.unacked = unacked;
+ this.sent = sent;
+ this.sock = sock;
+ this.log = Logger.getLogger(getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
+ for (;;) {
+ try {
+ sock.receive(pkt);
+ Ack ack = Packet.decode(pkt, Ack.class);
+ recvAck(ack.seq);
+ } catch (SocketTimeoutException e) {
+ if (Thread.interrupted()) {
+ log.info("Interrupted.");
+ return;
+ }
+ } catch (IOException | ClassNotFoundException | ClassCastException e) {
+ log.warning(e.getMessage());
+ }
+ }
+ }
+
+ private void recvAck(long ack) {
+ unacked.updateAndGet((unacked) -> {
+ if (ack >= unacked)
+ return ack+1;
+ return unacked;
+ });
+
+ while (!sent.isEmpty() && sent.peek().seq <= ack)
+ sent.remove();
+ }
+}
diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
new file mode 100644
index 0000000..3d63a6f
--- /dev/null
+++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
@@ -0,0 +1,4 @@
+package derms.net.runicast;
+
+public class ReliableUnicastReceiver {
+}
diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java
new file mode 100644
index 0000000..2a0943a
--- /dev/null
+++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java
@@ -0,0 +1,82 @@
+package derms.net.runicast;
+
+import derms.net.ConcurrentDatagramSocket;
+import derms.net.MessagePayload;
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.time.Duration;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+public class ReliableUnicastSender<T extends MessagePayload> {
+ private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout.
+ private static final Duration terminationTimeout = Duration.ofSeconds(1);
+
+ private final AtomicLong next; // Next sequence number.
+ private final AtomicLong unacked; // Sequence number of first unacknowledged message.
+ private final Queue<Message<T>> sent;
+ private final ConcurrentDatagramSocket sock;
+ private final Logger log;
+ private final ExecutorService pool;
+
+ /**
+ * @param raddr Remote IP address to connect to.
+ */
+ ReliableUnicastSender(InetSocketAddress raddr) throws IOException {
+ this.next = new AtomicLong(0);
+ this.unacked = new AtomicLong(0);
+ this.sent = new LinkedBlockingQueue<Message<T>>();
+ this.sock = new ConcurrentDatagramSocket();
+ this.sock.connect(raddr);
+ this.sock.setSoTimeout(soTimeout);
+ this.log = Logger.getLogger(getClass().getName());
+ this.pool = Executors.newCachedThreadPool();
+ pool.execute(new ReceiveAcks<T>(unacked, sent, sock));
+ pool.execute(new Retransmit<T>(unacked, sent, sock));
+ }
+
+ public void send(T payload) throws IOException {
+ Message<T> msg = new Message<T>(next.get(), payload);
+ DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress());
+ sock.send(pkt);
+ sent.add(msg);
+ next.incrementAndGet();
+ }
+
+ /** Wait for all messages to be acknowledged and close the connection. */
+ public void close() throws InterruptedException {
+ // Wait for receiver to acknowledge all sent messages.
+ while (unacked.get() < next.get()) {
+ Thread.yield();
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ }
+
+ closeNow();
+ }
+
+ /** Close the connection immediately, without waiting for acknowledgements. */
+ public void closeNow() {
+ log.info("Shutting down.");
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.warning("Thread pool did not terminate after " + terminationTimeout);
+ } catch (InterruptedException e) {
+ log.warning("Interrupted while terminating thread pool: " + e.getMessage());
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
+ }
+
+ sock.close();
+ }
+}
diff --git a/src/main/java/derms/net/runicast/Retransmit.java b/src/main/java/derms/net/runicast/Retransmit.java
new file mode 100644
index 0000000..affd00c
--- /dev/null
+++ b/src/main/java/derms/net/runicast/Retransmit.java
@@ -0,0 +1,57 @@
+package derms.net.runicast;
+
+import derms.net.ConcurrentDatagramSocket;
+import derms.net.MessagePayload;
+import derms.net.Packet;
+import derms.util.Wait;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.time.Duration;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/** Retransmit unacknowledged messages. */
+class Retransmit<T extends MessagePayload> implements Runnable {
+ private static final Duration timeout = Duration.ofMillis(500);
+
+ private final AtomicLong unacked;
+ private final Queue<Message<T>> sent;
+ private final ConcurrentDatagramSocket sock;
+ private final Logger log;
+
+ Retransmit(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) {
+ this.unacked = unacked;
+ this.sent = sent;
+ this.sock = sock;
+ this.log = Logger.getLogger(getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ Wait.forDuration(timeout);
+
+ for (Message<T> msg : sent) {
+ if (msg.seq >= unacked.get()) {
+ retransmit(msg);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("Interrupted.");
+ }
+ }
+
+ private void retransmit(Message<T> msg) {
+ try {
+ DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress());
+ sock.send(pkt);
+ log.info("Retransmitted " + msg);
+ } catch (IOException e) {
+ log.warning("Failed to retransmit " + msg + ": " + e.getMessage());
+ }
+ }
+}