summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-22 12:02:08 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-22 12:02:08 -0500
commit43549dee0a8214c1bb02c874dcfd7ba37a8ef310 (patch)
tree12cd75a25518c6d0cad495e0c5eed65d996765b2 /src/main/java/derms
parent91967cbd407254358ab768e74ebcfda8d4a30bc8 (diff)
downloadsoen423-43549dee0a8214c1bb02c874dcfd7ba37a8ef310.zip
ReliableUnicastSender
Diffstat (limited to 'src/main/java/derms')
-rw-r--r--src/main/java/derms/net/ConcurrentDatagramSocket.java13
-rw-r--r--src/main/java/derms/net/Hashable.java (renamed from src/main/java/derms/net/rmulticast/Hashable.java)2
-rw-r--r--src/main/java/derms/net/MessagePayload.java (renamed from src/main/java/derms/net/rmulticast/MessagePayload.java)2
-rw-r--r--src/main/java/derms/net/rmulticast/Message.java2
-rw-r--r--src/main/java/derms/net/rmulticast/NullPayload.java2
-rw-r--r--src/main/java/derms/net/rmulticast/Prune.java1
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java1
-rw-r--r--src/main/java/derms/net/rmulticast/ReceivedSet.java2
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java1
-rw-r--r--src/main/java/derms/net/rmulticast/Retransmit.java1
-rw-r--r--src/main/java/derms/net/rmulticast/Timeout.java1
-rw-r--r--src/main/java/derms/net/runicast/Ack.java12
-rw-r--r--src/main/java/derms/net/runicast/Message.java33
-rw-r--r--src/main/java/derms/net/runicast/ReceiveAcks.java59
-rw-r--r--src/main/java/derms/net/runicast/ReliableUnicastReceiver.java4
-rw-r--r--src/main/java/derms/net/runicast/ReliableUnicastSender.java82
-rw-r--r--src/main/java/derms/net/runicast/Retransmit.java57
-rw-r--r--src/main/java/derms/net/tomulticast/Message.java2
-rw-r--r--src/main/java/derms/net/tomulticast/Receive.java2
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticast.java2
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java2
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java2
22 files changed, 274 insertions, 11 deletions
diff --git a/src/main/java/derms/net/ConcurrentDatagramSocket.java b/src/main/java/derms/net/ConcurrentDatagramSocket.java
index ee5dbb2..44d9186 100644
--- a/src/main/java/derms/net/ConcurrentDatagramSocket.java
+++ b/src/main/java/derms/net/ConcurrentDatagramSocket.java
@@ -1,10 +1,7 @@
package derms.net;
import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.SocketException;
+import java.net.*;
import java.time.Duration;
public class ConcurrentDatagramSocket {
@@ -22,6 +19,10 @@ public class ConcurrentDatagramSocket {
sock.connect(address, port);
}
+ public synchronized void connect(InetSocketAddress addr) {
+ connect(addr.getAddress(), addr.getPort());
+ }
+
public synchronized InetAddress getLocalAddress() {
return sock.getLocalAddress();
}
@@ -30,6 +31,10 @@ public class ConcurrentDatagramSocket {
return sock.getLocalPort();
}
+ public synchronized SocketAddress getRemoteSocketAddress() {
+ return sock.getRemoteSocketAddress();
+ }
+
public synchronized void receive(DatagramPacket p) throws IOException {
sock.receive(p);
}
diff --git a/src/main/java/derms/net/rmulticast/Hashable.java b/src/main/java/derms/net/Hashable.java
index f00f62b..addfcb4 100644
--- a/src/main/java/derms/net/rmulticast/Hashable.java
+++ b/src/main/java/derms/net/Hashable.java
@@ -1,4 +1,4 @@
-package derms.net.rmulticast;
+package derms.net;
public interface Hashable {
int hash();
diff --git a/src/main/java/derms/net/rmulticast/MessagePayload.java b/src/main/java/derms/net/MessagePayload.java
index ca36831..54f1966 100644
--- a/src/main/java/derms/net/rmulticast/MessagePayload.java
+++ b/src/main/java/derms/net/MessagePayload.java
@@ -1,4 +1,4 @@
-package derms.net.rmulticast;
+package derms.net;
import java.io.Serializable;
diff --git a/src/main/java/derms/net/rmulticast/Message.java b/src/main/java/derms/net/rmulticast/Message.java
index 21f8d19..281eb4f 100644
--- a/src/main/java/derms/net/rmulticast/Message.java
+++ b/src/main/java/derms/net/rmulticast/Message.java
@@ -1,5 +1,7 @@
package derms.net.rmulticast;
+import derms.net.MessagePayload;
+
import java.io.Serializable;
import java.net.InetAddress;
diff --git a/src/main/java/derms/net/rmulticast/NullPayload.java b/src/main/java/derms/net/rmulticast/NullPayload.java
index 4c818e4..e051d2f 100644
--- a/src/main/java/derms/net/rmulticast/NullPayload.java
+++ b/src/main/java/derms/net/rmulticast/NullPayload.java
@@ -1,5 +1,7 @@
package derms.net.rmulticast;
+import derms.net.MessagePayload;
+
class NullPayload implements MessagePayload {
@Override
public int hash() {
diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java
index 04f8423..77ab6a7 100644
--- a/src/main/java/derms/net/rmulticast/Prune.java
+++ b/src/main/java/derms/net/rmulticast/Prune.java
@@ -1,5 +1,6 @@
package derms.net.rmulticast;
+import derms.net.MessagePayload;
import derms.util.Wait;
import java.net.InetAddress;
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index 3d25e75..c9e1acf 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -1,6 +1,7 @@
package derms.net.rmulticast;
import derms.net.ConcurrentMulticastSocket;
+import derms.net.MessagePayload;
import derms.net.Packet;
import java.io.IOException;
diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java
index 5bd3d85..47a3791 100644
--- a/src/main/java/derms/net/rmulticast/ReceivedSet.java
+++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java
@@ -1,5 +1,7 @@
package derms.net.rmulticast;
+import derms.net.MessagePayload;
+
import java.net.InetAddress;
import java.time.Instant;
import java.util.*;
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index 6edab5d..c201e3a 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -1,6 +1,7 @@
package derms.net.rmulticast;
import derms.net.ConcurrentMulticastSocket;
+import derms.net.MessagePayload;
import derms.net.Packet;
import java.io.IOException;
diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java
index 6205697..9d5e10b 100644
--- a/src/main/java/derms/net/rmulticast/Retransmit.java
+++ b/src/main/java/derms/net/rmulticast/Retransmit.java
@@ -1,6 +1,7 @@
package derms.net.rmulticast;
import derms.net.ConcurrentMulticastSocket;
+import derms.net.MessagePayload;
import derms.net.Packet;
import java.net.DatagramPacket;
diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java
index bfae630..5c47e0c 100644
--- a/src/main/java/derms/net/rmulticast/Timeout.java
+++ b/src/main/java/derms/net/rmulticast/Timeout.java
@@ -1,5 +1,6 @@
package derms.net.rmulticast;
+import derms.net.MessagePayload;
import derms.util.Wait;
import java.time.Duration;
diff --git a/src/main/java/derms/net/runicast/Ack.java b/src/main/java/derms/net/runicast/Ack.java
new file mode 100644
index 0000000..df2211c
--- /dev/null
+++ b/src/main/java/derms/net/runicast/Ack.java
@@ -0,0 +1,12 @@
+package derms.net.runicast;
+
+import java.io.Serializable;
+
+/** A message acknowledgement. */
+class Ack implements Serializable {
+ final long seq; // The sequence number of the acknowledged message.
+
+ Ack(long seq) {
+ this.seq = seq;
+ }
+}
diff --git a/src/main/java/derms/net/runicast/Message.java b/src/main/java/derms/net/runicast/Message.java
new file mode 100644
index 0000000..802cdb5
--- /dev/null
+++ b/src/main/java/derms/net/runicast/Message.java
@@ -0,0 +1,33 @@
+package derms.net.runicast;
+
+import derms.net.MessagePayload;
+
+import java.io.Serializable;
+
+class Message<T extends MessagePayload> implements Serializable {
+ final long seq; // Sequence number.
+ final T payload;
+
+ Message(long seq, T payload) {
+ this.seq = seq;
+ this.payload = payload;
+ }
+
+ @Override
+ public int hashCode() { return (int) seq * payload.hash(); }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+ if (obj.getClass() != this.getClass())
+ return false;
+ Message<?> other = (Message<?>) obj;
+ if (other.payload.getClass() != this.payload.getClass())
+ return false;
+ return other.seq == this.seq && other.payload.equals(this.payload);
+ }
+
+ @Override
+ public String toString() { return getClass().getSimpleName() + "{" + seq + ", " + payload + "}"; }
+}
diff --git a/src/main/java/derms/net/runicast/ReceiveAcks.java b/src/main/java/derms/net/runicast/ReceiveAcks.java
new file mode 100644
index 0000000..0f585ff
--- /dev/null
+++ b/src/main/java/derms/net/runicast/ReceiveAcks.java
@@ -0,0 +1,59 @@
+package derms.net.runicast;
+
+import derms.net.ConcurrentDatagramSocket;
+import derms.net.MessagePayload;
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.SocketTimeoutException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/** Receive acknowledgements. Remove messages from the sent queue once they are acknowledged. */
+class ReceiveAcks<T extends MessagePayload> implements Runnable {
+ private static final int bufSize = 8192;
+
+ private final AtomicLong unacked;
+ private final Queue<Message<T>> sent;
+ private final ConcurrentDatagramSocket sock;
+ private final Logger log;
+
+ ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) {
+ this.unacked = unacked;
+ this.sent = sent;
+ this.sock = sock;
+ this.log = Logger.getLogger(getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
+ for (;;) {
+ try {
+ sock.receive(pkt);
+ Ack ack = Packet.decode(pkt, Ack.class);
+ recvAck(ack.seq);
+ } catch (SocketTimeoutException e) {
+ if (Thread.interrupted()) {
+ log.info("Interrupted.");
+ return;
+ }
+ } catch (IOException | ClassNotFoundException | ClassCastException e) {
+ log.warning(e.getMessage());
+ }
+ }
+ }
+
+ private void recvAck(long ack) {
+ unacked.updateAndGet((unacked) -> {
+ if (ack >= unacked)
+ return ack+1;
+ return unacked;
+ });
+
+ while (!sent.isEmpty() && sent.peek().seq <= ack)
+ sent.remove();
+ }
+}
diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
new file mode 100644
index 0000000..3d63a6f
--- /dev/null
+++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
@@ -0,0 +1,4 @@
+package derms.net.runicast;
+
+public class ReliableUnicastReceiver {
+}
diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java
new file mode 100644
index 0000000..2a0943a
--- /dev/null
+++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java
@@ -0,0 +1,82 @@
+package derms.net.runicast;
+
+import derms.net.ConcurrentDatagramSocket;
+import derms.net.MessagePayload;
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.time.Duration;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+public class ReliableUnicastSender<T extends MessagePayload> {
+ private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout.
+ private static final Duration terminationTimeout = Duration.ofSeconds(1);
+
+ private final AtomicLong next; // Next sequence number.
+ private final AtomicLong unacked; // Sequence number of first unacknowledged message.
+ private final Queue<Message<T>> sent;
+ private final ConcurrentDatagramSocket sock;
+ private final Logger log;
+ private final ExecutorService pool;
+
+ /**
+ * @param raddr Remote IP address to connect to.
+ */
+ ReliableUnicastSender(InetSocketAddress raddr) throws IOException {
+ this.next = new AtomicLong(0);
+ this.unacked = new AtomicLong(0);
+ this.sent = new LinkedBlockingQueue<Message<T>>();
+ this.sock = new ConcurrentDatagramSocket();
+ this.sock.connect(raddr);
+ this.sock.setSoTimeout(soTimeout);
+ this.log = Logger.getLogger(getClass().getName());
+ this.pool = Executors.newCachedThreadPool();
+ pool.execute(new ReceiveAcks<T>(unacked, sent, sock));
+ pool.execute(new Retransmit<T>(unacked, sent, sock));
+ }
+
+ public void send(T payload) throws IOException {
+ Message<T> msg = new Message<T>(next.get(), payload);
+ DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress());
+ sock.send(pkt);
+ sent.add(msg);
+ next.incrementAndGet();
+ }
+
+ /** Wait for all messages to be acknowledged and close the connection. */
+ public void close() throws InterruptedException {
+ // Wait for receiver to acknowledge all sent messages.
+ while (unacked.get() < next.get()) {
+ Thread.yield();
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ }
+
+ closeNow();
+ }
+
+ /** Close the connection immediately, without waiting for acknowledgements. */
+ public void closeNow() {
+ log.info("Shutting down.");
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.warning("Thread pool did not terminate after " + terminationTimeout);
+ } catch (InterruptedException e) {
+ log.warning("Interrupted while terminating thread pool: " + e.getMessage());
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
+ }
+
+ sock.close();
+ }
+}
diff --git a/src/main/java/derms/net/runicast/Retransmit.java b/src/main/java/derms/net/runicast/Retransmit.java
new file mode 100644
index 0000000..affd00c
--- /dev/null
+++ b/src/main/java/derms/net/runicast/Retransmit.java
@@ -0,0 +1,57 @@
+package derms.net.runicast;
+
+import derms.net.ConcurrentDatagramSocket;
+import derms.net.MessagePayload;
+import derms.net.Packet;
+import derms.util.Wait;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.time.Duration;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/** Retransmit unacknowledged messages. */
+class Retransmit<T extends MessagePayload> implements Runnable {
+ private static final Duration timeout = Duration.ofMillis(500);
+
+ private final AtomicLong unacked;
+ private final Queue<Message<T>> sent;
+ private final ConcurrentDatagramSocket sock;
+ private final Logger log;
+
+ Retransmit(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) {
+ this.unacked = unacked;
+ this.sent = sent;
+ this.sock = sock;
+ this.log = Logger.getLogger(getClass().getName());
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ Wait.forDuration(timeout);
+
+ for (Message<T> msg : sent) {
+ if (msg.seq >= unacked.get()) {
+ retransmit(msg);
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ log.info("Interrupted.");
+ }
+ }
+
+ private void retransmit(Message<T> msg) {
+ try {
+ DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress());
+ sock.send(pkt);
+ log.info("Retransmitted " + msg);
+ } catch (IOException e) {
+ log.warning("Failed to retransmit " + msg + ": " + e.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/derms/net/tomulticast/Message.java b/src/main/java/derms/net/tomulticast/Message.java
index 29d1ff7..a85e827 100644
--- a/src/main/java/derms/net/tomulticast/Message.java
+++ b/src/main/java/derms/net/tomulticast/Message.java
@@ -1,6 +1,6 @@
package derms.net.tomulticast;
-import derms.net.rmulticast.MessagePayload;
+import derms.net.MessagePayload;
class Message<T extends MessagePayload> implements MessagePayload, Comparable<Message<T>> {
long seq; // Sequence number.
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java
index 06db127..aad0a0b 100644
--- a/src/main/java/derms/net/tomulticast/Receive.java
+++ b/src/main/java/derms/net/tomulticast/Receive.java
@@ -1,6 +1,6 @@
package derms.net.tomulticast;
-import derms.net.rmulticast.MessagePayload;
+import derms.net.MessagePayload;
import java.io.IOException;
import java.net.InetAddress;
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
index a6dc2a9..d2f7e61 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
@@ -1,6 +1,6 @@
package derms.net.tomulticast;
-import derms.net.rmulticast.MessagePayload;
+import derms.net.MessagePayload;
import derms.net.rmulticast.ReliableMulticast;
import java.io.IOException;
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
index 9690c38..83f2da5 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
@@ -1,6 +1,6 @@
package derms.net.tomulticast;
-import derms.net.rmulticast.MessagePayload;
+import derms.net.MessagePayload;
import java.io.IOException;
import java.net.InetAddress;
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java
index 63498a6..0de0ad0 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastSender.java
@@ -1,6 +1,6 @@
package derms.net.tomulticast;
-import derms.net.rmulticast.MessagePayload;
+import derms.net.MessagePayload;
import java.io.IOException;
import java.net.InetAddress;