summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/runicast/Connection.java
blob: 4824c2a05f35b80fb570d8fb9318ee64755f01c6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package derms.net.runicast;

import derms.net.ConcurrentDatagramSocket;
import derms.net.Packet;

import java.io.IOException;
import java.net.*;
import java.time.Duration;
import java.time.Instant;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

/**
 * One endpoint of a connection carried over a shared datagram socket.
 *
 * <p>The connection's state machine is executed by {@link #run()} on a worker
 * thread taken from an internal cached thread pool (see {@link #start()}).
 * Only the {@code syncer} state is implemented so far; the remaining states
 * are placeholders.
 *
 * <p>NOTE(review): the state names and the "IL port" wording suggest this is
 * modelled on the Plan 9 IL protocol — confirm against the design notes.
 */
public class Connection implements Runnable {
    private static final Random rng = new Random();
    private static final Duration roundTrip = Duration.ofMillis(100); // TODO: measure round trip time.
    // How long to wait for a reply before retransmitting a control message.
    private static final Duration rexitTimeout = roundTrip.multipliedBy(4);
    // How long to keep retrying before the connection is given up on.
    private static final Duration deathTimeout = roundTrip.multipliedBy(300);
    // Size in bytes of the buffer used to receive a single datagram.
    private static final int bufSize = 8192;
    // How long close() waits for the worker thread(s) to finish.
    private static final Duration terminationTimeout = Duration.ofSeconds(1);

    AtomicReference<State> state;
    final InetAddress laddr; // Local IP address.
    final int lport; // Local IL port.
    final InetAddress raddr; // Remote IP address.
    final int rport; // Remote IL port.
    final int id0; // Starting sequence number of the local side.
    // Starting sequence number of the remote side. Not final: it cannot be
    // known when the connection object is built, and a blank final field that
    // the constructor never assigns does not compile.
    // TODO: record it when the peer's sync message is received.
    int rid0;
    int next; // Sequence number of the next message to be sent from the local side.
    int rcvd; // The last in-sequence message received from the remote side.
    int unacked; // Sequence number of the first unacked message.
    ConcurrentDatagramSocket sock;
    private final Logger log;
    private final ExecutorService pool;

