From 05c4e2b5770133228daff7c262945f078a4e4456 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Sat, 16 Nov 2024 14:28:13 -0500 Subject: ReliableMulticast.close() --- .../derms/net/rmulticast/ReliableMulticast.java | 36 +++++++++++++--------- 1 file changed, 21 insertions(+), 15 deletions(-) (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java') diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index dd2991a..6edab5d 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -7,10 +7,7 @@ import java.io.IOException; import java.net.*; import java.time.Duration; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.logging.Logger; /** @@ -20,6 +17,8 @@ import java.util.logging.Logger; * Distributed Systems" in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, 1990. */ public class ReliableMulticast { + private static final Duration terminationTimeout = Duration.ofSeconds(1); + private final SocketAddress group; private final InetAddress laddr; // Local address. private final Set acks; // Positively acknowledged messages. @@ -30,6 +29,7 @@ public class ReliableMulticast { private final ConcurrentMulticastSocket outSock; private final BlockingQueue> delivered; private final Logger log; + private final ExecutorService pool; /** * Join the specified multicast group. @@ -54,20 +54,26 @@ public class ReliableMulticast { this.log = Logger.getLogger(this.getClass().getName()); - ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket(group.getPort()); - inSock.joinGroup(group.getAddress()); - (new Thread(new Receive(inSock, acks, nacks, received, retransmissions, groupMembers, delivered))).start(); - - (new Thread(new Retransmit(retransmissions, outSock, group))).start(); - - (new Thread(new Prune(received, groupMembers))).start(); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new Receive(group, acks, nacks, received, retransmissions, groupMembers, delivered)); + pool.execute(new Retransmit(retransmissions, outSock, group)); + pool.execute(new Prune(received, groupMembers)); + pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock)); + } + public void close() { + log.info("Shutting down..."); + pool.shutdownNow(); try { - (new Thread(new Heartbeat(group, laddr, acks, nacks, outSock))).start(); - } catch (IOException e) { - log.severe("Failed to start heartbeat thread: " + e.getMessage()); - throw e; + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); } + outSock.close(); + log.info("Finished shutting down."); } /** Send a message to the group. */ -- cgit v1.2.3