diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 14:28:13 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-16 14:28:13 -0500 |
| commit | 05c4e2b5770133228daff7c262945f078a4e4456 (patch) | |
| tree | db2252c3f7209baafc5a43edc7d2eced7b294b2f /src/main/java/derms/net/rmulticast/ReliableMulticast.java | |
| parent | d1406ab917339aa1531060da2c91043790f66d16 (diff) | |
| download | soen423-05c4e2b5770133228daff7c262945f078a4e4456.zip | |
ReliableMulticast.close()
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 36 |
1 files changed, 21 insertions, 15 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index dd2991a..6edab5d 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -7,10 +7,7 @@ import java.io.IOException; import java.net.*; import java.time.Duration; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.logging.Logger; /** @@ -20,6 +17,8 @@ import java.util.logging.Logger; * Distributed Systems" in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, 1990. */ public class ReliableMulticast<T extends MessagePayload> { + private static final Duration terminationTimeout = Duration.ofSeconds(1); + private final SocketAddress group; private final InetAddress laddr; // Local address. private final Set<MessageID> acks; // Positively acknowledged messages. @@ -30,6 +29,7 @@ public class ReliableMulticast<T extends MessagePayload> { private final ConcurrentMulticastSocket outSock; private final BlockingQueue<Message<T>> delivered; private final Logger log; + private final ExecutorService pool; /** * Join the specified multicast group. @@ -54,20 +54,26 @@ public class ReliableMulticast<T extends MessagePayload> { this.log = Logger.getLogger(this.getClass().getName()); - ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket(group.getPort()); - inSock.joinGroup(group.getAddress()); - (new Thread(new Receive<T>(inSock, acks, nacks, received, retransmissions, groupMembers, delivered))).start(); - - (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start(); - - (new Thread(new Prune<T>(received, groupMembers))).start(); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new Receive<T>(group, acks, nacks, received, retransmissions, groupMembers, delivered)); + pool.execute(new Retransmit<T>(retransmissions, outSock, group)); + pool.execute(new Prune<T>(received, groupMembers)); + pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock)); + } + public void close() { + log.info("Shutting down..."); + pool.shutdownNow(); try { - (new Thread(new Heartbeat(group, laddr, acks, nacks, outSock))).start(); - } catch (IOException e) { - log.severe("Failed to start heartbeat thread: " + e.getMessage()); - throw e; + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); } + outSock.close(); + log.info("Finished shutting down."); } /** Send a message to the group. */ |