From d164f320be094e2348260785386fff0e756d0ec9 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 9 Nov 2024 17:24:51 -0500 Subject: sequencer pseudocode --- doc/sequencer.pseudo | 47 +++++++++++++++++++++++++ doc/trans.pseudo | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 doc/sequencer.pseudo create mode 100644 doc/trans.pseudo (limited to 'doc') diff --git a/doc/sequencer.pseudo b/doc/sequencer.pseudo new file mode 100644 index 0000000..f511eee --- /dev/null +++ b/doc/sequencer.pseudo @@ -0,0 +1,47 @@ +// Sequencer based on "Distributed Systems: Concepts and Design" 5e, Coulouris et +// al. (2012) pp. 654-655. Depends on reliable multicast, implemented here as the +// Trans protocol. + +type dataMsg struct { + id int + data +} + +type orderMsg struct { + id int + seq int +} + +class groupMember { + seq := 0 + holdback := new Queue[dataMsg] + deliver := new Queue[dataMsg] + rMulticast := new Trans + + multicast(m dataMsg) { + rMulticast.send(m) + } + + on rMulticast.recv(m dataMsg) { + holdback.enqueue(m) + } + + on rMulticast.recv(m orderMsg) { + wait until m.id in holdback && seq == m.seq + data := delete(m.id, holdback) + deliver.enqueue(data) + seq++ + } +} + +// Sequencer is also a member of the multicast group. +class sequencer { + seq := 0 + rMulticast := new Trans + + on rMulticast.recv(m dataMsg) { + order := orderMsg{m.id, seq} + rMulticast.send(order) + seq++ + } +} diff --git a/doc/trans.pseudo b/doc/trans.pseudo new file mode 100644 index 0000000..405231d --- /dev/null +++ b/doc/trans.pseudo @@ -0,0 +1,97 @@ +// Pseudocode of the Trans protocol described in "Broadcast Protocols for Distributed +// Systems", Melliar-Smith, P., Moser, L., and Agrawala, V. (1990) in "IEEE Transactions +// on Parallel and Distributed Systems vol. 1, no. 1. + +// message +type m struct { + id mid + sender pid + seq int // sequence number + positiveAcks []mid + negativeAcks []mid + data +} + +type mid "message ID" +type pid "process ID" + +var ( + positiveAcks []mid + negativeAcks []mid + received []m + retransmissions []mid +) + +send(m) { + pkt := (m, positiveAcks, negativeAcks) + multicast(pkt) + positiveAcks = [] + go timeout(m) +} + +timeout(m) { + sleep until timeout + + if m not in positiveAcks { + insert(m, retransmissions) + } +} + +recv(m) { + insert(m.id, positiveAcks) + insert(m, received) + + if m.id in negativeAcks { + delete(m.id, negativeAcks) + } + if m.id in retransmissions { + delete(m.id, retransmissions + } + + for each mid in m.positiveAcks { + delete(mid, positiveAcks) + if mid not in received { + insert(mid, negativeAcks) + } + } + + for each mid in m.negativeAcks { + if mid in received { + insert(mid, retransmissions) + } else { + insert(mid, negativeAcks) + } + } + + acks := union(positiveAcks, negativeAcks) + for each ack in acks s.t. sender(ack) == sender(m) && m.seq > ack.seq+1 { + insert(m.id, negativeAcks) + } +} + +prune() { + // TODO: use Observable Predicate for Delivery +} + +// Observable Predicate for Delivery. +// The process that broadcast c has received and acked message a at the time of broadcasting c. +// All assertions must hold in order to return true. +OPD(a, c m) bool { + assert (t.e. sequence [a, ..., c]) + for each i, m in sequence, except a { + predecessor := sequence[i-1] + assert (predecessor in m.positiveAcks || m.sender == precessor.sender) + assert (m not in c.negativeAcks) + } +} + +// Partial order. +// All assertions must hold in order to return true. +(c m) follows(b m) bool { + assert OPD(b, c) + for all a in received { + if OPD(a, b) { + assert OPD(a, c) + } + } +} -- cgit v1.2.3