summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/derms/net/Packet.java6
-rw-r--r--src/main/java/derms/net/il/Connection.java115
-rw-r--r--src/main/java/derms/net/il/ControlMessage.java15
-rw-r--r--src/main/java/derms/net/il/IL.java37
-rw-r--r--src/main/java/derms/net/il/State.java5
-rw-r--r--src/main/java/derms/net/il/Type.java5
6 files changed, 183 insertions, 0 deletions
diff --git a/src/main/java/derms/net/Packet.java b/src/main/java/derms/net/Packet.java
index 68416d7..9e9ba08 100644
--- a/src/main/java/derms/net/Packet.java
+++ b/src/main/java/derms/net/Packet.java
@@ -2,6 +2,8 @@ package derms.net;
import java.io.*;
import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class Packet {
@@ -15,6 +17,10 @@ public class Packet {
return new DatagramPacket(buf, buf.length, dst);
}
+ public static DatagramPacket encode(Serializable obj, InetAddress dstAddr, int dstPort) throws IOException {
+ return encode(obj, new InetSocketAddress(dstAddr, dstPort));
+ }
+
public static <T extends Serializable> T decode(DatagramPacket pkt, Class<T> clazz) throws IOException, ClassNotFoundException, ClassCastException {
ObjectInputStream objStream = new ObjectInputStream(
new ByteArrayInputStream(pkt.getData()));
diff --git a/src/main/java/derms/net/il/Connection.java b/src/main/java/derms/net/il/Connection.java
new file mode 100644
index 0000000..f3bf0d0
--- /dev/null
+++ b/src/main/java/derms/net/il/Connection.java
@@ -0,0 +1,115 @@
+package derms.net.il;
+
+import derms.net.Packet;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+public class Connection implements Runnable {
+ private static final Random rng = new Random();
+ private static final Duration terminationTimeout = Duration.ofSeconds(1);
+
+ AtomicReference<State> state;
+ final InetAddress laddr; // Local IP address.
+ final int lport; // Local IL port.
+ final InetAddress raddr; // Remote IP address.
+ final int rport; // Remote IL port.
+ final int id0; // Starting sequence number of the local side.
+ final int rid0; // Starting sequence number of the remote side.
+ int next; // Sequence number of the next message to be sent from the local side.
+ int rcvd; // The last in-sequence message received from the remote side.
+ int unacked; // Sequence number of the first unacked message.
+ DatagramSocket sock;
+ private final Logger log;
+ private final ExecutorService pool;
+
+ Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, DatagramSocket sock) {
+ this.state = new AtomicReference<State>(State.closed);
+ this.laddr = laddr;
+ this.lport = lport;
+ this.raddr = raddr;
+ this.rport = rport;
+ this.id0 = rng.nextInt();
+ this.next = this.id0;
+ this.unacked = this.id0;
+ this.sock = sock;
+ this.log = Logger.getLogger(this.getClass().getName());
+ this.pool = Executors.newCachedThreadPool();
+ }
+
+ public void close() {
+ state.set(State.closed);
+
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.warning("Thread pool did not terminate after " + terminationTimeout);
+ } catch (InterruptedException e) {
+ log.warning("Interrupted while terminating thread pool: " + e.getMessage());
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
+ }
+
+ sock.close();
+ log.info("Finished shutting down.");
+ }
+
+ void start() {
+ pool.execute(this);
+ }
+
+ void sendCtl(Type type, int id, int ack) throws IOException {
+ ControlMessage msg = new ControlMessage(type, id, ack);
+ DatagramPacket pkt = Packet.encode(msg, raddr, rport);
+ sock.send(pkt);
+ }
+
+ @Override
+ public void run() {
+ for (;;) {
+ if (Thread.interrupted())
+ return;
+
+ switch (state.get()) {
+ case closed:
+ return;
+
+ case syncer:
+ // TODO
+ continue;
+
+ case syncee:
+ // TODO
+ continue;
+
+ case established:
+ // TODO
+ continue;
+
+ case listening:
+ // TODO
+ continue;
+
+ case closing:
+ // TODO
+ continue;
+
+ case opening:
+ // TODO
+ continue;
+
+ default:
+ throw new IllegalStateException("illegal connection state: " + state);
+ }
+ }
+ }
+}
diff --git a/src/main/java/derms/net/il/ControlMessage.java b/src/main/java/derms/net/il/ControlMessage.java
new file mode 100644
index 0000000..de69fe8
--- /dev/null
+++ b/src/main/java/derms/net/il/ControlMessage.java
@@ -0,0 +1,15 @@
+package derms.net.il;
+
+import java.io.Serializable;
+
+class ControlMessage implements Serializable {
+ final Type type;
+ final int id; // One greater than the sequence number of the highest sent data message.
+ final int ack; // Last in-sequence data message received by the transmitter of the message.
+
+ ControlMessage(Type type, int id, int ack) {
+ this.type = type;
+ this.id = id;
+ this.ack = ack;
+ }
+}
diff --git a/src/main/java/derms/net/il/IL.java b/src/main/java/derms/net/il/IL.java
new file mode 100644
index 0000000..56c8b40
--- /dev/null
+++ b/src/main/java/derms/net/il/IL.java
@@ -0,0 +1,37 @@
+package derms.net.il;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+
+public class IL {
+ public static Connection listen(InetAddress laddr, int lport) {
+ // TODO
+ }
+
+ public static Connection connect(InetAddress raddr, int rport) throws IOException {
+ DatagramSocket sock = new DatagramSocket();
+ sock.connect(raddr, rport);
+ InetAddress laddr = sock.getLocalAddress();
+ int lport = sock.getLocalPort();
+ Connection conn = new Connection(laddr, lport, raddr, rport, sock);
+
+ conn.sendCtl(Type.sync, conn.id0, 0);
+ conn.state.set(State.syncer);
+ conn.start();
+ while (conn.state.get() == State.syncer)
+ Thread.yield();
+
+ State state = conn.state.get();
+ switch (state) {
+ case established:
+ return conn;
+ case closed:
+ conn.close();
+ throw new IOException("failed to connect to " + raddr + ":" + rport);
+ default:
+ conn.close();
+ throw new IllegalStateException("illegal connection state: " + state);
+ }
+ }
+}
diff --git a/src/main/java/derms/net/il/State.java b/src/main/java/derms/net/il/State.java
new file mode 100644
index 0000000..46ffd2d
--- /dev/null
+++ b/src/main/java/derms/net/il/State.java
@@ -0,0 +1,5 @@
+package derms.net.il;
+
+public enum State {
+ closed, syncer, syncee, established, listening, closing, opening
+}
diff --git a/src/main/java/derms/net/il/Type.java b/src/main/java/derms/net/il/Type.java
new file mode 100644
index 0000000..d067740
--- /dev/null
+++ b/src/main/java/derms/net/il/Type.java
@@ -0,0 +1,5 @@
+package derms.net.il;
+
+public enum Type {
+ sync, data, dataquery, ack, query, state, close
+}