summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/net/runicast/ReliableUnicastReceiver.java')
-rw-r--r--src/main/java/derms/net/runicast/ReliableUnicastReceiver.java14
1 files changed, 6 insertions, 8 deletions
diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
index 81e3502..ff0b72a 100644
--- a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
+++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java
@@ -1,20 +1,18 @@
package derms.net.runicast;
-import derms.net.ConcurrentDatagramSocket;
import derms.net.MessagePayload;
import derms.util.ThreadPool;
import java.io.IOException;
import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.logging.Logger;
/** The receiving end of a reliable unicast connection. */
public class ReliableUnicastReceiver<T extends MessagePayload> {
- private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout.
-
- private final ConcurrentDatagramSocket sock;
+ private final DatagramChannel sock;
private final BlockingQueue<T> delivered;
private final Logger log;
private final ExecutorService pool;
@@ -25,18 +23,18 @@ public class ReliableUnicastReceiver<T extends MessagePayload> {
* @param laddr The local IP address and port to listen on.
*/
public ReliableUnicastReceiver(SocketAddress laddr) throws IOException {
- this.sock = new ConcurrentDatagramSocket(laddr);
- this.sock.setSoTimeout(soTimeout);
+ this.sock = DatagramChannel.open();
+ sock.bind(laddr);
this.delivered = new LinkedBlockingQueue<T>();
this.log = Logger.getLogger(getClass().getName());
this.pool = Executors.newCachedThreadPool();
pool.execute(new Receive<T>(sock, delivered));
}
- public void close() {
+ public void close() throws IOException {
log.info("Shutting down");
- ThreadPool.shutDown(pool, log);
sock.close();
+ ThreadPool.shutdown(pool, log);
}
/** Receive a message, blocking if necessary until one arrives. */