summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/runicast/Connection.java
blob: 68d7355d7414bef03dc95fdecf3f989f93f8e140 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package derms.net.runicast;

import derms.net.Packet;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

/**
 * One endpoint of a connection between a local and a remote address/port pair.
 *
 * <p>The lifecycle of the connection is tracked in {@code state}. {@link #run()} loops over
 * that state on a thread taken from a private pool (see {@code start()}) until the connection
 * is closed or the executing thread is interrupted.
 */
public class Connection implements Runnable {
    private static final Random rng = new Random();
    // Upper bound on how long close() waits for the pool's threads to finish.
    private static final Duration terminationTimeout = Duration.ofSeconds(1);

    AtomicReference<State> state;
    final InetAddress laddr; // Local IP address.
    final int lport; // Local IL port.
    final InetAddress raddr; // Remote IP address.
    final int rport; // Remote IL port.
    final int id0; // Starting sequence number of the local side.
    // Starting sequence number of the remote side. Not final: the constructor has no value to
    // assign, and a blank final field that is never assigned does not compile.
    // NOTE(review): presumably filled in once the remote side is heard from -- confirm.
    int rid0;
    int next; // Sequence number of the next message to be sent from the local side.
    int rcvd; // The last in-sequence message received from the remote side.
    int unacked; // Sequence number of the first unacked message.
    DatagramSocket sock;
    private final Logger log;
    private final ExecutorService pool;

    /**
     * Creates a connection in the {@code closed} state.
     *
     * <p>The local starting sequence number is chosen at random; {@code next} and
     * {@code unacked} both start at that value.
     *
     * @param laddr local IP address
     * @param lport local port
     * @param raddr remote IP address
     * @param rport remote port
     * @param sock socket used to send packets; it is closed by {@link #close()}
     */
    Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, DatagramSocket sock) {
        this.state = new AtomicReference<>(State.closed);
        this.laddr = laddr;
        this.lport = lport;
        this.raddr = raddr;
        this.rport = rport;
        this.id0 = rng.nextInt();
        this.next = this.id0;
        this.unacked = this.id0;
        this.sock = sock;
        this.log = Logger.getLogger(this.getClass().getName());
        this.pool = Executors.newCachedThreadPool();
    }

    /**
     * Shuts the connection down.
     *
     * <p>Marks the connection as closed, interrupts the pool's threads and waits up to
     * {@code terminationTimeout} for them to finish, then closes the socket. If the calling
     * thread is interrupted while waiting, its interrupt status is restored and the socket is
     * still closed.
     */
    public void close() {
        state.set(State.closed);

        pool.shutdownNow();
        try {
            if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
                log.warning("Thread pool did not terminate after " + terminationTimeout);
        } catch (InterruptedException e) {
            log.warning("Interrupted while terminating thread pool: " + e.getMessage());
            // Preserve interrupt status.
            Thread.currentThread().interrupt();
        }

        sock.close();
        log.info("Finished shutting down.");
    }

    /** Submits this connection's {@link #run()} loop to the thread pool. */
    void start() {
        pool.execute(this);
    }

    /**
     * Sends a control message to the remote address and port.
     *
     * @param type kind of control message
     * @param id identifier placed in the message (presumably a sequence number -- confirm
     *     against {@code ControlMessage})
     * @param ack acknowledgement value placed in the message
     * @throws IOException propagated from encoding or sending the packet
     */
    void sendCtl(Type type, int id, int ack) throws IOException {
        ControlMessage msg = new ControlMessage(type, id, ack);
        DatagramPacket pkt = Packet.encode(msg, raddr, rport);
        sock.send(pkt);
    }

    /**
     * Drives the connection's state machine.
     *
     * <p>Returns when the executing thread has been interrupted (the check clears the
     * interrupt flag) or when the connection is in the {@code closed} state.
     *
     * <p>NOTE(review): every other state is still a stub that immediately re-enters the loop,
     * so until those handlers are implemented the loop spins without blocking.
     *
     * @throws IllegalStateException if the state is not one of the handled values
     */
    @Override
    public void run() {
        for (;;) {
            if (Thread.interrupted())
                return;

            // Read the state once so the switch and the error message refer to the same value.
            State current = state.get();
            switch (current) {
                case closed:
                    return;

                case syncer:
                    // TODO
                    continue;

                case syncee:
                    // TODO
                    continue;

                case established:
                    // TODO
                    continue;

                case listening:
                    // TODO
                    continue;

                case closing:
                    // TODO
                    continue;

                case opening:
                    // TODO
                    continue;

                default:
                    throw new IllegalStateException("illegal connection state: " + current);
            }
        }
    }
}