From 2331dd6b821fd5f97196f16185698c882affe4f3 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Tue, 19 Nov 2024 11:18:33 -0500 Subject: runicast: syncer state --- src/main/java/derms/net/runicast/Connection.java | 101 ++++++++++++++------- .../java/derms/net/runicast/ReliableUnicast.java | 4 +- 2 files changed, 71 insertions(+), 34 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/derms/net/runicast/Connection.java b/src/main/java/derms/net/runicast/Connection.java index 68d7355..034d043 100644 --- a/src/main/java/derms/net/runicast/Connection.java +++ b/src/main/java/derms/net/runicast/Connection.java @@ -1,12 +1,12 @@ package derms.net.runicast; +import derms.net.ConcurrentDatagramSocket; import derms.net.Packet; import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; +import java.net.*; import java.time.Duration; +import java.time.Instant; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -16,6 +16,10 @@ import java.util.logging.Logger; public class Connection implements Runnable { private static final Random rng = new Random(); + private static final Duration roundTrip = Duration.ofMillis(100); // TODO: measure round trip time. + private static final Duration rexitTimeout = roundTrip.multipliedBy(4); + private static final Duration deathTimeout = roundTrip.multipliedBy(300); + private static final int bufSize = 8192; private static final Duration terminationTimeout = Duration.ofSeconds(1); AtomicReference state; @@ -28,11 +32,11 @@ public class Connection implements Runnable { int next; // Sequence number of the next message to be sent from the local side. int rcvd; // The last in-sequence message received from the remote side. int unacked; // Sequence number of the first unacked message. - DatagramSocket sock; + ConcurrentDatagramSocket sock; private final Logger log; private final ExecutorService pool; - Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, DatagramSocket sock) { + Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, ConcurrentDatagramSocket sock) { this.state = new AtomicReference(State.closed); this.laddr = laddr; this.lport = lport; @@ -79,37 +83,68 @@ public class Connection implements Runnable { if (Thread.interrupted()) return; - switch (state.get()) { - case closed: - return; - - case syncer: - // TODO - continue; - - case syncee: - // TODO - continue; - - case established: - // TODO - continue; - - case listening: - // TODO - continue; - - case closing: - // TODO - continue; + try { + switch (state.get()) { + case closed: + return; + case syncer: + syncer(); + continue; + case syncee: + // TODO + continue; + case established: + // TODO + continue; + case listening: + // TODO + continue; + case closing: + // TODO + continue; + case opening: + // TODO + continue; + default: + throw new IllegalStateException("illegal connection state: " + state); + } + } catch (IOException | ClassNotFoundException e) { + log.warning(e.getMessage()); + } + } + } - case opening: - // TODO + private void syncer() throws IOException, ClassNotFoundException { + Instant tstart = Instant.now(); + for (;;) { + try { + ControlMessage msg = recvCtl(rexitTimeout); + if (msg.type == Type.ack && msg.ack != id0 + || msg.type == Type.close && msg.ack == id0) { + state.set(State.closed); + return; + } else if (msg.type == Type.sync && msg.ack == id0) { + state.set(State.established); + return; + } else { continue; - - default: - throw new IllegalStateException("illegal connection state: " + state); + } + } catch (SocketTimeoutException e) { + Duration elapsed = Duration.between(tstart, Instant.now()); + if (elapsed.compareTo(deathTimeout) > 0) { + state.set(State.closed); + return; + } + sendCtl(Type.sync, id0, 0); + continue; } } } + + private ControlMessage recvCtl(Duration timeout) throws IOException, SocketTimeoutException, ClassNotFoundException { + DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + sock.setSoTimeout(timeout); + sock.receive(pkt); + return Packet.decode(pkt, ControlMessage.class); + } } diff --git a/src/main/java/derms/net/runicast/ReliableUnicast.java b/src/main/java/derms/net/runicast/ReliableUnicast.java index 9e0296f..c7d587b 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicast.java +++ b/src/main/java/derms/net/runicast/ReliableUnicast.java @@ -1,5 +1,7 @@ package derms.net.runicast; +import derms.net.ConcurrentDatagramSocket; + import java.io.IOException; import java.net.DatagramSocket; import java.net.InetAddress; @@ -11,7 +13,7 @@ public class ReliableUnicast { } public static Connection connect(InetAddress raddr, int rport) throws IOException { - DatagramSocket sock = new DatagramSocket(); + ConcurrentDatagramSocket sock = new ConcurrentDatagramSocket(); sock.connect(raddr, rport); InetAddress laddr = sock.getLocalAddress(); int lport = sock.getLocalPort(); -- cgit v1.2.3