From a9c2ee3c3c4a8791529b9ea448ed96d9aa7b03da Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Mon, 18 Nov 2024 13:39:06 -0500 Subject: il: connect() --- src/main/java/derms/net/Packet.java | 6 ++ src/main/java/derms/net/il/Connection.java | 115 +++++++++++++++++++++++++ src/main/java/derms/net/il/ControlMessage.java | 15 ++++ src/main/java/derms/net/il/IL.java | 37 ++++++++ src/main/java/derms/net/il/State.java | 5 ++ src/main/java/derms/net/il/Type.java | 5 ++ 6 files changed, 183 insertions(+) create mode 100644 src/main/java/derms/net/il/Connection.java create mode 100644 src/main/java/derms/net/il/ControlMessage.java create mode 100644 src/main/java/derms/net/il/IL.java create mode 100644 src/main/java/derms/net/il/State.java create mode 100644 src/main/java/derms/net/il/Type.java diff --git a/src/main/java/derms/net/Packet.java b/src/main/java/derms/net/Packet.java index 68416d7..9e9ba08 100644 --- a/src/main/java/derms/net/Packet.java +++ b/src/main/java/derms/net/Packet.java @@ -2,6 +2,8 @@ package derms.net; import java.io.*; import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.SocketAddress; public class Packet { @@ -15,6 +17,10 @@ public class Packet { return new DatagramPacket(buf, buf.length, dst); } + public static DatagramPacket encode(Serializable obj, InetAddress dstAddr, int dstPort) throws IOException { + return encode(obj, new InetSocketAddress(dstAddr, dstPort)); + } + public static T decode(DatagramPacket pkt, Class clazz) throws IOException, ClassNotFoundException, ClassCastException { ObjectInputStream objStream = new ObjectInputStream( new ByteArrayInputStream(pkt.getData())); diff --git a/src/main/java/derms/net/il/Connection.java b/src/main/java/derms/net/il/Connection.java new file mode 100644 index 0000000..f3bf0d0 --- /dev/null +++ b/src/main/java/derms/net/il/Connection.java @@ -0,0 +1,115 @@ +package derms.net.il; + +import derms.net.Packet; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +public class Connection implements Runnable { + private static final Random rng = new Random(); + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + AtomicReference state; + final InetAddress laddr; // Local IP address. + final int lport; // Local IL port. + final InetAddress raddr; // Remote IP address. + final int rport; // Remote IL port. + final int id0; // Starting sequence number of the local side. + final int rid0; // Starting sequence number of the remote side. + int next; // Sequence number of the next message to be sent from the local side. + int rcvd; // The last in-sequence message received from the remote side. + int unacked; // Sequence number of the first unacked message. + DatagramSocket sock; + private final Logger log; + private final ExecutorService pool; + + Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, DatagramSocket sock) { + this.state = new AtomicReference(State.closed); + this.laddr = laddr; + this.lport = lport; + this.raddr = raddr; + this.rport = rport; + this.id0 = rng.nextInt(); + this.next = this.id0; + this.unacked = this.id0; + this.sock = sock; + this.log = Logger.getLogger(this.getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + } + + public void close() { + state.set(State.closed); + + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + + sock.close(); + log.info("Finished shutting down."); + } + + void start() { + pool.execute(this); + } + + void sendCtl(Type type, int id, int ack) throws IOException { + ControlMessage msg = new ControlMessage(type, id, ack); + DatagramPacket pkt = Packet.encode(msg, raddr, rport); + sock.send(pkt); + } + + @Override + public void run() { + for (;;) { + if (Thread.interrupted()) + return; + + switch (state.get()) { + case closed: + return; + + case syncer: + // TODO + continue; + + case syncee: + // TODO + continue; + + case established: + // TODO + continue; + + case listening: + // TODO + continue; + + case closing: + // TODO + continue; + + case opening: + // TODO + continue; + + default: + throw new IllegalStateException("illegal connection state: " + state); + } + } + } +} diff --git a/src/main/java/derms/net/il/ControlMessage.java b/src/main/java/derms/net/il/ControlMessage.java new file mode 100644 index 0000000..de69fe8 --- /dev/null +++ b/src/main/java/derms/net/il/ControlMessage.java @@ -0,0 +1,15 @@ +package derms.net.il; + +import java.io.Serializable; + +class ControlMessage implements Serializable { + final Type type; + final int id; // One greater than the sequence number of the highest sent data message. + final int ack; // Last in-sequence data message received by the transmitter of the message. + + ControlMessage(Type type, int id, int ack) { + this.type = type; + this.id = id; + this.ack = ack; + } +} diff --git a/src/main/java/derms/net/il/IL.java b/src/main/java/derms/net/il/IL.java new file mode 100644 index 0000000..56c8b40 --- /dev/null +++ b/src/main/java/derms/net/il/IL.java @@ -0,0 +1,37 @@ +package derms.net.il; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.InetAddress; + +public class IL { + public static Connection listen(InetAddress laddr, int lport) { + // TODO + } + + public static Connection connect(InetAddress raddr, int rport) throws IOException { + DatagramSocket sock = new DatagramSocket(); + sock.connect(raddr, rport); + InetAddress laddr = sock.getLocalAddress(); + int lport = sock.getLocalPort(); + Connection conn = new Connection(laddr, lport, raddr, rport, sock); + + conn.sendCtl(Type.sync, conn.id0, 0); + conn.state.set(State.syncer); + conn.start(); + while (conn.state.get() == State.syncer) + Thread.yield(); + + State state = conn.state.get(); + switch (state) { + case established: + return conn; + case closed: + conn.close(); + throw new IOException("failed to connect to " + raddr + ":" + rport); + default: + conn.close(); + throw new IllegalStateException("illegal connection state: " + state); + } + } +} diff --git a/src/main/java/derms/net/il/State.java b/src/main/java/derms/net/il/State.java new file mode 100644 index 0000000..46ffd2d --- /dev/null +++ b/src/main/java/derms/net/il/State.java @@ -0,0 +1,5 @@ +package derms.net.il; + +public enum State { + closed, syncer, syncee, established, listening, closing, opening +} diff --git a/src/main/java/derms/net/il/Type.java b/src/main/java/derms/net/il/Type.java new file mode 100644 index 0000000..d067740 --- /dev/null +++ b/src/main/java/derms/net/il/Type.java @@ -0,0 +1,5 @@ +package derms.net.il; + +public enum Type { + sync, data, dataquery, ack, query, state, close +} -- cgit v1.2.3