diff options
Diffstat (limited to 'src/main/java/derms/frontend/FE.java')
| -rw-r--r-- | src/main/java/derms/frontend/FE.java | 85 |
1 files changed, 50 insertions, 35 deletions
diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java index 8d6fc59..f29459c 100644 --- a/src/main/java/derms/frontend/FE.java +++ b/src/main/java/derms/frontend/FE.java @@ -6,13 +6,16 @@ import java.net.*; import javax.xml.ws.Endpoint; import derms.Config; -import derms.Request; -import derms.Response; +import derms.Request; +import derms.Response; import derms.net.runicast.ReliableUnicastReceiver; import derms.net.runicast.ReliableUnicastSender; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; //import constants.Constants; @@ -25,6 +28,7 @@ public class FE { private static final String RM_Multicast_group_address = Config.group.toString(); private static final int RM_Multicast_Port = 1234; private static AtomicInteger sequenceIDGenerator = new AtomicInteger(0); + private static final ExecutorService executor = Executors.newFixedThreadPool(4); public static void main(String[] args) { try { @@ -73,9 +77,10 @@ public class FE { }; DERMSServerImpl servant = new DERMSServerImpl(inter); Endpoint endpoint = Endpoint.publish(endpointURL(frontendIP), servant); + Runnable task = () -> { - listenForUDPResponses(servant); try { + listenForUDPResponses(servant); sequencerSock.close(); } catch (Exception e) { e.printStackTrace(); @@ -83,6 +88,7 @@ public class FE { }; Thread thread = new Thread(task); thread.start(); + } catch (Exception e) { // System.err.println("Exception: " + e); e.printStackTrace(System.out); @@ -167,47 +173,56 @@ public class FE { // } catch (IOException e) { // System.out.println("IO: " + e.getMessage()); // } finally { -//// if (aSocket != null) -//// aSocket.close(); + + /// / if (aSocket != null) + /// / aSocket.close(); // } // } - private static void listenForUDPResponses(DERMSServerImpl servant) { List<ReliableUnicastReceiver<Response>> receivers = new ArrayList<ReliableUnicastReceiver<Response>>(); - try { - // Initialize a receiver for each RM. - for (int port : Config.frontendResponsePorts) { + // Initialize a receiver for each RM. + for (int port : Config.frontendResponsePorts) { + try { receivers.add(new ReliableUnicastReceiver<Response>( new InetSocketAddress(frontendIP, port))); - System.out.println("FE listening for responses on " + frontendIP + ":" + port + "..."); - } - - while (true) { - // Receive a response from each RM. - for (ReliableUnicastReceiver<Response> receiver : receivers) { - // Blocking call to receive a message from RM - Response response = receiver.receive(); - System.out.println("FE: Response received from RM >>> " + response); - - // Process the received response and add it to the servant - System.out.println("Adding response to FrontEndImplementation:"); - servant.addReceivedResponse(response); - } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - System.out.println("IO: " + e.getMessage()); - } catch (InterruptedException e) { - System.out.println("Listener interrupted: " + e.getMessage()); - Thread.currentThread().interrupt(); - } finally { - for (ReliableUnicastReceiver<Response> receiver : receivers) { + System.out.println("FE listening for responses on " + frontendIP + ":" + port + "..."); + } + for (ReliableUnicastReceiver<Response> receiver : receivers) { + executor.execute(() -> { try { - receiver.close(); - System.out.println("ReliableUnicastReceiver closed."); - } catch (IOException e) { - System.out.println("Error closing ReliableUnicastReceiver: " + e.getMessage()); + while (true) { + // Receive a response from each RM. + + // Blocking call to receive a message from RM + Response response; + + response = receiver.receive(); + System.out.println("FE: Response received from RM >>> " + response); + + // Process the received response and add it to the servant + System.out.println("Adding response to FrontEndImplementation:"); + servant.addReceivedResponse(response); + } + } catch (InterruptedException e) { + System.out.println("Listener interrupted: " + e.getMessage()); + Thread.currentThread().interrupt(); + } finally { + try { + receiver.close(); + System.out.println("ReliableUnicastReceiver closed."); + } catch (IOException e) { + System.out.println("Error closing ReliableUnicastReceiver: " + e.getMessage()); + } } - } + }); + } + try { + executor.awaitTermination(100, TimeUnit.HOURS); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } |