diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-18 13:43:05 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-18 13:43:05 -0500 |
| commit | f7fe7601083b7b0cb7bc420004ad574726bfe613 (patch) | |
| tree | 72773bc7dd2b27799d47162676c33fa19984e57f /src/main/java/derms/net/runicast | |
| parent | a9c2ee3c3c4a8791529b9ea448ed96d9aa7b03da (diff) | |
| download | soen423-f7fe7601083b7b0cb7bc420004ad574726bfe613.zip | |
rename il to runicast
Diffstat (limited to 'src/main/java/derms/net/runicast')
| -rw-r--r-- | src/main/java/derms/net/runicast/Connection.java | 115 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/ControlMessage.java | 15 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/ReliableUnicast.java | 38 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/State.java | 5 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/Type.java | 5 |
5 files changed, 178 insertions, 0 deletions
diff --git a/src/main/java/derms/net/runicast/Connection.java b/src/main/java/derms/net/runicast/Connection.java new file mode 100644 index 0000000..68d7355 --- /dev/null +++ b/src/main/java/derms/net/runicast/Connection.java @@ -0,0 +1,115 @@ +package derms.net.runicast; + +import derms.net.Packet; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +public class Connection implements Runnable { + private static final Random rng = new Random(); + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + AtomicReference<State> state; + final InetAddress laddr; // Local IP address. + final int lport; // Local IL port. + final InetAddress raddr; // Remote IP address. + final int rport; // Remote IL port. + final int id0; // Starting sequence number of the local side. + final int rid0; // Starting sequence number of the remote side. + int next; // Sequence number of the next message to be sent from the local side. + int rcvd; // The last in-sequence message received from the remote side. + int unacked; // Sequence number of the first unacked message. + DatagramSocket sock; + private final Logger log; + private final ExecutorService pool; + + Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, DatagramSocket sock) { + this.state = new AtomicReference<State>(State.closed); + this.laddr = laddr; + this.lport = lport; + this.raddr = raddr; + this.rport = rport; + this.id0 = rng.nextInt(); + this.next = this.id0; + this.unacked = this.id0; + this.sock = sock; + this.log = Logger.getLogger(this.getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + } + + public void close() { + state.set(State.closed); + + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + + sock.close(); + log.info("Finished shutting down."); + } + + void start() { + pool.execute(this); + } + + void sendCtl(Type type, int id, int ack) throws IOException { + ControlMessage msg = new ControlMessage(type, id, ack); + DatagramPacket pkt = Packet.encode(msg, raddr, rport); + sock.send(pkt); + } + + @Override + public void run() { + for (;;) { + if (Thread.interrupted()) + return; + + switch (state.get()) { + case closed: + return; + + case syncer: + // TODO + continue; + + case syncee: + // TODO + continue; + + case established: + // TODO + continue; + + case listening: + // TODO + continue; + + case closing: + // TODO + continue; + + case opening: + // TODO + continue; + + default: + throw new IllegalStateException("illegal connection state: " + state); + } + } + } +} diff --git a/src/main/java/derms/net/runicast/ControlMessage.java b/src/main/java/derms/net/runicast/ControlMessage.java new file mode 100644 index 0000000..b08ff98 --- /dev/null +++ b/src/main/java/derms/net/runicast/ControlMessage.java @@ -0,0 +1,15 @@ +package derms.net.runicast; + +import java.io.Serializable; + +class ControlMessage implements Serializable { + final Type type; + final int id; // One greater than the sequence number of the highest sent data message. + final int ack; // Last in-sequence data message received by the transmitter of the message. + + ControlMessage(Type type, int id, int ack) { + this.type = type; + this.id = id; + this.ack = ack; + } +} diff --git a/src/main/java/derms/net/runicast/ReliableUnicast.java b/src/main/java/derms/net/runicast/ReliableUnicast.java new file mode 100644 index 0000000..9e0296f --- /dev/null +++ b/src/main/java/derms/net/runicast/ReliableUnicast.java @@ -0,0 +1,38 @@ +package derms.net.runicast; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.InetAddress; + +/** TODO */ +public class ReliableUnicast { + public static Connection listen(InetAddress laddr, int lport) { + // TODO + } + + public static Connection connect(InetAddress raddr, int rport) throws IOException { + DatagramSocket sock = new DatagramSocket(); + sock.connect(raddr, rport); + InetAddress laddr = sock.getLocalAddress(); + int lport = sock.getLocalPort(); + Connection conn = new Connection(laddr, lport, raddr, rport, sock); + + conn.sendCtl(Type.sync, conn.id0, 0); + conn.state.set(State.syncer); + conn.start(); + while (conn.state.get() == State.syncer) + Thread.yield(); + + State state = conn.state.get(); + switch (state) { + case established: + return conn; + case closed: + conn.close(); + throw new IOException("failed to connect to " + raddr + ":" + rport); + default: + conn.close(); + throw new IllegalStateException("illegal connection state: " + state); + } + } +} diff --git a/src/main/java/derms/net/runicast/State.java b/src/main/java/derms/net/runicast/State.java new file mode 100644 index 0000000..5780ba3 --- /dev/null +++ b/src/main/java/derms/net/runicast/State.java @@ -0,0 +1,5 @@ +package derms.net.runicast; + +public enum State { + closed, syncer, syncee, established, listening, closing, opening +} diff --git a/src/main/java/derms/net/runicast/Type.java b/src/main/java/derms/net/runicast/Type.java new file mode 100644 index 0000000..b2d2943 --- /dev/null +++ b/src/main/java/derms/net/runicast/Type.java @@ -0,0 +1,5 @@ +package derms.net.runicast; + +public enum Type { + sync, data, dataquery, ack, query, state, close +} |