diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-12-02 12:27:38 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-12-02 12:27:38 -0500 |
| commit | d5e43729aa80e6b4156718b453e2e71620917fff (patch) | |
| tree | f374ca1037a77def9618996601a52dbe14f89084 /src | |
| parent | 428d76b5a865eda493163a6ab8fceb0168527e1b (diff) | |
| download | soen423-d5e43729aa80e6b4156718b453e2e71620917fff.zip | |
FE: one receiver per RM
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/derms/Config.java | 3 | ||||
| -rw-r--r-- | src/main/java/derms/ReplicaManager.java | 12 | ||||
| -rw-r--r-- | src/main/java/derms/frontend/FE.java | 35 |
3 files changed, 32 insertions, 18 deletions
diff --git a/src/main/java/derms/Config.java b/src/main/java/derms/Config.java index f401886..430d9de 100644 --- a/src/main/java/derms/Config.java +++ b/src/main/java/derms/Config.java @@ -8,4 +8,7 @@ public class Config { // The multicast group of the RMs and sequencer. public static final InetSocketAddress group = new InetSocketAddress("225.5.5.6", 62311); + + // Ports where the FE listens for responses from each RM. + public static final int[] frontendResponsePorts = {62312, 62313, 62314, 62315}; } diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 9e3a6cc..1f95fac 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -25,19 +25,25 @@ public class ReplicaManager { private Replica replica; private Response response; private final Logger log; - private ReliableUnicastSender<Response> unicastSender = new ReliableUnicastSender<>(new InetSocketAddress("localhost", 1999)); + private InetSocketAddress frontEndAddress; + private ReliableUnicastSender<Response> unicastSender; private TotalOrderMulticastReceiver multicastReceiver; - private final InetSocketAddress frontEndAddress; public ReplicaManager(int replicaId) throws IOException { this.replicaId = replicaId; this.log = Logger.getLogger(getClass().getName()); - this.frontEndAddress = new InetSocketAddress("localhost", 1999); + initUnicastSender(); initReplica(); initMulticastReceiver(); startHeartbeatThread(); } + private void initUnicastSender() throws IOException { + int frontEndPort = Config.frontendResponsePorts[replicaId - 1]; + frontEndAddress = new InetSocketAddress("localhost", frontEndPort); + unicastSender = new ReliableUnicastSender<>(frontEndAddress); + } + private void initReplica() throws IOException { switch (replicaId) { case 1: diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java index 7c8b3cd..ccba647 100644 --- a/src/main/java/derms/frontend/FE.java +++ b/src/main/java/derms/frontend/FE.java @@ -11,6 +11,8 @@ import derms.Response; import derms.net.runicast.ReliableUnicastReceiver; import derms.net.runicast.ReliableUnicastSender; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; //import constants.Constants; @@ -19,7 +21,6 @@ public class FE { private static String sequencerIP = "localhost"; private static ReliableUnicastSender<Request> sequencerSock; private static final String RM_Multicast_group_address = Config.group.toString(); - private static final int FE_PORT = 1999; private static final int RM_Multicast_Port = 1234; public static String FE_Address = "http://localhost:8067/"+DERMSInterface.class.getSimpleName(); private static final String FE_IP_Address = "localhost"; @@ -163,22 +164,26 @@ public class FE { // } private static void listenForUDPResponses(DERMSServerImpl servant) { - ReliableUnicastReceiver<Response> receiver = null; + List<ReliableUnicastReceiver<Response>> receivers = new ArrayList<ReliableUnicastReceiver<Response>>(); try { - // Initialize the ReliableUnicastReceiver to listen on the specified address and port - InetSocketAddress laddr = new InetSocketAddress(FE_IP_Address, FE_PORT); - receiver = new ReliableUnicastReceiver<>(laddr); - - System.out.println("FE Server Started on " + FE_IP_Address + ":" + FE_PORT + "............"); + // Initialize a receiver for each RM. + for (int port : Config.frontendResponsePorts) { + receivers.add(new ReliableUnicastReceiver<Response>( + new InetSocketAddress(FE_IP_Address, port))); + System.out.println("FE listening for responses on " + FE_IP_Address + ":" + port + "..."); + } while (true) { - // Blocking call to receive a message from RM - Response response = receiver.receive(); - System.out.println("FE: Response received from RM >>> " + response); - - // Process the received response and add it to the servant - System.out.println("Adding response to FrontEndImplementation:"); - servant.addReceivedResponse(response); + // Receive a response from each RM. + for (ReliableUnicastReceiver<Response> receiver : receivers) { + // Blocking call to receive a message from RM + Response response = receiver.receive(); + System.out.println("FE: Response received from RM >>> " + response); + + // Process the received response and add it to the servant + System.out.println("Adding response to FrontEndImplementation:"); + servant.addReceivedResponse(response); + } } } catch (IOException e) { System.out.println("IO: " + e.getMessage()); @@ -186,7 +191,7 @@ public class FE { System.out.println("Listener interrupted: " + e.getMessage()); Thread.currentThread().interrupt(); } finally { - if (receiver != null) { + for (ReliableUnicastReceiver<Response> receiver : receivers) { try { receiver.close(); System.out.println("ReliableUnicastReceiver closed."); |