blob: 1721025334dee5c41f994dac293d583a3a0cc600 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
// Message is a payload of type T tagged with the sequence number the sender
// assigned to it.
type Message[T] struct {
// seq is the sequence number; the sender hands these out consecutively,
// starting at 0, one per call to send.
seq int
// data is the application payload carried by the message.
data T
}
// Ack is an acknowledgement: the sequence number of a message the receiver has
// accepted. The sender treats it as cumulative, retiring every retained
// message whose sequence number is <= the Ack.
type Ack int
// sender is the sending half of a reliable, in-order channel layered on a
// Socket. Each message is numbered, kept in sent until it is acknowledged, and
// periodically retransmitted until then.
//
// NOTE(review): pseudocode. This assumes each `thread` body is re-run
// continuously by the runtime and that thread bodies do not interleave with
// each other or with send; confirm against the intended execution model.
class sender[T] {
	// ackTimeout is how long retransmit waits before each resend pass.
	const ackTimeout = 500ms

	var (
		next    := 0 // Next sequence number.
		unacked := 0 // Sequence number of first unacked message.

		// sent retains every transmitted-but-unacknowledged message in
		// sequence order, i.e. exactly the messages numbered unacked..next-1.
		sent Queue[Message[T]]
		sock Socket
	)

	// send numbers data with the next sequence number, transmits it once, and
	// retains it in sent so retransmit can resend it until it is acknowledged.
	func send(data) {
		msg := Message{next, data}
		sock.send(msg)
		sent.add(msg)
		next++
	}

	// retransmit waits ackTimeout and then resends every message that is
	// still awaiting acknowledgement.
	thread retransmit() {
		wait(ackTimeout)
		if unacked < next {
			forall msg in sent s.t. msg.seq >= unacked {
				sock.send(msg)
			}
		}
	}

	// recvAcks receives one cumulative Ack and retires every retained message
	// whose sequence number is <= that Ack.
	thread recvAcks() {
		ack := sock.receive()

		// sent holds exactly messages unacked..next-1, so retiring one message
		// is "pop the head and advance unacked". A stale ack (ack < unacked)
		// skips the loop entirely. Bounding by next means the queue is never
		// peeked or popped while empty (the original peeked an empty queue
		// once the last outstanding message was acknowledged), and an ack
		// beyond anything sent cannot push unacked past next, which would
		// make retransmit skip messages sent later.
		while unacked <= ack && unacked < next {
			sent.pop()
			unacked++
		}
	}
}
// receiver is the receiving half of a reliable, in-order channel layered on a
// Socket. It accepts messages strictly in sequence order, acknowledges each
// accepted message, and queues payloads for the application to collect via
// deliver.
//
// NOTE(review): pseudocode. This assumes the `thread` body is re-run
// continuously by the runtime; confirm against the intended execution model.
class receiver[T] {
	var (
		seq := 0 // Sequence number of the next message to accept.
		sock Socket
		delivered Queue[T] // Accepted payloads, in order, awaiting deliver.
	)

	// recv receives one message. The expected message is queued for delivery
	// and acknowledged. A duplicate of an already-accepted message is
	// re-acknowledged. A message from the future (msg.seq > seq) is dropped
	// and will arrive again through the sender's retransmission.
	thread recv() {
		msg := sock.receive()
		if msg.seq == seq {
			delivered.add(msg.data)
			sock.send(Ack(seq))
			seq++
		} else if msg.seq < seq {
			// Duplicate: the sender is retransmitting because our earlier ack
			// never reached it. Without a fresh ack it would retransmit
			// forever and never advance. Acks are cumulative, so acknowledge
			// the latest accepted message. msg.seq >= 0 and msg.seq < seq
			// imply seq >= 1, so seq-1 is a valid sequence number.
			sock.send(Ack(seq - 1))
		}
	}

	// deliver removes and returns the oldest accepted payload.
	// NOTE(review): behaviour when nothing has been accepted yet depends on
	// Queue.pop (presumably it blocks); confirm.
	func deliver() T {
		return delivered.pop()
	}
}
|