diff options
Diffstat (limited to 'src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java')
| -rw-r--r-- | src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java | 18 |
1 files changed, 7 insertions, 11 deletions
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java index 83f2da5..549af16 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -1,6 +1,7 @@ package derms.net.tomulticast; import derms.net.MessagePayload; +import derms.util.ThreadPool; import java.io.IOException; import java.net.InetAddress; @@ -16,6 +17,7 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> { private final BlockingQueue<Message<T>> deliver; private final Logger log; private final ExecutorService pool; + private final Receive<T> receiver; /** * Join the specified totally-ordered multicast group as a receiver. @@ -28,20 +30,14 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> { this.log = Logger.getLogger(getClass().getName()); this.pool = Executors.newSingleThreadExecutor(); - pool.execute(new Receive<T>(group, laddr, deliver)); + this.receiver = new Receive<T>(group, laddr, deliver); + pool.execute(receiver); } /** Close the underlying socket. */ - public void close() { - pool.shutdownNow(); - try { - if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) - log.warning("Thread pool did not terminate after " + terminationTimeout); - } catch (InterruptedException e) { - log.warning("Interrupted while terminating thread pool: " + e.getMessage()); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } + public void close() throws IOException { + receiver.close(); + ThreadPool.shutdown(pool, log); } /** Receive a message from the group, blocking if necessary until one arrives. */ |