From 0912e30fb375f60bade22fd8cf7331e1b8d77659 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Fri, 22 Nov 2024 12:39:53 -0500 Subject: ReliableUnicastReceiver --- .../java/derms/net/ConcurrentDatagramSocket.java | 10 ++++ .../derms/net/rmulticast/ReliableMulticast.java | 13 +---- src/main/java/derms/net/runicast/Message.java | 1 + src/main/java/derms/net/runicast/Receive.java | 60 ++++++++++++++++++++++ .../net/runicast/ReliableUnicastReceiver.java | 56 +++++++++++++++++++- .../derms/net/runicast/ReliableUnicastSender.java | 15 +----- src/main/java/derms/util/ThreadPool.java | 22 ++++++++ 7 files changed, 152 insertions(+), 25 deletions(-) create mode 100644 src/main/java/derms/net/runicast/Receive.java create mode 100644 src/main/java/derms/util/ThreadPool.java (limited to 'src/main/java') diff --git a/src/main/java/derms/net/ConcurrentDatagramSocket.java b/src/main/java/derms/net/ConcurrentDatagramSocket.java index 44d9186..59cc77a 100644 --- a/src/main/java/derms/net/ConcurrentDatagramSocket.java +++ b/src/main/java/derms/net/ConcurrentDatagramSocket.java @@ -7,10 +7,20 @@ import java.time.Duration; public class ConcurrentDatagramSocket { private final DatagramSocket sock; + /** Create a socket bound to any available port on the local machine. */ public ConcurrentDatagramSocket() throws IOException { this.sock = new DatagramSocket(); } + /** + * Creates a socket bound to the specified address and port. + * + * @param laddr The local IP address and port to listen on. + */ + public ConcurrentDatagramSocket(SocketAddress laddr) throws SocketException { + this.sock = new DatagramSocket(laddr); + } + public synchronized void close() { sock.close(); } diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index c201e3a..3894021 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -3,6 +3,7 @@ package derms.net.rmulticast; import derms.net.ConcurrentMulticastSocket; import derms.net.MessagePayload; import derms.net.Packet; +import derms.util.ThreadPool; import java.io.IOException; import java.net.*; @@ -18,8 +19,6 @@ import java.util.logging.Logger; * Distributed Systems" in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, 1990. */ public class ReliableMulticast { - private static final Duration terminationTimeout = Duration.ofSeconds(1); - private final SocketAddress group; private final InetAddress laddr; // Local address. private final Set acks; // Positively acknowledged messages. @@ -64,15 +63,7 @@ public class ReliableMulticast { public void close() { log.info("Shutting down..."); - pool.shutdownNow(); - try { - if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) - log.warning("Thread pool did not terminate after " + terminationTimeout); - } catch (InterruptedException e) { - log.warning("Interrupted while terminating thread pool: " + e.getMessage()); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } + ThreadPool.shutDown(pool, log); outSock.close(); log.info("Finished shutting down."); } diff --git a/src/main/java/derms/net/runicast/Message.java b/src/main/java/derms/net/runicast/Message.java index 802cdb5..7d142da 100644 --- a/src/main/java/derms/net/runicast/Message.java +++ b/src/main/java/derms/net/runicast/Message.java @@ -3,6 +3,7 @@ package derms.net.runicast; import derms.net.MessagePayload; import java.io.Serializable; +import java.net.InetAddress; class Message implements Serializable { final long seq; // Sequence number. diff --git a/src/main/java/derms/net/runicast/Receive.java b/src/main/java/derms/net/runicast/Receive.java new file mode 100644 index 0000000..584861b --- /dev/null +++ b/src/main/java/derms/net/runicast/Receive.java @@ -0,0 +1,60 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; + +import java.io.IOException; +import java.net.*; +import java.util.Queue; +import java.util.logging.Logger; + +class Receive implements Runnable { + private static final int bufSize = 8192; + + private long seq; // Sequence number. + private final ConcurrentDatagramSocket sock; + private final Queue delivered; + private final Logger log; + + Receive(ConcurrentDatagramSocket sock, Queue delivered) { + this.seq = 0; + this.sock = sock; + this.delivered = delivered; + this.log = Logger.getLogger(getClass().getName()); + } + + @Override + public void run() { + DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + for (;;) { + try { + sock.receive(pkt); + Message msg = (Message) Packet.decode(pkt, Message.class); + SocketAddress sender = pkt.getSocketAddress(); + recv(msg, sender); + } catch (SocketTimeoutException e) { + if (Thread.interrupted()) { + log.info("Interrupted"); + return; + } + } catch (IOException | ClassNotFoundException | ClassCastException e) { + log.warning(e.getMessage()); + } + } + } + + private void recv(Message msg, SocketAddress sender) throws IOException { + if (msg.seq == seq) { + delivered.add(msg.payload); + ack(msg, sender); + seq++; + } + } + + private void ack(Message msg, SocketAddress sender) throws IOException { + Ack ack = new Ack(msg.seq); + DatagramPacket pkt = Packet.encode(ack, sender); + sock.send(pkt); + } +} diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java index 3d63a6f..dc69834 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java @@ -1,4 +1,58 @@ package derms.net.runicast; -public class ReliableUnicastReceiver { +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.util.ThreadPool; + +import java.io.IOException; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.logging.Logger; + +public class ReliableUnicastReceiver { + private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. + + private final ConcurrentDatagramSocket sock; + private final BlockingQueue delivered; + private final Logger log; + private final ExecutorService pool; + + /** + * @param laddr The local IP address and port to listen on. + */ + ReliableUnicastReceiver(SocketAddress laddr) throws IOException { + this.sock = new ConcurrentDatagramSocket(laddr); + this.sock.setSoTimeout(soTimeout); + this.delivered = new LinkedBlockingQueue(); + this.log = Logger.getLogger(getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new Receive(sock, delivered)); + } + + public void close() { + log.info("Shutting down"); + ThreadPool.shutDown(pool, log); + sock.close(); + } + + /** Receive a message, blocking if necessary until one arrives. */ + public T receive() throws InterruptedException { + return delivered.take(); + } + + /** Receive a message, or return null if none are available. */ + public T tryReceive() { + return delivered.poll(); + } + + /** + * Receive a message, waiting up to the specified wait time if necessary. + * + * @return A message, or null if the specified waiting time elapses before + * a message arrives. + */ + public T tryReceive(Duration waitTime) throws InterruptedException { + return delivered.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS); + } } diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java index 2a0943a..ecfece4 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastSender.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java @@ -3,23 +3,21 @@ package derms.net.runicast; import derms.net.ConcurrentDatagramSocket; import derms.net.MessagePayload; import derms.net.Packet; +import derms.util.ThreadPool; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetSocketAddress; -import java.net.SocketException; import java.time.Duration; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; public class ReliableUnicastSender { private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. - private static final Duration terminationTimeout = Duration.ofSeconds(1); private final AtomicLong next; // Next sequence number. private final AtomicLong unacked; // Sequence number of first unacknowledged message. @@ -67,16 +65,7 @@ public class ReliableUnicastSender { /** Close the connection immediately, without waiting for acknowledgements. */ public void closeNow() { log.info("Shutting down."); - pool.shutdownNow(); - try { - if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) - log.warning("Thread pool did not terminate after " + terminationTimeout); - } catch (InterruptedException e) { - log.warning("Interrupted while terminating thread pool: " + e.getMessage()); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } - + ThreadPool.shutDown(pool, log); sock.close(); } } diff --git a/src/main/java/derms/util/ThreadPool.java b/src/main/java/derms/util/ThreadPool.java new file mode 100644 index 0000000..33588ff --- /dev/null +++ b/src/main/java/derms/util/ThreadPool.java @@ -0,0 +1,22 @@ +package derms.util; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class ThreadPool { + public static final Duration timeout = Duration.ofSeconds(1); + + public static void shutDown(ExecutorService pool, Logger log) { + pool.shutdownNow(); + try { + if (!pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + timeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + } +} -- cgit v1.2.3