    /**
     * Creates a connection in the {@code closed} state with a random starting
     * sequence number.
     *
     * @param laddr local IP address
     * @param lport local port
     * @param raddr remote IP address
     * @param rport remote port
     * @param sock  socket used for all sends and receives; it is closed by
     *              {@link #close()}
     */
    Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, ConcurrentDatagramSocket sock) {
        this.state = new AtomicReference<State>(State.closed);
        this.laddr = laddr;
        this.lport = lport;
        this.raddr = raddr;
        this.rport = rport;
        this.id0 = rng.nextInt();
        this.next = this.id0;
        this.unacked = this.id0;
        this.sock = sock;
        this.log = Logger.getLogger(this.getClass().getName());
        this.pool = Executors.newCachedThreadPool();
    }

    /**
     * Marks the connection closed, stops the worker thread pool (waiting up
     * to {@code terminationTimeout} for it), and closes the socket.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and shutdown continues.
     */
    public void close() {
        state.set(State.closed);

        pool.shutdownNow();
        try {
            if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
                log.warning("Thread pool did not terminate after " + terminationTimeout);
        } catch (InterruptedException e) {
            log.warning("Interrupted while terminating thread pool: " + e.getMessage());
            // Preserve interrupt status.
            Thread.currentThread().interrupt();
        }

        sock.close();
        log.info("Finished shutting down.");
    }

    /** Schedules {@link #run()} on the internal thread pool. */
    void start() {
        pool.execute(this);
    }

    /**
     * Drives the connection state machine until the state becomes
     * {@code closed} or the executing thread is interrupted.
     *
     * <p>I/O and decoding failures are logged and the loop carries on with
     * whatever the current state is.
     *
     * @throws IllegalStateException if the state is not one of the known values
     */
    @Override
    public void run() {
        for (;;) {
            if (Thread.interrupted())
                return;

            try {
                // NOTE(review): every state other than closed/syncer is still a
                // stub that just loops, so the worker spins without blocking
                // while the connection sits in one of them.
                switch (state.get()) {
                    case closed:
                        return;
                    case syncer:
                        syncer();
                        continue;
                    case syncee:
                        // TODO
                        continue;
                    case established:
                        // TODO
                        continue;
                    case listening:
                        // TODO
                        continue;
                    case closing:
                        // TODO
                        continue;
                    case opening:
                        // TODO
                        continue;
                    default:
                        throw new IllegalStateException("illegal connection state: " + state);
                }
            } catch (IOException | ClassNotFoundException e) {
                // Log the exception itself (type + message): getMessage() alone
                // may be null and hides what kind of failure occurred.
                log.warning("Error on connection to " + raddr + ":" + rport + ": " + e);
            }
        }
    }

    /**
     * Handles the {@code syncer} state: repeatedly sends a sync message and
     * waits up to {@code rexitTimeout} for a reply.
     *
     * <ul>
     *   <li>A sync whose ack equals {@code id0} moves the connection to
     *       {@code established}.</li>
     *   <li>An ack whose ack field differs from {@code id0}, or a close whose
     *       ack equals {@code id0}, moves it to {@code closed}.</li>
     *   <li>If nothing conclusive arrives within {@code deathTimeout} of
     *       entering this method, the connection is moved to {@code closed}.</li>
     * </ul>
     *
     * <p>State transitions only happen if the state is still {@code syncer},
     * so a concurrent {@link #close()} is never overwritten.
     *
     * @throws IOException            if sending or receiving fails for a
     *                                reason other than a receive timeout
     * @throws ClassNotFoundException if a received packet cannot be decoded
     */
    private void syncer() throws IOException, ClassNotFoundException {
        Instant tstart = Instant.now();
        for (;;) {
            // Stop promptly once the worker has been interrupted (e.g. by
            // close()); the flag is left set so run() sees it and exits too.
            if (Thread.currentThread().isInterrupted())
                return;

            try {
                sendCtl(Type.sync, id0, 0);

                ControlMessage msg = recvCtl(rexitTimeout);
                boolean refused = (msg.type == Type.ack && msg.ack != id0)
                        || (msg.type == Type.close && msg.ack == id0);
                if (refused) {
                    state.compareAndSet(State.syncer, State.closed);
                    return;
                } else if (msg.type == Type.sync && msg.ack == id0) {
                    state.compareAndSet(State.syncer, State.established);
                    return;
                }
            } catch (SocketTimeoutException ignored) {
                // No reply within rexitTimeout: fall through to the deadline
                // check and, if there is time left, retransmit.
            }

            // Checked after every unsuccessful attempt, not only after a
            // timeout, so a stream of unrelated messages cannot keep the
            // handshake alive indefinitely.
            Duration elapsed = Duration.between(tstart, Instant.now());
            if (elapsed.compareTo(deathTimeout) > 0) {
                state.compareAndSet(State.syncer, State.closed);
                return;
            }
        }
    }

    /**
     * Builds a control message and sends it to the remote address and port.
     *
     * @param type message type
     * @param id   value for the message's id field
     * @param ack  value for the message's ack field
     * @throws IOException if encoding or sending the packet fails
     */
    private void sendCtl(Type type, int id, int ack) throws IOException {
        ControlMessage msg = new ControlMessage(type, id, ack);
        DatagramPacket pkt = Packet.encode(msg, raddr, rport);
        sock.send(pkt);
    }

    /**
     * Receives one datagram of at most {@code bufSize} bytes and decodes it as
     * a control message.
     *
     * @param timeout receive timeout applied to the socket before receiving
     * @return the decoded control message
     * @throws SocketTimeoutException presumably when nothing arrives within
     *                                {@code timeout} — depends on the socket wrapper
     * @throws IOException            if receiving fails
     * @throws ClassNotFoundException if the payload cannot be decoded
     */
    private ControlMessage recvCtl(Duration timeout) throws IOException, SocketTimeoutException, ClassNotFoundException {
        DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize);
        sock.setSoTimeout(timeout);
        sock.receive(pkt);
        return Packet.decode(pkt, ControlMessage.class);
    }
}