summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
blob: 0d8b6905d8a0a8e9fed72ab5ecc5aa7637f2cf81 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package derms.net.tomulticast;

import derms.net.MessagePayload;
import derms.net.rmulticast.ReliableMulticast;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.logging.Logger;

/**
 * <p>A single-sender, multiple-receiver multicast group that enforces a total order on
 * received messages. If a process delivers message <i>A</i> before message <i>B</i>,
 * then all other processes will deliver <i>A</i> and <i>B</i> in the same order
 * (<i>A</i> before <i>B</i>).</p>
 *
 * <p>Additionally, the protocol guarantees reliable delivery of messages. If the sender
 * crashes while sending a message, either all processes will receive the message, or
 * none of them will.</p>
 *
 * <p><b>Only one sender is allowed per multicast group</b>, but a group may have multiple
 * receivers. Use {@link TotalOrderMulticastSender} to send
 * messages, and {@link TotalOrderMulticastReceiver} to receive
 * them.</p>
 */
public abstract class TotalOrderMulticast<T extends MessagePayload> {
    final ReliableMulticast<Message<T>> sock;
    protected final InetSocketAddress group;
    protected Long seq; // Sequence number.
    protected final Logger log;

    protected TotalOrderMulticast(InetSocketAddress group, InetAddress laddr) throws IOException {
        this.sock = new ReliableMulticast<Message<T>>(group, laddr);
        this.group = group;
        this.seq = (long) 0;
        this.log = Logger.getLogger(this.getClass().getName());
    }

    /** Close the underlying socket. */
    public void close() throws IOException {
        sock.close();
    }

    /** Increment the sequence number. */
    protected void incSeq() {
        if (seq < seq.MAX_VALUE) {
            seq++;
        } else {
            log.warning("Sequence number overflow. Wrapping to 0.");
            seq = (long) 0;
        }
    }
}