1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
// Pseudocode of the Trans protocol described in "Broadcast Protocols for Distributed
// Systems", Melliar-Smith, P., Moser, L., and Agrawala, V. (1990) in "IEEE Transactions
// on Parallel and Distributed Systems vol. 1, no. 1.
// message
type m struct {
id mid
sender pid
positiveAcks []mid
negativeAcks []mid
data
}
type mid "message ID"
type pid "process ID"
var (
positiveAcks []mid
negativeAcks []mid
received []m
retransmissions []mid
lastSend time
)
send(m) {
pkt := (m, positiveAcks, negativeAcks)
multicast(pkt)
positiveAcks = []
go timeout(m)
lastSend = now()
}
timeout(m) {
sleep until timeout
if m not in positiveAcks {
insert(m, retransmissions)
}
}
recv(m) {
insert(m.id, positiveAcks)
insert(m, received)
if m.id in negativeAcks {
delete(m.id, negativeAcks)
}
if m.id in retransmissions {
delete(m.id, retransmissions
}
for each mid in m.positiveAcks {
delete(mid, positiveAcks)
if mid not in received {
insert(mid, negativeAcks)
}
}
for each mid in m.negativeAcks {
if mid in received {
insert(mid, retransmissions)
} else {
insert(mid, negativeAcks)
}
}
}
retransmit() {
forever {
wait until (timeSince(lastSend) > threshold) && (len(retransmissions) > 0)
mid := pop(retransmissions)
m := received[mid]
send(m)
}
}
// Observable Predicate for Delivery.
// The process that broadcast c has received and acked message a at the time of broadcasting c.
// All assertions must hold in order to return true.
OPD(a, c m) bool {
assert (t.e. sequence [a, ..., c])
for each i, m in sequence, except a {
predecessor := sequence[i-1]
assert (predecessor in m.positiveAcks || m.sender == precessor.sender)
assert (m not in c.negativeAcks)
}
}
// Partial order.
// All assertions must hold in order to return true.
(c m) follows(b m) bool {
assert OPD(b, c)
for all a in received {
if OPD(a, b) {
assert OPD(a, c)
}
}
}
// Extend seq to include A.
OPDseq(seq []m, A m) ok {
if A in seq {
return true
}
// C -> ... -> B -> ... ?-> A
C := first(seq)
B := last(seq)
sent := all m in received s.t. m.sender == B && m not in seq
potentialPredecessors := sent + B.positiveAcks - C.negativeAcks
for each m in potentialPredecessors {
if OPDseq(append(seq, m), A) {
return true
}
}
return false
}
// Free memory from the received list.
prune() {
A := oldest(received)
for all q in group {
C := most recent m in received s.t. m.sender == q
if !OPDseq(m[]{C}, A) { // q has not received A
return
}
}
// All processes have received A.
delete(A, received)
}
|