diff options
Diffstat (limited to 'src/main/java/derms/net/runicast')
| -rw-r--r-- | src/main/java/derms/net/runicast/Ack.java | 12 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/Message.java | 33 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/ReceiveAcks.java | 59 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/ReliableUnicastReceiver.java | 4 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/ReliableUnicastSender.java | 82 | ||||
| -rw-r--r-- | src/main/java/derms/net/runicast/Retransmit.java | 57 |
6 files changed, 247 insertions, 0 deletions
diff --git a/src/main/java/derms/net/runicast/Ack.java b/src/main/java/derms/net/runicast/Ack.java new file mode 100644 index 0000000..df2211c --- /dev/null +++ b/src/main/java/derms/net/runicast/Ack.java @@ -0,0 +1,12 @@ +package derms.net.runicast; + +import java.io.Serializable; + +/** A message acknowledgement. */ +class Ack implements Serializable { + final long seq; // The sequence number of the acknowledged message. + + Ack(long seq) { + this.seq = seq; + } +} diff --git a/src/main/java/derms/net/runicast/Message.java b/src/main/java/derms/net/runicast/Message.java new file mode 100644 index 0000000..802cdb5 --- /dev/null +++ b/src/main/java/derms/net/runicast/Message.java @@ -0,0 +1,33 @@ +package derms.net.runicast; + +import derms.net.MessagePayload; + +import java.io.Serializable; + +class Message<T extends MessagePayload> implements Serializable { + final long seq; // Sequence number. + final T payload; + + Message(long seq, T payload) { + this.seq = seq; + this.payload = payload; + } + + @Override + public int hashCode() { return (int) seq * payload.hash(); } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + if (obj.getClass() != this.getClass()) + return false; + Message<?> other = (Message<?>) obj; + if (other.payload.getClass() != this.payload.getClass()) + return false; + return other.seq == this.seq && other.payload.equals(this.payload); + } + + @Override + public String toString() { return getClass().getSimpleName() + "{" + seq + ", " + payload + "}"; } +} diff --git a/src/main/java/derms/net/runicast/ReceiveAcks.java b/src/main/java/derms/net/runicast/ReceiveAcks.java new file mode 100644 index 0000000..0f585ff --- /dev/null +++ b/src/main/java/derms/net/runicast/ReceiveAcks.java @@ -0,0 +1,59 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.SocketTimeoutException; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** Receive acknowledgements. Remove messages from the sent queue once they are acknowledged. */ +class ReceiveAcks<T extends MessagePayload> implements Runnable { + private static final int bufSize = 8192; + + private final AtomicLong unacked; + private final Queue<Message<T>> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + + ReceiveAcks(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) { + this.unacked = unacked; + this.sent = sent; + this.sock = sock; + this.log = Logger.getLogger(getClass().getName()); + } + + @Override + public void run() { + DatagramPacket pkt = new DatagramPacket(new byte[bufSize], bufSize); + for (;;) { + try { + sock.receive(pkt); + Ack ack = Packet.decode(pkt, Ack.class); + recvAck(ack.seq); + } catch (SocketTimeoutException e) { + if (Thread.interrupted()) { + log.info("Interrupted."); + return; + } + } catch (IOException | ClassNotFoundException | ClassCastException e) { + log.warning(e.getMessage()); + } + } + } + + private void recvAck(long ack) { + unacked.updateAndGet((unacked) -> { + if (ack >= unacked) + return ack+1; + return unacked; + }); + + while (!sent.isEmpty() && sent.peek().seq <= ack) + sent.remove(); + } +} diff --git a/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java new file mode 100644 index 0000000..3d63a6f --- /dev/null +++ b/src/main/java/derms/net/runicast/ReliableUnicastReceiver.java @@ -0,0 +1,4 @@ +package derms.net.runicast; + +public class ReliableUnicastReceiver { +} diff --git a/src/main/java/derms/net/runicast/ReliableUnicastSender.java b/src/main/java/derms/net/runicast/ReliableUnicastSender.java new file mode 100644 index 0000000..2a0943a --- /dev/null +++ b/src/main/java/derms/net/runicast/ReliableUnicastSender.java @@ -0,0 +1,82 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +public class ReliableUnicastSender<T extends MessagePayload> { + private static final Duration soTimeout = Duration.ofMillis(500); // Socket timeout. + private static final Duration terminationTimeout = Duration.ofSeconds(1); + + private final AtomicLong next; // Next sequence number. + private final AtomicLong unacked; // Sequence number of first unacknowledged message. + private final Queue<Message<T>> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + private final ExecutorService pool; + + /** + * @param raddr Remote IP address to connect to. + */ + ReliableUnicastSender(InetSocketAddress raddr) throws IOException { + this.next = new AtomicLong(0); + this.unacked = new AtomicLong(0); + this.sent = new LinkedBlockingQueue<Message<T>>(); + this.sock = new ConcurrentDatagramSocket(); + this.sock.connect(raddr); + this.sock.setSoTimeout(soTimeout); + this.log = Logger.getLogger(getClass().getName()); + this.pool = Executors.newCachedThreadPool(); + pool.execute(new ReceiveAcks<T>(unacked, sent, sock)); + pool.execute(new Retransmit<T>(unacked, sent, sock)); + } + + public void send(T payload) throws IOException { + Message<T> msg = new Message<T>(next.get(), payload); + DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); + sock.send(pkt); + sent.add(msg); + next.incrementAndGet(); + } + + /** Wait for all messages to be acknowledged and close the connection. */ + public void close() throws InterruptedException { + // Wait for receiver to acknowledge all sent messages. + while (unacked.get() < next.get()) { + Thread.yield(); + if (Thread.interrupted()) + throw new InterruptedException(); + } + + closeNow(); + } + + /** Close the connection immediately, without waiting for acknowledgements. */ + public void closeNow() { + log.info("Shutting down."); + pool.shutdownNow(); + try { + if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS)) + log.warning("Thread pool did not terminate after " + terminationTimeout); + } catch (InterruptedException e) { + log.warning("Interrupted while terminating thread pool: " + e.getMessage()); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + + sock.close(); + } +} diff --git a/src/main/java/derms/net/runicast/Retransmit.java b/src/main/java/derms/net/runicast/Retransmit.java new file mode 100644 index 0000000..affd00c --- /dev/null +++ b/src/main/java/derms/net/runicast/Retransmit.java @@ -0,0 +1,57 @@ +package derms.net.runicast; + +import derms.net.ConcurrentDatagramSocket; +import derms.net.MessagePayload; +import derms.net.Packet; +import derms.util.Wait; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** Retransmit unacknowledged messages. */ +class Retransmit<T extends MessagePayload> implements Runnable { + private static final Duration timeout = Duration.ofMillis(500); + + private final AtomicLong unacked; + private final Queue<Message<T>> sent; + private final ConcurrentDatagramSocket sock; + private final Logger log; + + Retransmit(AtomicLong unacked, Queue<Message<T>> sent, ConcurrentDatagramSocket sock) { + this.unacked = unacked; + this.sent = sent; + this.sock = sock; + this.log = Logger.getLogger(getClass().getName()); + } + + @Override + public void run() { + try { + for (;;) { + Wait.forDuration(timeout); + + for (Message<T> msg : sent) { + if (msg.seq >= unacked.get()) { + retransmit(msg); + } + } + } + } catch (InterruptedException e) { + log.info("Interrupted."); + } + } + + private void retransmit(Message<T> msg) { + try { + DatagramPacket pkt = Packet.encode(msg, sock.getRemoteSocketAddress()); + sock.send(pkt); + log.info("Retransmitted " + msg); + } catch (IOException e) { + log.warning("Failed to retransmit " + msg + ": " + e.getMessage()); + } + } +} |