diff options
Diffstat (limited to 'src/main/java/derms')
| -rw-r--r-- | src/main/java/derms/net/ConcurrentMulticastSocket.java | 33 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/ReliableMulticast.java | 13 | ||||
| -rw-r--r-- | src/main/java/derms/net/rmulticast/Retransmit.java | 10 |
3 files changed, 43 insertions, 13 deletions
diff --git a/src/main/java/derms/net/ConcurrentMulticastSocket.java b/src/main/java/derms/net/ConcurrentMulticastSocket.java new file mode 100644 index 0000000..dba718b --- /dev/null +++ b/src/main/java/derms/net/ConcurrentMulticastSocket.java @@ -0,0 +1,33 @@ +package derms.net; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; + +public class ConcurrentMulticastSocket { + private final MulticastSocket sock; + + /** Create a socket for sending. */ + public ConcurrentMulticastSocket() throws IOException { + this.sock = new MulticastSocket(); + } + + /** Create a socket bound to the specified port for receiving. */ + public ConcurrentMulticastSocket(int port) throws IOException { + this.sock = new MulticastSocket(port); + } + + /** Join a multicast group. */ + public synchronized void joinGroup(InetAddress mcastaddr) throws IOException { + sock.joinGroup(mcastaddr); + } + + public synchronized void send(DatagramPacket p) throws IOException { + sock.send(p); + } + + public synchronized void receive(DatagramPacket p) throws IOException { + sock.receive(p); + } +} diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java index f63433f..22e16a2 100644 --- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java +++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java @@ -1,5 +1,6 @@ package derms.net.rmulticast; +import derms.net.ConcurrentMulticastSocket; import derms.net.Packet; import java.io.IOException; @@ -21,7 +22,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { private final BlockingQueue<Message<T>> retransmissions; // Messages pending retransmission. private final AtomicReference<Instant> lastSend; private final SocketAddress group; - private final MulticastSocket inSock, outSock; + private final ConcurrentMulticastSocket inSock, outSock; private final InetAddress laddr; // Local address. private final BlockingQueue<Message<T>> delivered; private final Logger log; @@ -35,10 +36,10 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.group = group; - this.inSock = new MulticastSocket(); + this.inSock = new ConcurrentMulticastSocket(); this.inSock.joinGroup(group.getAddress()); - this.outSock = new MulticastSocket(group.getPort()); + this.outSock = new ConcurrentMulticastSocket(group.getPort()); this.outSock.joinGroup(group.getAddress()); this.laddr = laddr; @@ -48,7 +49,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { this.log = Logger.getLogger(this.getClass().getName()); (new Thread(new Receive())).start(); - (new Thread(new Retransmit(retransmissions, outSock, group))).start(); + (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start(); } public void send(T payload) throws IOException { @@ -58,9 +59,7 @@ public class ReliableMulticast<T extends Serializable & Hashable> { positiveAcks.toArray(new MessageID[0]), negativeAcks.toArray(new MessageID[0])); DatagramPacket pkt = Packet.encode(msg, group); - synchronized (outSock) { - outSock.send(pkt); - } + outSock.send(pkt); positiveAcks.clear(); (new Thread(new Timeout(msg.id()))).start(); lastSend.set(Instant.now()); diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java index ad44eb1..3f2fdbb 100644 --- a/src/main/java/derms/net/rmulticast/Retransmit.java +++ b/src/main/java/derms/net/rmulticast/Retransmit.java @@ -1,10 +1,10 @@ package derms.net.rmulticast; +import derms.net.ConcurrentMulticastSocket; import derms.net.Packet; import java.io.Serializable; import java.net.DatagramPacket; -import java.net.MulticastSocket; import java.net.SocketAddress; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; @@ -12,11 +12,11 @@ import java.util.logging.Logger; /** Retransmit dropped messages. */ class Retransmit<T extends Serializable & Hashable> implements Runnable { private final BlockingQueue<Message<T>> retransmissions; - private final MulticastSocket outSock; + private final ConcurrentMulticastSocket outSock; private final SocketAddress group; private final Logger log; - Retransmit(BlockingQueue<Message<T>> retransmissions, MulticastSocket outSock, SocketAddress group) { + Retransmit(BlockingQueue<Message<T>> retransmissions, ConcurrentMulticastSocket outSock, SocketAddress group) { this.retransmissions = retransmissions; this.outSock = outSock; this.group = group; @@ -30,9 +30,7 @@ class Retransmit<T extends Serializable & Hashable> implements Runnable { Message<T> msg = retransmissions.take(); try { DatagramPacket pkt = Packet.encode(msg, group); - synchronized (outSock) { - outSock.send(pkt); - } + outSock.send(pkt); } catch (Exception e) { log.warning(e.getMessage()); } |