From d5e43729aa80e6b4156718b453e2e71620917fff Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Mon, 2 Dec 2024 12:27:38 -0500 Subject: FE: one receiver per RM --- src/main/java/derms/Config.java | 3 +++ src/main/java/derms/ReplicaManager.java | 12 ++++++++--- src/main/java/derms/frontend/FE.java | 35 +++++++++++++++++++-------------- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/main/java/derms/Config.java b/src/main/java/derms/Config.java index f401886..430d9de 100644 --- a/src/main/java/derms/Config.java +++ b/src/main/java/derms/Config.java @@ -8,4 +8,7 @@ public class Config { // The multicast group of the RMs and sequencer. public static final InetSocketAddress group = new InetSocketAddress("225.5.5.6", 62311); + + // Ports where the FE listens for responses from each RM. + public static final int[] frontendResponsePorts = {62312, 62313, 62314, 62315}; } diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 9e3a6cc..1f95fac 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -25,19 +25,25 @@ public class ReplicaManager { private Replica replica; private Response response; private final Logger log; - private ReliableUnicastSender unicastSender = new ReliableUnicastSender<>(new InetSocketAddress("localhost", 1999)); + private InetSocketAddress frontEndAddress; + private ReliableUnicastSender unicastSender; private TotalOrderMulticastReceiver multicastReceiver; - private final InetSocketAddress frontEndAddress; public ReplicaManager(int replicaId) throws IOException { this.replicaId = replicaId; this.log = Logger.getLogger(getClass().getName()); - this.frontEndAddress = new InetSocketAddress("localhost", 1999); + initUnicastSender(); initReplica(); initMulticastReceiver(); startHeartbeatThread(); } + private void initUnicastSender() throws IOException { + int frontEndPort = Config.frontendResponsePorts[replicaId - 1]; + frontEndAddress = new InetSocketAddress("localhost", frontEndPort); + unicastSender = new ReliableUnicastSender<>(frontEndAddress); + } + private void initReplica() throws IOException { switch (replicaId) { case 1: diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java index 7c8b3cd..ccba647 100644 --- a/src/main/java/derms/frontend/FE.java +++ b/src/main/java/derms/frontend/FE.java @@ -11,6 +11,8 @@ import derms.Response; import derms.net.runicast.ReliableUnicastReceiver; import derms.net.runicast.ReliableUnicastSender; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; //import constants.Constants; @@ -19,7 +21,6 @@ public class FE { private static String sequencerIP = "localhost"; private static ReliableUnicastSender sequencerSock; private static final String RM_Multicast_group_address = Config.group.toString(); - private static final int FE_PORT = 1999; private static final int RM_Multicast_Port = 1234; public static String FE_Address = "http://localhost:8067/"+DERMSInterface.class.getSimpleName(); private static final String FE_IP_Address = "localhost"; @@ -163,22 +164,26 @@ public class FE { // } private static void listenForUDPResponses(DERMSServerImpl servant) { - ReliableUnicastReceiver receiver = null; + List> receivers = new ArrayList>(); try { - // Initialize the ReliableUnicastReceiver to listen on the specified address and port - InetSocketAddress laddr = new InetSocketAddress(FE_IP_Address, FE_PORT); - receiver = new ReliableUnicastReceiver<>(laddr); - - System.out.println("FE Server Started on " + FE_IP_Address + ":" + FE_PORT + "............"); + // Initialize a receiver for each RM. + for (int port : Config.frontendResponsePorts) { + receivers.add(new ReliableUnicastReceiver( + new InetSocketAddress(FE_IP_Address, port))); + System.out.println("FE listening for responses on " + FE_IP_Address + ":" + port + "..."); + } while (true) { - // Blocking call to receive a message from RM - Response response = receiver.receive(); - System.out.println("FE: Response received from RM >>> " + response); - - // Process the received response and add it to the servant - System.out.println("Adding response to FrontEndImplementation:"); - servant.addReceivedResponse(response); + // Receive a response from each RM. + for (ReliableUnicastReceiver receiver : receivers) { + // Blocking call to receive a message from RM + Response response = receiver.receive(); + System.out.println("FE: Response received from RM >>> " + response); + + // Process the received response and add it to the servant + System.out.println("Adding response to FrontEndImplementation:"); + servant.addReceivedResponse(response); + } } } catch (IOException e) { System.out.println("IO: " + e.getMessage()); @@ -186,7 +191,7 @@ public class FE { System.out.println("Listener interrupted: " + e.getMessage()); Thread.currentThread().interrupt(); } finally { - if (receiver != null) { + for (ReliableUnicastReceiver receiver : receivers) { try { receiver.close(); System.out.println("ReliableUnicastReceiver closed."); -- cgit v1.2.3 From 141212aeaa705052499229e26437a08a9096c46a Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Mon, 2 Dec 2024 12:58:18 -0500 Subject: pass FE ip address as cmdline arg --- src/main/java/derms/Config.java | 3 +++ src/main/java/derms/ReplicaManager.java | 16 +++++++++------- src/main/java/derms/frontend/FE.java | 26 +++++++++++++++++--------- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/main/java/derms/Config.java b/src/main/java/derms/Config.java index 430d9de..f8954ef 100644 --- a/src/main/java/derms/Config.java +++ b/src/main/java/derms/Config.java @@ -3,6 +3,9 @@ package derms; import java.net.InetSocketAddress; public class Config { + // The port where the FE publishes the web service endpoint. + public static final int frontendEndpointPort = 8067; + // The port where the sequencer listens for requests from the FE. public static final int sequencerInPort = 62310; diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 1f95fac..414cdf0 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -21,6 +21,7 @@ import java.io.ObjectInputStream; import java.util.logging.Logger; public class ReplicaManager { + public static final String usage = "Usage: java ReplicaManager "; private final int replicaId; private Replica replica; private Response response; @@ -29,18 +30,18 @@ public class ReplicaManager { private ReliableUnicastSender unicastSender; private TotalOrderMulticastReceiver multicastReceiver; - public ReplicaManager(int replicaId) throws IOException { + public ReplicaManager(int replicaId, InetAddress frontEndIP) throws IOException { this.replicaId = replicaId; this.log = Logger.getLogger(getClass().getName()); - initUnicastSender(); + initUnicastSender(frontEndIP); initReplica(); initMulticastReceiver(); startHeartbeatThread(); } - private void initUnicastSender() throws IOException { + private void initUnicastSender(InetAddress frontEndIP) throws IOException { int frontEndPort = Config.frontendResponsePorts[replicaId - 1]; - frontEndAddress = new InetSocketAddress("localhost", frontEndPort); + frontEndAddress = new InetSocketAddress(frontEndIP, frontEndPort); unicastSender = new ReliableUnicastSender<>(frontEndAddress); } @@ -134,15 +135,16 @@ public class ReplicaManager { } public static void main(String[] args) { - if (args.length != 1) { - System.err.println("Usage: java ReplicaManager "); + if (args.length < 2) { + System.err.println(usage); System.exit(1); } int replicaId = Integer.parseInt(args[0]); try { - ReplicaManager replicaManager = new ReplicaManager(replicaId); + InetAddress frontEndIP = InetAddress.getByName(args[1]); + ReplicaManager replicaManager = new ReplicaManager(replicaId, frontEndIP); System.out.println("ReplicaManager " + replicaId + " is running."); } catch (IOException e) { System.err.println("Failed to start ReplicaManager: " + e.getMessage()); diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java index ccba647..2665116 100644 --- a/src/main/java/derms/frontend/FE.java +++ b/src/main/java/derms/frontend/FE.java @@ -18,19 +18,22 @@ import java.util.concurrent.atomic.AtomicInteger; //import constants.Constants; public class FE { - private static String sequencerIP = "localhost"; + public static final String usage = "Usage: java FE "; + private static String frontendIP; + private static String sequencerIP; private static ReliableUnicastSender sequencerSock; private static final String RM_Multicast_group_address = Config.group.toString(); private static final int RM_Multicast_Port = 1234; - public static String FE_Address = "http://localhost:8067/"+DERMSInterface.class.getSimpleName(); - private static final String FE_IP_Address = "localhost"; - private static AtomicInteger sequenceIDGenerator = new AtomicInteger(0); -// public static String FE_IP_Address = "localhost"; public static void main(String[] args) { try { - sequencerIP = args[0]; + if (args.length < 2) { + System.out.println(usage); + return; + } + frontendIP = args[0]; + sequencerIP = args[1]; System.out.println("Connecting to sequencer (" + sequencerIP + ":" + Config.sequencerInPort + ")..."); sequencerSock = new ReliableUnicastSender( @@ -69,7 +72,7 @@ public class FE { } }; DERMSServerImpl servant = new DERMSServerImpl(inter); - Endpoint endpoint = Endpoint.publish(FE_Address, servant); + Endpoint endpoint = Endpoint.publish(endpointURL(frontendIP), servant); Runnable task = () -> { listenForUDPResponses(servant); try { @@ -90,6 +93,11 @@ public class FE { // Logger.serverLog(serverID, " Server Shutting down"); } + // The URL where the web service endpoint is published. + public static String endpointURL(String frontendHost) { + return "http://" + frontendHost + ":" + Config.frontendEndpointPort + "/" + DERMSInterface.class.getSimpleName(); + } + private static int sendUnicastToSequencer(Request requestFromClient) { int sequenceID = sequenceIDGenerator.incrementAndGet(); try { @@ -169,8 +177,8 @@ public class FE { // Initialize a receiver for each RM. for (int port : Config.frontendResponsePorts) { receivers.add(new ReliableUnicastReceiver( - new InetSocketAddress(FE_IP_Address, port))); - System.out.println("FE listening for responses on " + FE_IP_Address + ":" + port + "..."); + new InetSocketAddress(frontendIP, port))); + System.out.println("FE listening for responses on " + frontendIP + ":" + port + "..."); } while (true) { -- cgit v1.2.3 From bd266a1f58adaa466c3c3b16a46a7bf8d3110a3b Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Mon, 2 Dec 2024 13:02:53 -0500 Subject: client: get endpoint url from FE method --- src/main/java/derms/client/Client.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/derms/client/Client.java b/src/main/java/derms/client/Client.java index db8d169..11e863f 100644 --- a/src/main/java/derms/client/Client.java +++ b/src/main/java/derms/client/Client.java @@ -2,6 +2,7 @@ package derms.client; import derms.frontend.DERMSInterface; import derms.frontend.DERMSServerImpl; +import derms.frontend.FE; import javax.xml.namespace.QName; import javax.xml.ws.Service; @@ -10,13 +11,12 @@ import java.net.URL; public abstract class Client { public static final String namespace = "frontend.derms"; - public static final int port = 8067; public static final QName qname = new QName("http://"+namespace+"/", DERMSServerImpl.class.getSimpleName()+"Service"); protected final DERMSInterface server; protected Client(String FEhost) throws MalformedURLException { - URL url = new URL("http://"+FEhost+":"+port+"/"+DERMSInterface.class.getSimpleName()+"?wsdl"); + URL url = new URL(FE.endpointURL(FEhost) + "?wsdl"); this.server = Service.create(url, qname).getPort(DERMSInterface.class); } } -- cgit v1.2.3