From 3f63da76ab16450694d84dd29503d2e7ec0198c8 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Fri, 15 Nov 2024 17:08:36 -0500 Subject: reliable multicast: MessagePayload interface --- src/main/java/derms/net/rmulticast/Message.java | 2 +- src/main/java/derms/net/rmulticast/MessagePayload.java | 5 +++++ src/main/java/derms/net/rmulticast/NullPayload.java | 4 +--- src/main/java/derms/net/rmulticast/Prune.java | 3 +-- src/main/java/derms/net/rmulticast/Receive.java | 3 +-- src/main/java/derms/net/rmulticast/ReceivedSet.java | 3 +-- src/main/java/derms/net/rmulticast/ReliableMulticast.java | 3 +-- src/main/java/derms/net/rmulticast/Retransmit.java | 3 +-- src/main/java/derms/net/rmulticast/Timeout.java | 3 +-- 9 files changed, 13 insertions(+), 16 deletions(-) create mode 100644 src/main/java/derms/net/rmulticast/MessagePayload.java (limited to 'src/main') diff --git a/src/main/java/derms/net/rmulticast/Message.java b/src/main/java/derms/net/rmulticast/Message.java index 180b245..a465f03 100644 --- a/src/main/java/derms/net/rmulticast/Message.java +++ b/src/main/java/derms/net/rmulticast/Message.java @@ -3,7 +3,7 @@ package derms.net.rmulticast; import java.io.Serializable; import java.net.InetAddress; -class Message implements Serializable { +class Message implements Serializable { T payload; InetAddress sender; MessageID[] acks; // IDs of messages that this message positively acknowledges. diff --git a/src/main/java/derms/net/rmulticast/MessagePayload.java b/src/main/java/derms/net/rmulticast/MessagePayload.java new file mode 100644 index 0000000..ca36831 --- /dev/null +++ b/src/main/java/derms/net/rmulticast/MessagePayload.java @@ -0,0 +1,5 @@ +package derms.net.rmulticast; + +import java.io.Serializable; + +public interface MessagePayload extends Serializable, Hashable {} diff --git a/src/main/java/derms/net/rmulticast/NullPayload.java b/src/main/java/derms/net/rmulticast/NullPayload.java index 1eee5d6..4c818e4 100644 --- a/src/main/java/derms/net/rmulticast/NullPayload.java +++ b/src/main/java/derms/net/rmulticast/NullPayload.java @@ -1,8 +1,6 @@ package derms.net.rmulticast; -import java.io.Serializable; - -class NullPayload implements Serializable, Hashable { +class NullPayload implements MessagePayload { @Override public int hash() { return -1; diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java index e748248..c156bcc 100644 --- a/src/main/java/derms/net/rmulticast/Prune.java +++ b/src/main/java/derms/net/rmulticast/Prune.java @@ -1,6 +1,5 @@ package derms.net.rmulticast; -import java.io.Serializable; import java.net.InetAddress; import java.time.Duration; import java.util.ArrayList; @@ -10,7 +9,7 @@ import java.util.Set; import java.util.logging.Logger; /** Free memory from the received list. */ -class Prune implements Runnable { +class Prune implements Runnable { private static final Duration period = Duration.ofMinutes(1); private final ReceivedSet received; diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java index 29ee305..0856a71 100644 --- a/src/main/java/derms/net/rmulticast/Receive.java +++ b/src/main/java/derms/net/rmulticast/Receive.java @@ -4,14 +4,13 @@ import derms.net.ConcurrentMulticastSocket; import derms.net.Packet; import java.io.IOException; -import java.io.Serializable; import java.net.DatagramPacket; import java.net.InetAddress; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; -class Receive implements Runnable { +class Receive implements Runnable { private static final int bufSize = 8192; private final ConcurrentMulticastSocket inSock; diff --git a/src/main/java/derms/net/rmulticast/ReceivedSet.java b/src/main/java/derms/net/rmulticast/ReceivedSet.java index 963b321..831ecbe 100644 --- a/src/main/java/derms/net/rmulticast/ReceivedSet.java +++ b/src/main/java/derms/net/rmulticast/ReceivedSet.java @@ -1,6 +1,5 @@ package derms.net.rmulticast; -import java.io.Serializable; import java.net.InetAddress; import java.util.ArrayList; import java.util.List; @@ -8,7 +7,7 @@ import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -class ReceivedSet { +class ReceivedSet { private final Queue> received; ReceivedSet() { diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index a9e1fa4..2b9d9aa 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -4,7 +4,6 @@ import derms.net.ConcurrentMulticastSocket; import derms.net.Packet; import java.io.IOException; -import java.io.Serializable; import java.net.*; import java.time.Instant; import java.util.Set; @@ -15,7 +14,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; /** TODO */ -public class ReliableMulticast { +public class ReliableMulticast { private final SocketAddress group; private final InetAddress laddr; // Local address. private final Set acks; // Positively acknowledged messages. diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java index 3f2fdbb..32e8d51 100644 --- a/src/main/java/derms/net/rmulticast/Retransmit.java +++ b/src/main/java/derms/net/rmulticast/Retransmit.java @@ -3,14 +3,13 @@ package derms.net.rmulticast; import derms.net.ConcurrentMulticastSocket; import derms.net.Packet; -import java.io.Serializable; import java.net.DatagramPacket; import java.net.SocketAddress; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; /** Retransmit dropped messages. */ -class Retransmit implements Runnable { +class Retransmit implements Runnable { private final BlockingQueue> retransmissions; private final ConcurrentMulticastSocket outSock; private final SocketAddress group; diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java index 7b16f67..79eb606 100644 --- a/src/main/java/derms/net/rmulticast/Timeout.java +++ b/src/main/java/derms/net/rmulticast/Timeout.java @@ -1,13 +1,12 @@ package derms.net.rmulticast; -import java.io.Serializable; import java.time.Duration; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; /** If a message is not positively acknowledged after some time, Timeout puts it in the retransmissions list. */ -class Timeout implements Runnable { +class Timeout implements Runnable { private static final Duration timeout = Duration.ofSeconds(1); private final Message msg; -- cgit v1.2.3