summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/derms/net/ConcurrentMulticastSocket.java10
-rw-r--r--src/main/java/derms/net/rmulticast/Heartbeat.java2
-rw-r--r--src/main/java/derms/net/rmulticast/Prune.java2
-rw-r--r--src/main/java/derms/net/rmulticast/Receive.java17
-rw-r--r--src/main/java/derms/net/rmulticast/ReliableMulticast.java36
-rw-r--r--src/main/java/derms/net/rmulticast/Retransmit.java2
-rw-r--r--src/main/java/derms/net/rmulticast/Timeout.java2
-rw-r--r--src/main/java/derms/util/Wait.java2
8 files changed, 52 insertions, 21 deletions
diff --git a/src/main/java/derms/net/ConcurrentMulticastSocket.java b/src/main/java/derms/net/ConcurrentMulticastSocket.java
index 24a040c..6aeb5a8 100644
--- a/src/main/java/derms/net/ConcurrentMulticastSocket.java
+++ b/src/main/java/derms/net/ConcurrentMulticastSocket.java
@@ -4,6 +4,8 @@ import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
+import java.net.SocketException;
+import java.time.Duration;
public class ConcurrentMulticastSocket {
private final MulticastSocket sock;
@@ -31,6 +33,14 @@ public class ConcurrentMulticastSocket {
sock.receive(p);
}
+ public synchronized void setSoTimeout(Duration timeout) throws SocketException {
+ sock.setSoTimeout((int) timeout.toMillis());
+ }
+
+ public synchronized void close() {
+ sock.close();
+ }
+
@Override
public String toString() {
return sock.getLocalSocketAddress().toString();
diff --git a/src/main/java/derms/net/rmulticast/Heartbeat.java b/src/main/java/derms/net/rmulticast/Heartbeat.java
index 43eaaf5..5dc4be2 100644
--- a/src/main/java/derms/net/rmulticast/Heartbeat.java
+++ b/src/main/java/derms/net/rmulticast/Heartbeat.java
@@ -47,7 +47,7 @@ class Heartbeat implements Runnable {
}
}
} catch (InterruptedException e) {
- log.info("Heartbeat thread interrupted: " + e.getMessage());
+ log.info("Interrupted. Shutting down.");
}
}
diff --git a/src/main/java/derms/net/rmulticast/Prune.java b/src/main/java/derms/net/rmulticast/Prune.java
index 8dfadf6..04f8423 100644
--- a/src/main/java/derms/net/rmulticast/Prune.java
+++ b/src/main/java/derms/net/rmulticast/Prune.java
@@ -32,7 +32,7 @@ class Prune<T extends MessagePayload> implements Runnable {
prune();
}
} catch (InterruptedException e) {
- log.info("Prune thread interrupted: " + e.getMessage());
+ log.info("Interrupted. Shutting down.");
}
}
diff --git a/src/main/java/derms/net/rmulticast/Receive.java b/src/main/java/derms/net/rmulticast/Receive.java
index 922f357..bf583dd 100644
--- a/src/main/java/derms/net/rmulticast/Receive.java
+++ b/src/main/java/derms/net/rmulticast/Receive.java
@@ -6,6 +6,9 @@ import derms.net.Packet;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -13,6 +16,7 @@ import java.util.logging.Logger;
class Receive<T extends MessagePayload> implements Runnable {
private static final int bufSize = 8192;
+ private static final Duration sockTimeout = Duration.ofMillis(500);
private final ConcurrentMulticastSocket inSock;
private final Set<MessageID> acks;
@@ -23,8 +27,11 @@ class Receive<T extends MessagePayload> implements Runnable {
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
- Receive(ConcurrentMulticastSocket inSock, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) {
- this.inSock = inSock;
+ Receive(InetSocketAddress group, Set<MessageID> acks, Set<MessageID> nacks, ReceivedSet<T> received, BlockingQueue<Message<T>> retransmissions, Set<InetAddress> groupMembers, BlockingQueue<Message<T>> delivered) throws IOException {
+ this.inSock = new ConcurrentMulticastSocket(group.getPort());
+ this.inSock.joinGroup(group.getAddress());
+ this.inSock.setSoTimeout(sockTimeout);
+
this.acks = acks;
this.nacks = nacks;
this.received = received;
@@ -44,6 +51,12 @@ class Receive<T extends MessagePayload> implements Runnable {
Message<?> msg = Packet.decode(pkt, Message.class);
receive(msg);
log.info("Received " + msg);
+ } catch (SocketTimeoutException e) {
+ if (Thread.interrupted()) {
+ log.info("Interrupted. Shutting down.");
+ inSock.close();
+ return;
+ }
} catch (IOException | ClassNotFoundException | ClassCastException e) {
log.warning(e.getMessage());
}
diff --git a/src/main/java/derms/net/rmulticast/ReliableMulticast.java b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
index dd2991a..6edab5d 100644
--- a/src/main/java/derms/net/rmulticast/ReliableMulticast.java
+++ b/src/main/java/derms/net/rmulticast/ReliableMulticast.java
@@ -7,10 +7,7 @@ import java.io.IOException;
import java.net.*;
import java.time.Duration;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.logging.Logger;
/**
@@ -20,6 +17,8 @@ import java.util.logging.Logger;
* Distributed Systems" in IEEE Transactions on Parallel and Distributed Systems, vol. 1, no. 1, 1990.
*/
public class ReliableMulticast<T extends MessagePayload> {
+ private static final Duration terminationTimeout = Duration.ofSeconds(1);
+
private final SocketAddress group;
private final InetAddress laddr; // Local address.
private final Set<MessageID> acks; // Positively acknowledged messages.
@@ -30,6 +29,7 @@ public class ReliableMulticast<T extends MessagePayload> {
private final ConcurrentMulticastSocket outSock;
private final BlockingQueue<Message<T>> delivered;
private final Logger log;
+ private final ExecutorService pool;
/**
* Join the specified multicast group.
@@ -54,20 +54,26 @@ public class ReliableMulticast<T extends MessagePayload> {
this.log = Logger.getLogger(this.getClass().getName());
- ConcurrentMulticastSocket inSock = new ConcurrentMulticastSocket(group.getPort());
- inSock.joinGroup(group.getAddress());
- (new Thread(new Receive<T>(inSock, acks, nacks, received, retransmissions, groupMembers, delivered))).start();
-
- (new Thread(new Retransmit<T>(retransmissions, outSock, group))).start();
-
- (new Thread(new Prune<T>(received, groupMembers))).start();
+ this.pool = Executors.newCachedThreadPool();
+ pool.execute(new Receive<T>(group, acks, nacks, received, retransmissions, groupMembers, delivered));
+ pool.execute(new Retransmit<T>(retransmissions, outSock, group));
+ pool.execute(new Prune<T>(received, groupMembers));
+ pool.execute(new Heartbeat(group, laddr, acks, nacks, outSock));
+ }
+ public void close() {
+ log.info("Shutting down...");
+ pool.shutdownNow();
try {
- (new Thread(new Heartbeat(group, laddr, acks, nacks, outSock))).start();
- } catch (IOException e) {
- log.severe("Failed to start heartbeat thread: " + e.getMessage());
- throw e;
+ if (!pool.awaitTermination(terminationTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.warning("Thread pool did not terminate after " + terminationTimeout);
+ } catch (InterruptedException e) {
+ log.warning("Interrupted while terminating thread pool: " + e.getMessage());
+ // Preserve interrupt status.
+ Thread.currentThread().interrupt();
}
+ outSock.close();
+ log.info("Finished shutting down.");
}
/** Send a message to the group. */
diff --git a/src/main/java/derms/net/rmulticast/Retransmit.java b/src/main/java/derms/net/rmulticast/Retransmit.java
index 4f7754d..6205697 100644
--- a/src/main/java/derms/net/rmulticast/Retransmit.java
+++ b/src/main/java/derms/net/rmulticast/Retransmit.java
@@ -36,7 +36,7 @@ class Retransmit<T extends MessagePayload> implements Runnable {
}
}
} catch (InterruptedException e) {
- log.info("Retransmit thread interrupted: " + e.getMessage());
+ log.info("Interrupted. Shutting down.");
}
}
}
diff --git a/src/main/java/derms/net/rmulticast/Timeout.java b/src/main/java/derms/net/rmulticast/Timeout.java
index d55abe0..bfae630 100644
--- a/src/main/java/derms/net/rmulticast/Timeout.java
+++ b/src/main/java/derms/net/rmulticast/Timeout.java
@@ -40,7 +40,7 @@ class Timeout<T extends MessagePayload> implements Runnable {
}
}
} catch (InterruptedException e) {
- log.info("Timeout thread interrupted: " + e.getMessage());
+ log.info("Interrupted. Shutting down.");
}
}
}
diff --git a/src/main/java/derms/util/Wait.java b/src/main/java/derms/util/Wait.java
index 1e77020..f726835 100644
--- a/src/main/java/derms/util/Wait.java
+++ b/src/main/java/derms/util/Wait.java
@@ -10,6 +10,8 @@ public class Wait {
Duration elapsed;
do {
Thread.yield();
+ if (Thread.interrupted())
+ throw new InterruptedException();
elapsed = Duration.between(start, Instant.now());
} while (elapsed.compareTo(dur) < 0);
}