summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-12-02 13:20:42 -0500
committerGitHub <noreply@github.com>2024-12-02 13:20:42 -0500
commit687b126f39c44a083d424fc53fa3843bbdf91473 (patch)
treec9ce898287900b8d303eabb70bebb82b1e8493b4
parent428d76b5a865eda493163a6ab8fceb0168527e1b (diff)
parentbd266a1f58adaa466c3c3b16a46a7bf8d3110a3b (diff)
downloadsoen423-687b126f39c44a083d424fc53fa3843bbdf91473.zip
Merge pull request #2 from sam-rba/multihost
Make Run On Multiple Hosts
-rw-r--r--src/main/java/derms/Config.java6
-rw-r--r--src/main/java/derms/ReplicaManager.java22
-rw-r--r--src/main/java/derms/client/Client.java4
-rw-r--r--src/main/java/derms/frontend/FE.java57
4 files changed, 58 insertions, 31 deletions
diff --git a/src/main/java/derms/Config.java b/src/main/java/derms/Config.java
index f401886..f8954ef 100644
--- a/src/main/java/derms/Config.java
+++ b/src/main/java/derms/Config.java
@@ -3,9 +3,15 @@ package derms;
import java.net.InetSocketAddress;
public class Config {
+ // The port where the FE publishes the web service endpoint.
+ public static final int frontendEndpointPort = 8067;
+
// The port where the sequencer listens for requests from the FE.
public static final int sequencerInPort = 62310;
// The multicast group of the RMs and sequencer.
public static final InetSocketAddress group = new InetSocketAddress("225.5.5.6", 62311);
+
+ // Ports where the FE listens for responses from each RM.
+ public static final int[] frontendResponsePorts = {62312, 62313, 62314, 62315};
}
diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java
index 9e3a6cc..414cdf0 100644
--- a/src/main/java/derms/ReplicaManager.java
+++ b/src/main/java/derms/ReplicaManager.java
@@ -21,23 +21,30 @@ import java.io.ObjectInputStream;
import java.util.logging.Logger;
public class ReplicaManager {
+ public static final String usage = "Usage: java ReplicaManager <replicaId> <frontEndIP>";
private final int replicaId;
private Replica replica;
private Response response;
private final Logger log;
- private ReliableUnicastSender<Response> unicastSender = new ReliableUnicastSender<>(new InetSocketAddress("localhost", 1999));
+ private InetSocketAddress frontEndAddress;
+ private ReliableUnicastSender<Response> unicastSender;
private TotalOrderMulticastReceiver multicastReceiver;
- private final InetSocketAddress frontEndAddress;
- public ReplicaManager(int replicaId) throws IOException {
+ public ReplicaManager(int replicaId, InetAddress frontEndIP) throws IOException {
this.replicaId = replicaId;
this.log = Logger.getLogger(getClass().getName());
- this.frontEndAddress = new InetSocketAddress("localhost", 1999);
+ initUnicastSender(frontEndIP);
initReplica();
initMulticastReceiver();
startHeartbeatThread();
}
+ private void initUnicastSender(InetAddress frontEndIP) throws IOException {
+ int frontEndPort = Config.frontendResponsePorts[replicaId - 1];
+ frontEndAddress = new InetSocketAddress(frontEndIP, frontEndPort);
+ unicastSender = new ReliableUnicastSender<>(frontEndAddress);
+ }
+
private void initReplica() throws IOException {
switch (replicaId) {
case 1:
@@ -128,15 +135,16 @@ public class ReplicaManager {
}
public static void main(String[] args) {
- if (args.length != 1) {
- System.err.println("Usage: java ReplicaManager <replicaId>");
+ if (args.length < 2) {
+ System.err.println(usage);
System.exit(1);
}
int replicaId = Integer.parseInt(args[0]);
try {
- ReplicaManager replicaManager = new ReplicaManager(replicaId);
+ InetAddress frontEndIP = InetAddress.getByName(args[1]);
+ ReplicaManager replicaManager = new ReplicaManager(replicaId, frontEndIP);
System.out.println("ReplicaManager " + replicaId + " is running.");
} catch (IOException e) {
System.err.println("Failed to start ReplicaManager: " + e.getMessage());
diff --git a/src/main/java/derms/client/Client.java b/src/main/java/derms/client/Client.java
index db8d169..11e863f 100644
--- a/src/main/java/derms/client/Client.java
+++ b/src/main/java/derms/client/Client.java
@@ -2,6 +2,7 @@ package derms.client;
import derms.frontend.DERMSInterface;
import derms.frontend.DERMSServerImpl;
+import derms.frontend.FE;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
@@ -10,13 +11,12 @@ import java.net.URL;
public abstract class Client {
public static final String namespace = "frontend.derms";
- public static final int port = 8067;
public static final QName qname = new QName("http://"+namespace+"/", DERMSServerImpl.class.getSimpleName()+"Service");
protected final DERMSInterface server;
protected Client(String FEhost) throws MalformedURLException {
- URL url = new URL("http://"+FEhost+":"+port+"/"+DERMSInterface.class.getSimpleName()+"?wsdl");
+ URL url = new URL(FE.endpointURL(FEhost) + "?wsdl");
this.server = Service.create(url, qname).getPort(DERMSInterface.class);
}
}
diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java
index 7c8b3cd..2665116 100644
--- a/src/main/java/derms/frontend/FE.java
+++ b/src/main/java/derms/frontend/FE.java
@@ -11,25 +11,29 @@ import derms.Response;
import derms.net.runicast.ReliableUnicastReceiver;
import derms.net.runicast.ReliableUnicastSender;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
//import constants.Constants;
public class FE {
- private static String sequencerIP = "localhost";
+ public static final String usage = "Usage: java FE <FE IP> <Sequencer IP>";
+ private static String frontendIP;
+ private static String sequencerIP;
private static ReliableUnicastSender<Request> sequencerSock;
private static final String RM_Multicast_group_address = Config.group.toString();
- private static final int FE_PORT = 1999;
private static final int RM_Multicast_Port = 1234;
- public static String FE_Address = "http://localhost:8067/"+DERMSInterface.class.getSimpleName();
- private static final String FE_IP_Address = "localhost";
-
private static AtomicInteger sequenceIDGenerator = new AtomicInteger(0);
-// public static String FE_IP_Address = "localhost";
public static void main(String[] args) {
try {
- sequencerIP = args[0];
+ if (args.length < 2) {
+ System.out.println(usage);
+ return;
+ }
+ frontendIP = args[0];
+ sequencerIP = args[1];
System.out.println("Connecting to sequencer ("
+ sequencerIP + ":" + Config.sequencerInPort + ")...");
sequencerSock = new ReliableUnicastSender<Request>(
@@ -68,7 +72,7 @@ public class FE {
}
};
DERMSServerImpl servant = new DERMSServerImpl(inter);
- Endpoint endpoint = Endpoint.publish(FE_Address, servant);
+ Endpoint endpoint = Endpoint.publish(endpointURL(frontendIP), servant);
Runnable task = () -> {
listenForUDPResponses(servant);
try {
@@ -89,6 +93,11 @@ public class FE {
// Logger.serverLog(serverID, " Server Shutting down");
}
+ // The URL where the web service endpoint is published.
+ public static String endpointURL(String frontendHost) {
+ return "http://" + frontendHost + ":" + Config.frontendEndpointPort + "/" + DERMSInterface.class.getSimpleName();
+ }
+
private static int sendUnicastToSequencer(Request requestFromClient) {
int sequenceID = sequenceIDGenerator.incrementAndGet();
try {
@@ -163,22 +172,26 @@ public class FE {
// }
private static void listenForUDPResponses(DERMSServerImpl servant) {
- ReliableUnicastReceiver<Response> receiver = null;
+ List<ReliableUnicastReceiver<Response>> receivers = new ArrayList<ReliableUnicastReceiver<Response>>();
try {
- // Initialize the ReliableUnicastReceiver to listen on the specified address and port
- InetSocketAddress laddr = new InetSocketAddress(FE_IP_Address, FE_PORT);
- receiver = new ReliableUnicastReceiver<>(laddr);
-
- System.out.println("FE Server Started on " + FE_IP_Address + ":" + FE_PORT + "............");
+ // Initialize a receiver for each RM.
+ for (int port : Config.frontendResponsePorts) {
+ receivers.add(new ReliableUnicastReceiver<Response>(
+ new InetSocketAddress(frontendIP, port)));
+ System.out.println("FE listening for responses on " + frontendIP + ":" + port + "...");
+ }
while (true) {
- // Blocking call to receive a message from RM
- Response response = receiver.receive();
- System.out.println("FE: Response received from RM >>> " + response);
-
- // Process the received response and add it to the servant
- System.out.println("Adding response to FrontEndImplementation:");
- servant.addReceivedResponse(response);
+ // Receive a response from each RM.
+ for (ReliableUnicastReceiver<Response> receiver : receivers) {
+ // Blocking call to receive a message from RM
+ Response response = receiver.receive();
+ System.out.println("FE: Response received from RM >>> " + response);
+
+ // Process the received response and add it to the servant
+ System.out.println("Adding response to FrontEndImplementation:");
+ servant.addReceivedResponse(response);
+ }
}
} catch (IOException e) {
System.out.println("IO: " + e.getMessage());
@@ -186,7 +199,7 @@ public class FE {
System.out.println("Listener interrupted: " + e.getMessage());
Thread.currentThread().interrupt();
} finally {
- if (receiver != null) {
+ for (ReliableUnicastReceiver<Response> receiver : receivers) {
try {
receiver.close();
System.out.println("ReliableUnicastReceiver closed.");