// Pseudocode of the Trans protocol described in "Broadcast Protocols for Distributed
// Systems", Melliar-Smith, P., Moser, L., and Agrawala, V. (1990), in "IEEE Transactions
// on Parallel and Distributed Systems", vol. 1, no. 1.
//
// NOTE: this is Go-flavoured pseudocode, not compilable Go. Helpers such as insert,
// delete, pop, multicast, now, timeSince, first, last and oldest are intentionally
// left undefined, as are the constants timeout and threshold.

// m is a message: its payload plus the acknowledgements piggybacked by the sender.
type m struct {
	id           mid
	sender       pid
	positiveAcks []mid
	negativeAcks []mid
	data
}

type mid "message ID"
type pid "process ID"

// Per-process protocol state.
var (
	positiveAcks    []mid // IDs to positively acknowledge in the next send
	negativeAcks    []mid // IDs known to exist but not yet received locally
	received        []m   // messages received so far (trimmed by prune)
	retransmissions []mid // IDs queued for retransmission by retransmit
	lastSend        time  // time of the most recent send
)

// send multicasts m together with the currently pending positive and negative
// acknowledgements, clears the pending positive acks, starts a timeout for m and
// records the send time. Negative acks are not cleared here; recv removes them
// once the missing message arrives.
send(m) {
	pkt := (m, positiveAcks, negativeAcks)
	multicast(pkt)
	positiveAcks = []
	go timeout(m)
	lastSend = now()
}

// timeout waits for the timeout period and then queues m for retransmission if
// its ID is not in positiveAcks.
timeout(m) {
	sleep until timeout
	// positiveAcks and retransmissions hold message IDs, so compare and queue m.id.
	if m.id not in positiveAcks {
		insert(m.id, retransmissions)
	}
}

// recv processes an incoming message m and the acknowledgements it carries.
recv(m) {
	// Remember m and schedule a positive ack for it on the next send.
	insert(m.id, positiveAcks)
	insert(m, received)

	// m has now arrived: stop asking for it and stop planning to retransmit it.
	if m.id in negativeAcks {
		delete(m.id, negativeAcks)
	}
	if m.id in retransmissions {
		delete(m.id, retransmissions)
	}

	// Messages acked by m no longer need an ack from this process. If one of them
	// was never received here, it is missing locally: request it.
	for each mid in m.positiveAcks {
		delete(mid, positiveAcks)
		if mid not in received {
			insert(mid, negativeAcks)
		}
	}

	// Messages m's sender is missing: retransmit those held locally, and request
	// the ones this process is missing as well.
	for each mid in m.negativeAcks {
		if mid in received {
			insert(mid, retransmissions)
		} else {
			insert(mid, negativeAcks)
		}
	}
}

// retransmit runs forever, resending one queued message each time the process
// has not sent anything for longer than threshold and the queue is non-empty.
retransmit() {
	forever {
		wait until (timeSince(lastSend) > threshold) && (len(retransmissions) > 0)
		mid := pop(retransmissions)
		// Look the message up by ID.
		// NOTE(review): timeout may queue a message this process sent itself, and
		// send does not add it to received; presumably the multicast is also
		// delivered locally through recv. Confirm.
		m := received[mid]
		send(m)
	}
}

// Observable Predicate for Delivery.
// The process that broadcast c has received and acked message a at the time of broadcasting c.
// All assertions must hold in order to return true.
OPD(a, c m) bool {
	// "t.e." presumably abbreviates "there exists".
	assert (t.e. sequence [a, ..., c])
	for each i, m in sequence, except a {
		predecessor := sequence[i-1]
		// Each message either acks its predecessor or comes from the same sender.
		assert (predecessor.id in m.positiveAcks || m.sender == predecessor.sender)
		// No message in the sequence may be negatively acknowledged by c.
		assert (m.id not in c.negativeAcks)
	}
}

// Partial order.
// All assertions must hold in order to return true.
(c m) follows(b m) bool {
	assert OPD(b, c)
	// Every received message that satisfies OPD with b must also satisfy it with c.
	for all a in received {
		if OPD(a, b) {
			assert OPD(a, c)
		}
	}
}

// Extend seq to include A.
// Returns true if seq already contains A or can be extended, one potential
// predecessor at a time, until it does; returns false otherwise.
OPDseq(seq []m, A m) ok {
	if A in seq {
		return true
	}

	// C -> ... -> B -> ... ?-> A
	C := first(seq)
	B := last(seq)

	// Messages from B's sender that are not already part of the sequence.
	sent := all m in received s.t. m.sender == B.sender && m not in seq
	// NOTE(review): sent holds messages while the ack lists hold message IDs;
	// presumably the IDs are resolved through received. Confirm.
	potentialPredecessors := sent + B.positiveAcks - C.negativeAcks
	for each m in potentialPredecessors {
		if OPDseq(append(seq, m), A) {
			return true
		}
	}
	return false
}

// Free memory from the received list.
// Only the oldest received message is considered per call; it is deleted once
// OPDseq succeeds from the most recent message of every process in the group.
prune() {
	A := oldest(received)
	for all q in group {
		C := most recent m in received s.t. m.sender == q
		if !OPDseq([]m{C}, A) {
			// q has not received A
			return
		}
	}
	// All processes have received A.
	delete(A, received)
}