blob: c156bcc05d1256209d10a48b703a8d98d81649c5 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
package derms.net.rmulticast;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.logging.Logger;
/**
 * Free memory from the received list.
 *
 * <p>Once per {@link #period}, the oldest received message is examined and removed from the
 * received set if every group member can be shown to have received it, using the Observable
 * Predicate for Delivery (OPD) from "Broadcast Protocols for Distributed Systems",
 * Melliar-Smith et al. (1990).
 */
class Prune<T extends MessagePayload> implements Runnable {
    /** Time to wait between two pruning attempts. */
    private static final Duration period = Duration.ofMinutes(1);

    private final ReceivedSet<T> received;
    private final Set<InetAddress> groupMembers;
    private final Logger log;

    /**
     * @param received     The set of received messages to prune.
     * @param groupMembers The members that must all have received a message before it is removed.
     */
    Prune(ReceivedSet<T> received, Set<InetAddress> groupMembers) {
        this.received = received;
        this.groupMembers = groupMembers;
        this.log = Logger.getLogger(this.getClass().getName());
    }

    /**
     * Wait for {@link #period}, attempt to prune, and repeat until the thread is interrupted.
     * On interruption the loop ends and the thread's interrupt status is restored.
     */
    @Override
    public void run() {
        try {
            for (; ; ) {
                Wait.forDuration(period);
                prune();
            }
        } catch (InterruptedException e) {
            log.info("Prune thread interrupted: " + e.getMessage());
            // Catching InterruptedException clears the interrupt flag; restore it so that
            // whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Remove the oldest received message if every group member has received and acknowledged it.
     * Does nothing if there is no oldest message, if some member has not been shown to have
     * received it, or if no message from some member is available.
     *
     * <p>NOTE(review): at most one message is removed per call, i.e. one per period -- confirm
     * that this rate is sufficient for the expected message volume.
     */
    private void prune() {
        // Get the candidate for removal.
        Message<T> a = received.peekOldest();
        if (a == null)
            return;

        // Ensure all group members have received and acknowledged message a.
        for (InetAddress member : groupMembers) {
            try {
                Message<T> c = received.mostRecentSentBy(member);
                if (!receivedByMemberAtTimeOfSending(a, member, c))
                    return; // Member has not received a -- cannot prune it.
            } catch (NoSuchElementException e) {
                log.warning("No message received from " + member);
                return;
            }
        }

        // All group members have received and acknowledged message a. It is safe to delete.
        received.remove(a);
    }

    /**
     * Return true if member has received and ack'ed message a at the time they sent message c.
     * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
     *
     * @param a      A message that may or may not have been received by member.
     * @param member The process that sent c. (Currently unused; the search starts from c itself.)
     * @param c      A message that was sent by member.
     * @return True if an OPD sequence starting at c can be extended to contain a.
     */
    private boolean receivedByMemberAtTimeOfSending(Message<T> a, InetAddress member, Message<T> c) {
        List<Message<T>> seq = new ArrayList<>();
        seq.add(c);
        return OPDseq(seq, a);
    }

    /**
     * Try to extend the OPD sequence to contain a.
     * See Observable Predicate for Delivery (OPD) in "Broadcast Protocols for Distributed Systems" Melliar-Smith et. al. (1990).
     *
     * <p>This is a depth-first search: seq is extended in place while searching and is restored
     * to its original contents before returning false.
     *
     * @param seq A non-empty sequence of messages each of which acknowledges its predecessor by the OPD.
     * @param a   The message the sequence should be extended to.
     * @return True if the sequence can extend to a.
     */
    private boolean OPDseq(List<Message<T>> seq, Message<T> a) {
        if (seq.contains(a))
            return true;

        // c -> ... -> b -> ... ?-> a
        Message<T> c = seq.get(0);
        Message<T> b = seq.get(seq.size() - 1);

        // All messages sent by the sender of b can potentially be added to the sequence.
        // Work on a copy: the list handed out by the received set may be shared or
        // unmodifiable, and it must not be changed by the filtering below.
        List<Message<T>> potentialPredecessors = new ArrayList<>(received.allSentBy(b.sender));

        // Add messages that b positively acknowledged.
        for (MessageID mid : b.acks) {
            try {
                Message<T> msg = received.getByID(mid);
                // Skip duplicates so the same candidate is not explored twice.
                if (!potentialPredecessors.contains(msg))
                    potentialPredecessors.add(msg);
            } catch (NoSuchElementException e) {
                log.warning("message " + mid + ", acknowledged by " +b.id() + ", is not in the received list. Continuing anyway.");
            }
        }

        // Drop everything that is already in the sequence. This must happen after the
        // acknowledged messages are added: otherwise a message already in the sequence could be
        // re-added through an ack, and a cycle of acks would recurse without bound.
        potentialPredecessors.removeAll(seq);

        // Remove messages that c negatively acknowledged.
        for (MessageID mid : c.nacks)
            potentialPredecessors.removeIf(msg -> msg.id().equals(mid));

        // Try and extend the sequence.
        for (Message<T> predecessor : potentialPredecessors) {
            seq.add(predecessor);
            if (OPDseq(seq, a))
                return true;
            // Failed; remove the predecessor and try another.
            seq.remove(seq.size() - 1);
        }

        // The sequence cannot be extended to include a.
        return false;
    }
}
|