diff options
Diffstat (limited to 'src/main/java/derms/net/tomulticast')
3 files changed, 10 insertions, 13 deletions
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java index aad0a0b..34c29bc 100644 --- a/src/main/java/derms/net/tomulticast/Receive.java +++ b/src/main/java/derms/net/tomulticast/Receive.java @@ -5,6 +5,7 @@ import derms.net.MessagePayload; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -32,7 +33,7 @@ class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implement tryDeliver(); } } catch (InterruptedException e) { - close(); + log.info("Shutting down."); } } diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java index d2f7e61..0d8b690 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java @@ -37,7 +37,7 @@ public abstract class TotalOrderMulticast<T extends MessagePayload> { } /** Close the underlying socket. */ - public void close() { + public void close() throws IOException { sock.close(); } diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java index 83f2da5..549af16 100644 --- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java +++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java @@ -1,6 +1,7 @@ package derms.net.tomulticast; import derms.net.MessagePayload; +import derms.util.ThreadPool; import java.io.IOException; import java.net.InetAddress; @@ -16,6 +17,7 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> { private final BlockingQueue<Message<T>> deliver; private final Logger log; private final ExecutorService pool; + private final Receive<T> receiver; /** * Join the specified totally-ordered multicast group as a receiver. @@ -28,20 +30,14 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> { this.log = Logger.getLogger(getClass().getName()); this.pool = Executors.newSingleThreadExecutor(); - pool.execute(new Receive<T>(group, laddr, deliver)); + this.receiver = new Receive<T>(group, laddr, deliver); + pool.execute(receiver); } /** Close the underlying socket. */ - public void close() { - pool.shutdownNow(); - try { - if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) - log.warning("Thread pool did not terminate after " + terminationTimeout); - } catch (InterruptedException e) { - log.warning("Interrupted while terminating thread pool: " + e.getMessage()); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } + public void close() throws IOException { + receiver.close(); + ThreadPool.shutdown(pool, log); } /** Receive a message from the group, blocking if necessary until one arrives. */ |