summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/derms/net/runicast/Connection.java101
-rw-r--r--src/main/java/derms/net/runicast/ReliableUnicast.java4
2 files changed, 71 insertions, 34 deletions
diff --git a/src/main/java/derms/net/runicast/Connection.java b/src/main/java/derms/net/runicast/Connection.java
index 68d7355..034d043 100644
--- a/src/main/java/derms/net/runicast/Connection.java
+++ b/src/main/java/derms/net/runicast/Connection.java
@@ -1,12 +1,12 @@
package derms.net.runicast;
+import derms.net.ConcurrentDatagramSocket;
import derms.net.Packet;
import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
+import java.net.*;
import java.time.Duration;
+import java.time.Instant;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -16,6 +16,10 @@ import java.util.logging.Logger;
public class Connection implements Runnable {
private static final Random rng = new Random();
+ private static final Duration roundTrip = Duration.ofMillis(100); // TODO: measure round trip time.
+ private static final Duration rexitTimeout = roundTrip.multipliedBy(4);
+ private static final Duration deathTimeout = roundTrip.multipliedBy(300);
+ private static final int bufSize = 8192;
private static final Duration terminationTimeout = Duration.ofSeconds(1);
AtomicReference<State> state;
@@ -28,11 +32,11 @@ public class Connection implements Runnable {
int next; // Sequence number of the next message to be sent from the local side.
int rcvd; // The last in-sequence message received from the remote side.
int unacked; // Sequence number of the first unacked message.
- DatagramSocket sock;
+ ConcurrentDatagramSocket sock;
private final Logger log;
private final ExecutorService pool;
- Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, DatagramSocket sock) {
+ Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, ConcurrentDatagramSocket sock) {
this.state = new AtomicReference<State>(State.closed);
this.laddr = laddr;
this.lport = lport;
@@ -79,37 +83,68 @@ public class Connection implements Runnable {
if (Thread.interrupted())
return;
- switch (state.get()) {
- case closed:
- return;
-
- case syncer:
- // TODO
- continue;
-
- case syncee:
- // TODO
- continue;
-
- case established:
- // TODO
- continue;
-
- case listening:
- // TODO
- continue;
-
- case closing:
- // TODO
- continue;
+ try {
+ switch (state.get()) {
+ case closed:
+ return;
+ case syncer:
+ syncer();
+ continue;
+ case syncee:
+ // TODO
+ continue;
+ case established:
+ // TODO
+ continue;
+ case listening:
+ // TODO
+ continue;
+ case closing:
+ // TODO
+ continue;
+ case opening:
+ // TODO
+ continue;
+ default:
+ throw new IllegalStateException("illegal connection state: " + state);
+ }
+ } catch (IOException | ClassNotFoundException e) {
+ log.warning(e.getMessage());
+ }
+ }
+ }
- case opening:
- // TODO
+ private void syncer() throws IOException, ClassNotFoundException {
+ Instant tstart = Instant.now();
+ for (;;) {
+ try {
+ ControlMessage msg = recvCtl(rexitTimeout);
+ if (msg.type == Type.ack && msg.ack != id0
+ || msg.type == Type.close && msg.ack == id0) {
+ state.set(State.closed);
+ return;
+ } else if (msg.type == Type.sync && msg.ack == id0) {
+ state.set(State.established);
+ return;
+ } else {
continue;
-
- default:
- throw new IllegalStateException("illegal connection state: " + state);
+ }
+ } catch (SocketTimeoutException e) {
+ Duration elapsed = Duration.between(tstart, Instant.now());
+ if (elapsed.compareTo(deathTimeout) > 0) {
+ state.set(State.closed);
+ return;
+ }
+ sendCtl(Type.sync, id0, 0);
+ continue;
}
}
}
+
+ private ControlMessage recvCtl(Duration timeout) throws IOException, SocketTimeoutException, ClassNotFoundException {
+ DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
+ sock.setSoTimeout(timeout);
+ sock.receive(pkt);
+ return Packet.decode(pkt, ControlMessage.class);
+ }
}
diff --git a/src/main/java/derms/net/runicast/ReliableUnicast.java b/src/main/java/derms/net/runicast/ReliableUnicast.java
index 9e0296f..c7d587b 100644
--- a/src/main/java/derms/net/runicast/ReliableUnicast.java
+++ b/src/main/java/derms/net/runicast/ReliableUnicast.java
@@ -1,5 +1,7 @@
package derms.net.runicast;
+import derms.net.ConcurrentDatagramSocket;
+
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
@@ -11,7 +13,7 @@ public class ReliableUnicast {
}
public static Connection connect(InetAddress raddr, int rport) throws IOException {
- DatagramSocket sock = new DatagramSocket();
+ ConcurrentDatagramSocket sock = new ConcurrentDatagramSocket();
sock.connect(raddr, rport);
InetAddress laddr = sock.getLocalAddress();
int lport = sock.getLocalPort();