// Message is one data packet: a payload tagged with the sequence number the
// sender assigned to it.
type Message[T] struct {
	seq  int // Position of this message in the sender's stream, starting at 0.
	data T   // Application payload.
}

// Ack is a cumulative acknowledgement: Ack(n) tells the sender that every
// message with seq <= n has been received in order.
type Ack int

// sender is the sending half of an in-order stream over sock. Every message is
// kept in sent until it is acknowledged, and everything still unacknowledged is
// resent once per ackTimeout.
//
// NOTE(review): presumably sock may drop or reorder packets, which is why
// retransmission exists — confirm against the Socket definition.
// NOTE(review): send, retransmit and recvAcks all read and write next, unacked
// and sent. This sketch assumes each body runs atomically with respect to the
// others; confirm, or guard the shared state with a lock.
class sender[T] {
	const ackTimeout = 500ms

	var (
		next    := 0 // Next sequence number.
		unacked := 0 // Sequence number of first unacked message.
		sent    Queue[Message[T]] // Unacked messages, oldest first: seq in [unacked, next).
		sock    Socket
	)

	// send transmits data as the next message of the stream and keeps a copy
	// in sent so that it can be retransmitted until it is acknowledged.
	func send(data T) {
		msg := Message{next, data}
		sock.send(msg)
		sent.add(msg)
		next++
	}

	// retransmit resends every unacknowledged message once per ackTimeout.
	// It loops explicitly so that retransmission keeps happening for as long
	// as the sender lives, whether or not a thread body is re-run implicitly.
	thread retransmit() {
		while true {
			wait(ackTimeout)
			if unacked < next {
				forall msg in sent s.t. msg.seq >= unacked {
					sock.send(msg)
				}
			}
		}
	}

	// recvAcks consumes acknowledgements and drops acknowledged messages from
	// sent.
	thread recvAcks() {
		while true {
			ack := sock.receive()
			// Ignore stale or duplicate acks (ack < unacked) and acks for
			// messages that were never sent (ack >= next); acting on either
			// would break the invariant that sent holds [unacked, next).
			if ack < unacked || ack >= next {
				continue
			}
			// Pop exactly the newly acknowledged messages. Counting via
			// unacked instead of peeking at the queue head means the queue
			// is never inspected once it is empty.
			while unacked <= ack {
				sent.pop()
				unacked++
			}
		}
	}
}

// receiver is the receiving half of the stream: it accepts messages strictly
// in sequence order, queues their payloads for deliver, and acknowledges
// cumulatively.
class receiver[T] {
	var (
		seq       := 0 // Next sequence number.
		sock      Socket
		delivered Queue[T] // Payloads accepted in order, not yet handed to the caller.
	)

	// recv accepts the next in-order message, if that is what arrived, and
	// then acknowledges everything received so far.
	thread recv() {
		while true {
			msg := sock.receive()
			if msg.seq == seq {
				delivered.add(msg.data)
				seq++
			}
			// Always (re)send the cumulative ack, including for duplicates and
			// out-of-order arrivals. If an earlier ack was lost, the sender
			// keeps retransmitting a message we already have; staying silent
			// would make it retransmit forever.
			if seq > 0 {
				sock.send(Ack(seq - 1))
			}
		}
	}

	// deliver returns the oldest payload that has been received in order.
	// NOTE(review): behaviour when delivered is empty depends on Queue.pop
	// (block vs. fail) — confirm.
	func deliver() T {
		return delivered.pop()
	}
}