diff options
Diffstat (limited to 'src/main/java/derms/frontend/FE.java')
| -rw-r--r-- | src/main/java/derms/frontend/FE.java | 57 |
1 files changed, 35 insertions, 22 deletions
diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java index 7c8b3cd..2665116 100644 --- a/src/main/java/derms/frontend/FE.java +++ b/src/main/java/derms/frontend/FE.java @@ -11,25 +11,29 @@ import derms.Response; import derms.net.runicast.ReliableUnicastReceiver; import derms.net.runicast.ReliableUnicastSender; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; //import constants.Constants; public class FE { - private static String sequencerIP = "localhost"; + public static final String usage = "Usage: java FE <FE IP> <Sequencer IP>"; + private static String frontendIP; + private static String sequencerIP; private static ReliableUnicastSender<Request> sequencerSock; private static final String RM_Multicast_group_address = Config.group.toString(); - private static final int FE_PORT = 1999; private static final int RM_Multicast_Port = 1234; - public static String FE_Address = "http://localhost:8067/"+DERMSInterface.class.getSimpleName(); - private static final String FE_IP_Address = "localhost"; - private static AtomicInteger sequenceIDGenerator = new AtomicInteger(0); -// public static String FE_IP_Address = "localhost"; public static void main(String[] args) { try { - sequencerIP = args[0]; + if (args.length < 2) { + System.out.println(usage); + return; + } + frontendIP = args[0]; + sequencerIP = args[1]; System.out.println("Connecting to sequencer (" + sequencerIP + ":" + Config.sequencerInPort + ")..."); sequencerSock = new ReliableUnicastSender<Request>( @@ -68,7 +72,7 @@ public class FE { } }; DERMSServerImpl servant = new DERMSServerImpl(inter); - Endpoint endpoint = Endpoint.publish(FE_Address, servant); + Endpoint endpoint = Endpoint.publish(endpointURL(frontendIP), servant); Runnable task = () -> { listenForUDPResponses(servant); try { @@ -89,6 +93,11 @@ public class FE { // Logger.serverLog(serverID, " Server Shutting down"); } + // The URL where the web service endpoint is published. + public static String endpointURL(String frontendHost) { + return "http://" + frontendHost + ":" + Config.frontendEndpointPort + "/" + DERMSInterface.class.getSimpleName(); + } + private static int sendUnicastToSequencer(Request requestFromClient) { int sequenceID = sequenceIDGenerator.incrementAndGet(); try { @@ -163,22 +172,26 @@ public class FE { // } private static void listenForUDPResponses(DERMSServerImpl servant) { - ReliableUnicastReceiver<Response> receiver = null; + List<ReliableUnicastReceiver<Response>> receivers = new ArrayList<ReliableUnicastReceiver<Response>>(); try { - // Initialize the ReliableUnicastReceiver to listen on the specified address and port - InetSocketAddress laddr = new InetSocketAddress(FE_IP_Address, FE_PORT); - receiver = new ReliableUnicastReceiver<>(laddr); - - System.out.println("FE Server Started on " + FE_IP_Address + ":" + FE_PORT + "............"); + // Initialize a receiver for each RM. + for (int port : Config.frontendResponsePorts) { + receivers.add(new ReliableUnicastReceiver<Response>( + new InetSocketAddress(frontendIP, port))); + System.out.println("FE listening for responses on " + frontendIP + ":" + port + "..."); + } while (true) { - // Blocking call to receive a message from RM - Response response = receiver.receive(); - System.out.println("FE: Response received from RM >>> " + response); - - // Process the received response and add it to the servant - System.out.println("Adding response to FrontEndImplementation:"); - servant.addReceivedResponse(response); + // Receive a response from each RM. + for (ReliableUnicastReceiver<Response> receiver : receivers) { + // Blocking call to receive a message from RM + Response response = receiver.receive(); + System.out.println("FE: Response received from RM >>> " + response); + + // Process the received response and add it to the servant + System.out.println("Adding response to FrontEndImplementation:"); + servant.addReceivedResponse(response); + } } } catch (IOException e) { System.out.println("IO: " + e.getMessage()); @@ -186,7 +199,7 @@ public class FE { System.out.println("Listener interrupted: " + e.getMessage()); Thread.currentThread().interrupt(); } finally { - if (receiver != null) { + for (ReliableUnicastReceiver<Response> receiver : receivers) { try { receiver.close(); System.out.println("ReliableUnicastReceiver closed."); |