summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/ReliableMulticast.java
blob: c08d5a105d5cd6a133cc7dea1357fb3d2e69bf4e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package derms.net.rmulticast;

import derms.io.Serial;
import derms.net.MessagePayload;
import derms.net.Net;
import derms.util.ThreadPool;

import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.*;
import java.util.logging.Logger;

/**
 * A reliable multicast protocol that guarantees delivery of messages in the event of a fail-stop.
 *
 * An implementation of the Trans protocol over IP multicast. Melliar-Smith, et al. "Broadcast Protocols for
 * Distributed Systems" in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, 1990.
 */
public class ReliableMulticast<T extends MessagePayload> {
    private final SocketAddress group;
    private final InetAddress laddr; // Local address.
    private final Set<MessageID> acks; // Positively acknowledged messages.
    private final Set<MessageID> nacks; // Negatively acknowledged messages.
    private final ReceivedSet<T> received;
    private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
    private final Set<InetAddress> groupMembers;
    private final DatagramChannel sock;
    private final BlockingQueue<Message<T>> delivered;
    private final Logger log;
    private final ExecutorService pool;

    /**
     * Join the specified multicast group.
     *
     * @param group The IP address and port of the multicast group.
     * @param laddr The IP address of the local process.
     * @param ifs The network interface to use.
     */
    public ReliableMulticast(InetSocketAddress group, InetAddress laddr, NetworkInterface ifs) throws IOException {
        this.group = group;
        this.laddr = laddr;

        this.acks = ConcurrentHashMap.newKeySet();
        this.nacks = ConcurrentHashMap.newKeySet();
        this.received = new ReceivedSet<T>();
        this.retransmissions = new LinkedBlockingQueue<Message<T>>();
        this.groupMembers = ConcurrentHashMap.newKeySet();

        System.out.println(getClass().getSimpleName() + " using network interface " + ifs);
        this.sock = DatagramChannel.open(StandardProtocolFamily.INET)
                .setOption(StandardSocketOptions.SO_REUSEADDR, true)
                .bind(new InetSocketAddress(group.getAddress(), group.getPort()))
                .setOption(StandardSocketOptions.IP_MULTICAST_IF, ifs);
        sock.join(group.getAddress(), ifs);

        this.delivered = new LinkedBlockingQueue<Message<T>>();

        this.log = Logger.getLogger(this.getClass().getName());

        this.pool = Executors.newCachedThreadPool();
        pool.execute(new Receive<T>(sock, acks, nacks, received, retransmissions, groupMembers, delivered));
        pool.execute(new Retransmit<T>(retransmissions, sock, group));
        pool.execute(new Prune<T>(received, groupMembers));
        pool.execute(new Heartbeat(group, laddr, acks, nacks, sock));
    }

    /**
     * Join the specified multicast group using the default network interface on the machine.
     *
     * @param group The IP address and port of the multicast group.
     * @param laddr The IP address of the local process.
     */
    public ReliableMulticast(InetSocketAddress group, InetAddress laddr) throws IOException {
        this(group, laddr, Net.getMulticastInterface());
    }

    public void close() throws IOException {
        log.info("Shutting down...");
        sock.close();
        ThreadPool.shutdownNow(pool, log);
        log.info("Finished shutting down.");
    }

    /** Send a message to the group. */
    public void send(T payload) throws IOException {
        Message<T> msg = new Message<T>(
                payload,
                laddr,
                acks.toArray(new MessageID[0]),
                nacks.toArray(new MessageID[0]));
        ByteBuffer buf = Serial.encode(msg);
        sock.send(buf, group);
        acks.clear();
        (new Thread(new Timeout<T>(msg, acks, retransmissions))).start();
        log.info("Sent " + msg + " from " + sock.getLocalAddress() + " to " + group);
    }

    /** Receive a message from the group, blocking if necessary until a message arrives. */
    public T receive() throws InterruptedException {
        Message<T> msg = delivered.take(); // Blocks until a message becomes available.
        return msg.payload;
    }

    /** Receive a message, or return null if none are available. */
    public T tryReceive() {
        Message<T> msg = delivered.poll();
        if (msg == null)
            return null;
        return msg.payload;
    }

    /**
     * Receive a message, waiting up to the specified wait time if necessary.
     *
     * @return A message received from the group, or null if the specified waiting
     * time elapses before a message arrives.
     */
    public T tryReceive(Duration waitTime) throws InterruptedException {
        Message<T> msg = delivered.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS);
        if (msg == null)
            return null;
        return msg.payload;
    }
}