summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/rmulticast/ReliableMulticast.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/rmulticast/ReliableMulticast.java')
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java36
1 files changed, 21 insertions, 15 deletions
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index dd2991a..6edab5d 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -7,10 +7,7 @@ import java.io.IOException;
import java.net.*;
import java.time.Duration;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.logging.Logger;
/**
@@ -20,6 +17,8 @@ import java.util.logging.Logger;
* Distributed Systems" in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, 1990.
*/
public class ReliableMulticast<T extends MessagePayload> {
+ private static final Duration terminationTimeout = Duration.ofSeconds(1);
+
private final SocketAddress group;
private final InetAddress laddr; // Local address.
private final Set<MessageID> acks; // Positively acknowledged messages.
@@ -30,6 +29,7 @@ public class ReliableMulticast<T extends MessagePayload> {
private final ConcurrentMulticastSocket outSock;
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
+ private final ExecutorService pool;
/**
* Join the specified multicast group.
@@ -54,20 +54,26 @@ public class ReliableMulticast<T extends MessagePayload> {
this.log = Logger.getLogger(this.getClass().getName());
- ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket(group.getPort());
- inSock.joinGroup(group.getAddress());
- (new Thread(new Receive<T>(inSock, acks, nacks, received, retransmissions, groupMembers, delivered))).start();
-
- (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
-
- (new Thread(new Prune<T>(received, groupMembers))).start();
+ this.pool = Executors.newCachedThreadPool();
+ pool.execute(new Receive<T>(group, acks, nacks, received, retransmissions, groupMembers, delivered));
+ pool.execute(new Retransmit<T>(retransmissions, outSock, group));
+ pool.execute(new Prune<T>(received, groupMembers));
+ pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock));
+ }
+ public void close() {
+ log.info("Shutting down...");
+ pool.shutdownNow();
try {
- (new Thread(new Heartbeat(group, laddr, acks, nacks, outSock))).start();
- } catch (IOException e) {
- log.severe("Failed to start heartbeat thread: " + e.getMessage());
- throw e;
+ if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.warning("Thread pool did not terminate after " + terminationTimeout);
+ } catch (InterruptedException e) {
+ log.warning("Interrupted while terminating thread pool: " + e.getMessage());
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
}
+ outSock.close();
+ log.info("Finished shutting down.");
}
/** Send a message to the group. */