summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/derms/net/rmulticast/ReceivedSet.java74
1 files changed, 46 insertions, 28 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java
index 5fd8ce5..c71a968 100644
--- a/src/main/java/derms/net/rmulticast/ReceivedSet.java
+++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java
@@ -1,17 +1,15 @@
package derms.net.rmulticast;
import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
class ReceivedSet<T extends MessagePayload> {
- private final Queue<Message<T>> received;
+ private final Map<MessageID, Entry<T>> received;
ReceivedSet() {
- this.received = new ConcurrentLinkedQueue<Message<T>>();
+ this.received = new ConcurrentHashMap<MessageID, Entry<T>>();
}
/**
@@ -20,20 +18,15 @@ class ReceivedSet<T extends MessagePayload> {
* @param msg The message to add to the set.
* @return True if the set did not already contain the specified message.
*/
- // TODO: faster insertion.
boolean add(Message<T> msg) {
- if (contains(msg))
- return false;
- received.add(msg);
- return true;
+ return received.put(msg.id(), new Entry<T>(msg)) == null;
}
- // TODO: faster search.
Message<T> getByID(MessageID mid) throws NoSuchElementException {
- for (Message<T> msg : received)
- if (msg.id().equals(mid))
- return msg;
- throw new NoSuchElementException("message " + mid + " not in received list.");
+ Entry<T> e = received.get(mid);
+ if (e == null)
+ throw new NoSuchElementException("message " + mid + " not in received list.");
+ return e.msg;
}
boolean contains(MessageID mid) {
@@ -51,31 +44,56 @@ class ReceivedSet<T extends MessagePayload> {
/** Remove the specified message from the set, if it is present. */
void remove(Message<T> msg) {
- received.remove(msg);
+ received.remove(msg.id());
}
/** Retrieves, but does not remove, the oldest message, or returns null if the set is empty. */
Message<T> peekOldest() {
- return received.peek();
+ Entry<T> oldest = null;
+ for (Entry<T> e : received.values())
+ if (oldest == null || e.timestamp.isBefore(oldest.timestamp))
+ oldest = e;
+ if (oldest == null)
+ return null;
+ return oldest.msg;
}
Message<T> mostRecentSentBy(InetAddress member) throws NoSuchElementException {
- Message<T> recent = null;
- for (Message<T> msg : received) {
- if (msg.sender.equals(member))
- recent = msg;
- }
+ Entry<T> recent = null;
+ for (Entry<T> e : received.values())
+ if (e.msg.sender.equals(member) && (recent == null || e.timestamp.isAfter(recent.timestamp)))
+ recent = e;
if (recent == null)
throw new NoSuchElementException("no message from " + member + " in received list.");
- return recent;
+ return recent.msg;
}
List<Message<T>> allSentBy(InetAddress sender) {
List<Message<T>> sent = new ArrayList<Message<T>>();
- for (Message<T> msg : received) {
- if (msg.sender.equals(sender))
- sent.add(msg);
+ for (Entry<T> e : received.values()) {
+ if (e.msg.sender.equals(sender))
+ sent.add(e.msg);
}
return sent;
}
+
+ private static class Entry<T extends MessagePayload> {
+ private final Message<T> msg;
+ private final Instant timestamp; // The time at which the entry was added to the set.
+
+ private Entry(Message<T> msg) {
+ this.msg = msg;
+ this.timestamp = Instant.now();
+ }
+
+ @Override
+ public int hashCode() {
+ return msg.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return msg.equals(obj);
+ }
+ }
}