summaryrefslogtreecommitdiffstats
path: root/doc/trans.pseudo
blob: 0bfcd435798d22f240bff6ce7ee76dc47bf752b5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Pseudocode of the Trans protocol described in "Broadcast Protocols for Distributed
// Systems", Melliar-Smith, P., Moser, L., and Agrawala, V. (1990) in "IEEE Transactions
// on Parallel and Distributed Systems vol. 1, no. 1.

// message
type m struct {
	id mid
	sender pid
	positiveAcks []mid
	negativeAcks []mid
	data
}

type mid "message ID"
type pid "process ID"

var (
	positiveAcks []mid
	negativeAcks []mid
	received []m
	retransmissions []mid
	lastSend time
)

send(m) {
	pkt := (m, positiveAcks, negativeAcks)
	multicast(pkt)
	positiveAcks = []
	go timeout(m)
	lastSend = now()
}

timeout(m) {
	sleep until timeout

	if m not in positiveAcks {
		insert(m, retransmissions)
	}
}

recv(m) {
	insert(m.id, positiveAcks)
	insert(m, received)

	if m.id in negativeAcks {
		delete(m.id, negativeAcks)
	}
	if m.id in retransmissions {
		delete(m.id, retransmissions
	}

	for each mid in m.positiveAcks {
		delete(mid, positiveAcks)
		if mid not in received {
			insert(mid, negativeAcks)
		}
	}

	for each mid in m.negativeAcks {
		if mid in received {
			insert(mid, retransmissions)
		} else {
			insert(mid, negativeAcks)
		}
	}
}

retransmit() {
	forever {
		wait until (timeSince(lastSend) > threshold) && (len(retransmissions) > 0)

		mid := pop(retransmissions)
		m := received[mid]
		send(m)
	}
}

// Observable Predicate for Delivery.
// The process that broadcast c has received and acked message a at the time of broadcasting c.
// All assertions must hold in order to return true.
OPD(a, c m) bool {
	assert (t.e. sequence [a, ..., c])
	for each i, m in sequence, except a {
		predecessor := sequence[i-1]
		assert (predecessor in m.positiveAcks || m.sender == precessor.sender)
		assert (m not in c.negativeAcks)
	}
}

// Partial order.
// All assertions must hold in order to return true.
(c m) follows(b m) bool {
	assert OPD(b, c)
	for all a in received {
		if OPD(a, b) {
			assert OPD(a, c)
		}
	}
}

// Extend seq to include A.
OPDseq(seq []m, A m) ok {
	if A in seq {
		return true
	}

	// C -> ... -> B -> ... ?-> A
	C := first(seq)
	B := last(seq)

	sent := all m in received s.t. m.sender == B && m not in seq
	potentialPredecessors := sent + B.positiveAcks - C.negativeAcks

	for each m in potentialPredecessors {
		if OPDseq(append(seq, m), A) {
			return true
		}
	}

	return false
}

// Free memory from the received list.
prune() {
	A := oldest(received)
	for all q in group {
		C := most recent m in received s.t. m.sender == q
		if !OPDseq(m[]{C}, A) { // q has not received A
			return
		}
	}
	// All processes have received A.
	delete(A, received)
}