blob: 68d7355d7414bef03dc95fdecf3f989f93f8e140 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
package derms.net.runicast;
import derms.net.Packet;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
public class Connection implements Runnable {
private static final Random rng = new Random();
private static final Duration terminationTimeout = Duration.ofSeconds(1);
AtomicReference<State> state;
final InetAddress laddr; // Local IP address.
final int lport; // Local IL port.
final InetAddress raddr; // Remote IP address.
final int rport; // Remote IL port.
final int id0; // Starting sequence number of the local side.
final int rid0; // Starting sequence number of the remote side.
int next; // Sequence number of the next message to be sent from the local side.
int rcvd; // The last in-sequence message received from the remote side.
int unacked; // Sequence number of the first unacked message.
DatagramSocket sock;
private final Logger log;
private final ExecutorService pool;
Connection(InetAddress laddr, int lport, InetAddress raddr, int rport, DatagramSocket sock) {
this.state = new AtomicReference<State>(State.closed);
this.laddr = laddr;
this.lport = lport;
this.raddr = raddr;
this.rport = rport;
this.id0 = rng.nextInt();
this.next = this.id0;
this.unacked = this.id0;
this.sock = sock;
this.log = Logger.getLogger(this.getClass().getName());
this.pool = Executors.newCachedThreadPool();
}
public void close() {
state.set(State.closed);
pool.shutdownNow();
try {
if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
log.warning("Thread pool did not terminate after " + terminationTimeout);
} catch (InterruptedException e) {
log.warning("Interrupted while terminating thread pool: " + e.getMessage());
// Preserve interrupt status.
Thread.currentThread().interrupt();
}
sock.close();
log.info("Finished shutting down.");
}
void start() {
pool.execute(this);
}
void sendCtl(Type type, int id, int ack) throws IOException {
ControlMessage msg = new ControlMessage(type, id, ack);
DatagramPacket pkt = Packet.encode(msg, raddr, rport);
sock.send(pkt);
}
@Override
public void run() {
for (;;) {
if (Thread.interrupted())
return;
switch (state.get()) {
case closed:
return;
case syncer:
// TODO
continue;
case syncee:
// TODO
continue;
case established:
// TODO
continue;
case listening:
// TODO
continue;
case closing:
// TODO
continue;
case opening:
// TODO
continue;
default:
throw new IllegalStateException("illegal connection state: " + state);
}
}
}
}
|