summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/tomulticast
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/tomulticast')
-rw-r--r--src/main/java/derms/net/tomulticast/Receive.java3
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticast.java2
-rw-r--r--src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java18
3 files changed, 10 insertions, 13 deletions
diff --git a/src/main/java/derms/net/tomulticast/Receive.java b/src/main/java/derms/net/tomulticast/Receive.java
index aad0a0b..34c29bc 100644
--- a/src/main/java/derms/net/tomulticast/Receive.java
+++ b/src/main/java/derms/net/tomulticast/Receive.java
@@ -5,6 +5,7 @@ import derms.net.MessagePayload;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -32,7 +33,7 @@ class Receive<T extends MessagePayload> extends TotalOrderMulticast<T> implement
tryDeliver();
}
} catch (InterruptedException e) {
- close();
+ log.info("Shutting down.");
}
}
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
index d2f7e61..0d8b690 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticast.java
@@ -37,7 +37,7 @@ public abstract class TotalOrderMulticast<T extends MessagePayload> {
}
/** Close the underlying socket. */
- public void close() {
+ public void close() throws IOException {
sock.close();
}
diff --git a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
index 83f2da5..549af16 100644
--- a/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
+++ b/src/main/java/derms/net/tomulticast/TotalOrderMulticastReceiver.java
@@ -1,6 +1,7 @@
package derms.net.tomulticast;
import derms.net.MessagePayload;
+import derms.util.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
@@ -16,6 +17,7 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> {
private final BlockingQueue<Message<T>> deliver;
private final Logger log;
private final ExecutorService pool;
+ private final Receive<T> receiver;
/**
* Join the specified totally-ordered multicast group as a receiver.
@@ -28,20 +30,14 @@ public class TotalOrderMulticastReceiver<T extends MessagePayload> {
this.log = Logger.getLogger(getClass().getName());
this.pool = Executors.newSingleThreadExecutor();
- pool.execute(new Receive<T>(group, laddr, deliver));
+ this.receiver = new Receive<T>(group, laddr, deliver);
+ pool.execute(receiver);
}
/** Close the underlying socket. */
- public void close() {
- pool.shutdownNow();
- try {
- if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
- log.warning("Thread pool did not terminate after " + terminationTimeout);
- } catch (InterruptedException e) {
- log.warning("Interrupted while terminating thread pool: " + e.getMessage());
- // Preserve interrupt status.
- Thread.currentThread().interrupt();
- }
+ public void close() throws IOException {
+ receiver.close();
+ ThreadPool.shutdown(pool, log);
}
/** Receive a message from the group, blocking if necessary until one arrives. */