summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms')
-rw-r--r--src/main/java/derms/net/ConcurrentMulticastSocket.java33
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java13
-rw-r--r--src/main/java/derms/net/rmulticast/Retransmit.java10
3 files changed, 43 insertions, 13 deletions
diff --git a/src/main/java/derms/net/ConcurrentMulticastSocket.java b/src/main/java/derms/net/ConcurrentMulticastSocket.java
new file mode 100644
index 0000000..dba718b
--- /dev/null
+++ b/src/main/java/derms/net/ConcurrentMulticastSocket.java
@@ -0,0 +1,33 @@
+package derms.net;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+
+public class ConcurrentMulticastSocket {
+ private final MulticastSocket sock;
+
+ /** Create a socket for sending. */
+ public ConcurrentMulticastSocket() throws IOException {
+ this.sock = new MulticastSocket();
+ }
+
+ /** Create a socket bound to the specified port for receiving. */
+ public ConcurrentMulticastSocket(int port) throws IOException {
+ this.sock = new MulticastSocket(port);
+ }
+
+ /** Join a multicast group. */
+ public synchronized void joinGroup(InetAddress mcastaddr) throws IOException {
+ sock.joinGroup(mcastaddr);
+ }
+
+ public synchronized void send(DatagramPacket p) throws IOException {
+ sock.send(p);
+ }
+
+ public synchronized void receive(DatagramPacket p) throws IOException {
+ sock.receive(p);
+ }
+}
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index f63433f..22e16a2 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -1,5 +1,6 @@
package derms.net.rmulticast;
+import derms.net.ConcurrentMulticastSocket;
import derms.net.Packet;
import java.io.IOException;
@@ -21,7 +22,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission.
private final AtomicReference<Instant> lastSend;
private final SocketAddress group;
- private final MulticastSocket inSock, outSock;
+ private final ConcurrentMulticastSocket inSock, outSock;
private final InetAddress laddr; // Local address.
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
@@ -35,10 +36,10 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.group = group;
- this.inSock = new MulticastSocket();
+ this.inSock = new ConcurrentMulticastSocket();
this.inSock.joinGroup(group.getAddress());
- this.outSock = new MulticastSocket(group.getPort());
+ this.outSock = new ConcurrentMulticastSocket(group.getPort());
this.outSock.joinGroup(group.getAddress());
this.laddr = laddr;
@@ -48,7 +49,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
this.log = Logger.getLogger(this.getClass().getName());
(new Thread(new Receive())).start();
- (new Thread(new Retransmit(retransmissions, outSock, group))).start();
+ (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
}
public void send(T payload) throws IOException {
@@ -58,9 +59,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> {
positiveAcks.toArray(new MessageID[0]),
negativeAcks.toArray(new MessageID[0]));
DatagramPacket pkt = Packet.encode(msg, group);
- synchronized (outSock) {
- outSock.send(pkt);
- }
+ outSock.send(pkt);
positiveAcks.clear();
(new Thread(new Timeout(msg.id()))).start();
lastSend.set(Instant.now());
diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java
index ad44eb1..3f2fdbb 100644
--- a/src/main/java/derms/net/rmulticast/Retransmit.java
+++ b/src/main/java/derms/net/rmulticast/Retransmit.java
@@ -1,10 +1,10 @@
package derms.net.rmulticast;
+import derms.net.ConcurrentMulticastSocket;
import derms.net.Packet;
import java.io.Serializable;
import java.net.DatagramPacket;
-import java.net.MulticastSocket;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
@@ -12,11 +12,11 @@ import java.util.logging.Logger;
/** Retransmit dropped messages. */
class Retransmit<T extends Serializable & Hashable> implements Runnable {
private final BlockingQueue<Message<T>> retransmissions;
- private final MulticastSocket outSock;
+ private final ConcurrentMulticastSocket outSock;
private final SocketAddress group;
private final Logger log;
- Retransmit(BlockingQueue<Message<T>> retransmissions, MulticastSocket outSock, SocketAddress group) {
+ Retransmit(BlockingQueue<Message<T>> retransmissions, ConcurrentMulticastSocket outSock, SocketAddress group) {
this.retransmissions = retransmissions;
this.outSock = outSock;
this.group = group;
@@ -30,9 +30,7 @@ class Retransmit<T extends Serializable & Hashable> implements Runnable {
Message<T> msg = retransmissions.take();
try {
DatagramPacket pkt = Packet.encode(msg, group);
- synchronized (outSock) {
- outSock.send(pkt);
- }
+ outSock.send(pkt);
} catch (Exception e) {
log.warning(e.getMessage());
}