diff options
Diffstat (limited to 'src/main/java/derms/net/runicast')
4 files changed, 118 insertions, 14 deletions
diff --git a/src/main/java/derms/net/runicast/Message.java b/src/main/java/derms/net/runicast/Message.java index 802cdb5..7d142da 100644 --- a/src/main/java/derms/net/runicast/Message.java +++ b/src/main/java/derms/net/runicast/Message.java @@ -3,6 +3,7 @@ package derms.net.runicast; import derms.net.MessagePayload; import java.io.Serializable; +import java.net.InetAddress; class Message<T extends MessagePayload> implements Serializable { final long seq; // Sequence number. diff --git a/src/main/java/derms/net/runicast/Receive.java b/src/main/java/derms/net/runicast/Receive.java new file mode 100644 index 0000000..584861b --- /dev/null +++ b/src/main/java/derms/net/runicast/Receive.java @@ -0,0 +1,60 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; + +import java.io.IOException; +import java.net.*; +import java.util.Queue; +import java.util.logging.Logger; + +class Receive<T extends MessagePayload> implements Runnable { + private static final int bufSize = 8192; + + private long seq; // Sequence number. + private final ConcurrentDatagramSocket sock; + private final Queue<T> delivered; + private final Logger log; + + Receive(ConcurrentDatagramSocket sock, Queue<T> delivered) { + this.seq = 0; + this.sock = sock; + this.delivered = delivered; + this.log = Logger.getLogger(getClass().getName()); + } + + @Override + public void run() { + DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + for (;;) { + try { + sock.receive(pkt); + Message<T> msg = (Message<T>) Packet.decode(pkt, Message.class); + SocketAddress sender = pkt.getSocketAddress(); + recv(msg, sender); + } catch (SocketTimeoutException e) { + if (Thread.interrupted()) { + log.info("Interrupted"); + return; + } + } catch (IOException | ClassNotFoundException | ClassCastException e) { + log.warning(e.getMessage()); + } + } + } + + private void recv(Message<T> msg, SocketAddress sender) throws IOException { + if (msg.seq == seq) { + delivered.add(msg.payload); + ack(msg, sender); + seq++; + } + } + + private void ack(Message<T> msg, SocketAddress sender) throws IOException { + Ack ack = new Ack(msg.seq); + DatagramPacket pkt = Packet.encode(ack, sender); + sock.send(pkt); + } +} diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java index 3d63a6f..dc69834 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java @@ -1,4 +1,58 @@ package derms.net.runicast; -public class ReliableUnicastReceiver { +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.util.ThreadPool; + +import java.io.IOException; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.concurrent.*; +import java.util.logging.Logger; + +public class ReliableUnicastReceiver<T extends MessagePayload> { + private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. + + private final ConcurrentDatagramSocket sock; + private final BlockingQueue<T> delivered; + private final Logger log; + private final ExecutorService pool; + + /** + * @param laddr The local IP address and port to listen on. + */ + ReliableUnicastReceiver(SocketAddress laddr) throws IOException { + this.sock = new ConcurrentDatagramSocket(laddr); + this.sock.setSoTimeout(soTimeout); + this.delivered = new LinkedBlockingQueue<T>(); + this.log = Logger.getLogger(getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new Receive<T>(sock, delivered)); + } + + public void close() { + log.info("Shutting down"); + ThreadPool.shutDown(pool, log); + sock.close(); + } + + /** Receive a message, blocking if necessary until one arrives. */ + public T receive() throws InterruptedException { + return delivered.take(); + } + + /** Receive a message, or return null if none are available. */ + public T tryReceive() { + return delivered.poll(); + } + + /** + * Receive a message, waiting up to the specified wait time if necessary. + * + * @return A message, or null if the specified waiting time elapses before + * a message arrives. + */ + public T tryReceive(Duration waitTime) throws InterruptedException { + return delivered.poll(waitTime.toMillis(), TimeUnit.MILLISECONDS); + } } diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java index 2a0943a..ecfece4 100644 --- a/src/main/java/derms/net/runicast/ReliableUnicastSender.java +++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java @@ -3,23 +3,21 @@ package derms.net.runicast; import derms.net.ConcurrentDatagramSocket; import derms.net.MessagePayload; import derms.net.Packet; +import derms.util.ThreadPool; import java.io.IOException; import java.net.DatagramPacket; import java.net.InetSocketAddress; -import java.net.SocketException; import java.time.Duration; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; public class ReliableUnicastSender<T extends MessagePayload> { private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. - private static final Duration terminationTimeout = Duration.ofSeconds(1); private final AtomicLong next; // Next sequence number. private final AtomicLong unacked; // Sequence number of first unacknowledged message. @@ -67,16 +65,7 @@ public class ReliableUnicastSender<T extends MessagePayload> { /** Close the connection immediately, without waiting for acknowledgements. */ public void closeNow() { log.info("Shutting down."); - pool.shutdownNow(); - try { - if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) - log.warning("Thread pool did not terminate after " + terminationTimeout); - } catch (InterruptedException e) { - log.warning("Interrupted while terminating thread pool: " + e.getMessage()); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } - + ThreadPool.shutDown(pool, log); sock.close(); } } |