summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/Prune.java
blob: e7482482af41c76f48613a645c49b76fa3af67d7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package derms.net.rmulticast;

import java.io.Serializable;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.logging.Logger;

/** Free memory from the received list. */
class Prune<T extends Serializable & Hashable> implements Runnable {
    private static final Duration period = Duration.ofMinutes(1);

    private final ReceivedSet<T> received;
    private final Set<InetAddress> groupMembers;
    private final Logger log;

    Prune(ReceivedSet<T> received, Set<InetAddress> groupMembers) {
        this.received = received;
        this.groupMembers = groupMembers;
        this.log = Logger.getLogger(this.getClass().getName());
    }

    @Override
    public void run() {
        try {
            for (; ; ) {
                Wait.forDuration(period);
                prune();
            }
        } catch (InterruptedException e) {
            log.info("Prune thread interrupted: " + e.getMessage());
        }
    }

    private void prune() {
        // Get the candidate for removal.
        Message<T> a = received.peekOldest();
        if (a == null)
            return;

        // Ensure all group members have received and acknowledge message a.
        for (InetAddress member : groupMembers) {
            try {
                Message<T> c = received.mostRecentSentBy(member);
                if (!receivedByMemberAtTimeOfSending(a, member, c))
                    return; // Member has not received a -- cannot prune it.
            } catch (NoSuchElementException e) {
                log.warning("No message received from " + member);
                return;
            }
        }

        // All group members have received and acknowledged message a. It is safe to delete.
        received.remove(a);
    }

    /**
     * Return true if member has received and ack'ed message a at the time they sent message c.
     * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
     *
     * @param a      A message that may or may not have been received by member.
     * @param member The process that sent c.
     * @param c      A message that was sent by member.
     */
    private boolean receivedByMemberAtTimeOfSending(Message<T> a, InetAddress member, Message<T> c) {
        List<Message<T>> seq = new ArrayList<Message<T>>();
        seq.add(c);
        return OPDseq(seq, a);
    }

    /**
     * Try to extend the OPD sequence to contain a.
     * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
     *
     * @param seq A sequence of messages each of which acknowledges its predecessor by the OPD.
     * @return True if the sequence can extend to a.
     */
    private boolean OPDseq(List<Message<T>> seq, Message<T> a) {
        if (seq.contains(a))
            return true;

        // c -> ... -> b -> ... ?-> a
        Message<T> c = seq.get(0);
        Message<T> b = seq.get(seq.size() - 1);

        // All messages sent by the sender of b (that are not already in the sequence) can potentially by added to the sequence.
        List<Message<T>> potentialPredecessors = received.allSentBy(b.sender);
        potentialPredecessors.removeAll(seq);

        // Add messages that b positively acknowledged.
        for (MessageID mid : b.acks) {
            try {
                Message<T> msg = received.getByID(mid);
                potentialPredecessors.add(msg);
            } catch (NoSuchElementException e) {
                log.warning("message " + mid + ", acknowledged by " +b.id() + ", is not in the received list. Continuing anyway.");
            }
        }

        // Remove messages that c negatively acknowledged.
        for (MessageID mid : c.nacks) {
            for (int i = 0; i < potentialPredecessors.size(); i++) {
                Message<T> msg = potentialPredecessors.get(i);
                if (msg.id().equals(mid)) {
                    potentialPredecessors.remove(i);
                    break;
                }
            }
        }

        // Try and extend the sequence.
        for (Message<T> predecessor : potentialPredecessors) {
            seq.add(predecessor);
            if (OPDseq(seq, a))
                return true;
            // Failed; remove the predecessor and try another.
            seq.remove(seq.size() - 1);
        }

        // The sequence cannot be extended to include a.
        return false;
    }
}