summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/frontend/FE.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/frontend/FE.java')
-rw-r--r--src/main/java/derms/frontend/FE.java85
1 files changed, 50 insertions, 35 deletions
diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java
index 8d6fc59..f29459c 100644
--- a/src/main/java/derms/frontend/FE.java
+++ b/src/main/java/derms/frontend/FE.java
@@ -6,13 +6,16 @@ import java.net.*;
import javax.xml.ws.Endpoint;
import derms.Config;
-import derms.Request;
-import derms.Response;
+import derms.Request;
+import derms.Response;
import derms.net.runicast.ReliableUnicastReceiver;
import derms.net.runicast.ReliableUnicastSender;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
//import constants.Constants;
@@ -25,6 +28,7 @@ public class FE {
private static final String RM_Multicast_group_address = Config.group.toString();
private static final int RM_Multicast_Port = 1234;
private static AtomicInteger sequenceIDGenerator = new AtomicInteger(0);
+ private static final ExecutorService executor = Executors.newFixedThreadPool(4);
public static void main(String[] args) {
try {
@@ -73,9 +77,10 @@ public class FE {
};
DERMSServerImpl servant = new DERMSServerImpl(inter);
Endpoint endpoint = Endpoint.publish(endpointURL(frontendIP), servant);
+
Runnable task = () -> {
- listenForUDPResponses(servant);
try {
+ listenForUDPResponses(servant);
sequencerSock.close();
} catch (Exception e) {
e.printStackTrace();
@@ -83,6 +88,7 @@ public class FE {
};
Thread thread = new Thread(task);
thread.start();
+
} catch (Exception e) {
// System.err.println("Exception: " + e);
e.printStackTrace(System.out);
@@ -167,47 +173,56 @@ public class FE {
// } catch (IOException e) {
// System.out.println("IO: " + e.getMessage());
// } finally {
-//// if (aSocket != null)
-//// aSocket.close();
+
+ /// / if (aSocket != null)
+ /// / aSocket.close();
// }
// }
-
private static void listenForUDPResponses(DERMSServerImpl servant) {
List<ReliableUnicastReceiver<Response>> receivers = new ArrayList<ReliableUnicastReceiver<Response>>();
- try {
- // Initialize a receiver for each RM.
- for (int port : Config.frontendResponsePorts) {
+ // Initialize a receiver for each RM.
+ for (int port : Config.frontendResponsePorts) {
+ try {
receivers.add(new ReliableUnicastReceiver<Response>(
new InetSocketAddress(frontendIP, port)));
- System.out.println("FE listening for responses on " + frontendIP + ":" + port + "...");
- }
-
- while (true) {
- // Receive a response from each RM.
- for (ReliableUnicastReceiver<Response> receiver : receivers) {
- // Blocking call to receive a message from RM
- Response response = receiver.receive();
- System.out.println("FE: Response received from RM >>> " + response);
-
- // Process the received response and add it to the servant
- System.out.println("Adding response to FrontEndImplementation:");
- servant.addReceivedResponse(response);
- }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- } catch (IOException e) {
- System.out.println("IO: " + e.getMessage());
- } catch (InterruptedException e) {
- System.out.println("Listener interrupted: " + e.getMessage());
- Thread.currentThread().interrupt();
- } finally {
- for (ReliableUnicastReceiver<Response> receiver : receivers) {
+ System.out.println("FE listening for responses on " + frontendIP + ":" + port + "...");
+ }
+ for (ReliableUnicastReceiver<Response> receiver : receivers) {
+ executor.execute(() -> {
try {
- receiver.close();
- System.out.println("ReliableUnicastReceiver closed.");
- } catch (IOException e) {
- System.out.println("Error closing ReliableUnicastReceiver: " + e.getMessage());
+ while (true) {
+ // Receive a response from each RM.
+
+ // Blocking call to receive a message from RM
+ Response response;
+
+ response = receiver.receive();
+ System.out.println("FE: Response received from RM >>> " + response);
+
+ // Process the received response and add it to the servant
+ System.out.println("Adding response to FrontEndImplementation:");
+ servant.addReceivedResponse(response);
+ }
+ } catch (InterruptedException e) {
+ System.out.println("Listener interrupted: " + e.getMessage());
+ Thread.currentThread().interrupt();
+ } finally {
+ try {
+ receiver.close();
+ System.out.println("ReliableUnicastReceiver closed.");
+ } catch (IOException e) {
+ System.out.println("Error closing ReliableUnicastReceiver: " + e.getMessage());
+ }
}
- }
+ });
+ }
+ try {
+ executor.awaitTermination(100, TimeUnit.HOURS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}