From 7e3e18c2447355b02dd00cbe5f010cd9c272e546 Mon Sep 17 00:00:00 2001 From: ShazaAli Date: Tue, 3 Dec 2024 10:38:56 -0500 Subject: Replica 1 function between servers working --- src/main/java/derms/replica1/DERMSServer.java | 21 ++++++++++----- .../java/derms/replica1/DERMSServerPublisher.java | 16 ++++++------ src/main/java/derms/replica1/Replica1.java | 30 +++++++++++++++++----- 3 files changed, 45 insertions(+), 22 deletions(-) diff --git a/src/main/java/derms/replica1/DERMSServer.java b/src/main/java/derms/replica1/DERMSServer.java index 585a946..a53d86d 100644 --- a/src/main/java/derms/replica1/DERMSServer.java +++ b/src/main/java/derms/replica1/DERMSServer.java @@ -23,11 +23,7 @@ public class DERMSServer implements DERMSInterface { private static List cities = Arrays.asList("MTL", "QUE", "SHE"); private static List resourceNames = Arrays.asList("AMBULANCE", "FIRETRUCK", "PERSONNEL"); private final Random r = new Random(); - private final Map portsMap = new HashMap() {{ - put("MTL", r.nextInt(60000-8000) + 8000); - put("QUE", r.nextInt(60000-8000) + 8000); - put("SHE", r.nextInt(60000-8000) + 8000); - }}; + private ConcurrentHashMap portsMap = null; public DERMSServer() { // Default constructor to support JAX-WS @@ -35,11 +31,20 @@ public class DERMSServer implements DERMSInterface { // this.serverID = "MTL"; // resources = new HashMap<>(); // new Thread(this::listenForMessages).start(); +// portsMap = new ConcurrentHashMap<>(); +// portsMap.put("MTL", r.nextInt(60000-8000) + 8000); +// portsMap.put("QUE", r.nextInt(60000-8000) + 8000); +// portsMap.put("SHE", r.nextInt(60000-8000) + 8000); } - public DERMSServer(String serverID) throws InterruptedException { + public DERMSServer(String serverID, ConcurrentHashMap m) throws InterruptedException { this.serverID = serverID; resources = new HashMap<>(); +// portsMap = new ConcurrentHashMap<>(); +// portsMap.put("MTL", r.nextInt(60000-8000) + 8000); +// portsMap.put("QUE", r.nextInt(60000-8000) + 8000); +// portsMap.put("SHE", r.nextInt(60000-8000) + 8000); + portsMap = m; new Thread(this::listenForMessages).start(); Thread.sleep(10); } @@ -48,6 +53,7 @@ public class DERMSServer implements DERMSInterface { try (DatagramSocket socket = new DatagramSocket(getServerPortsFromCentralRepo().get(serverID))) { this.serverPort.set(socket.getLocalPort()); byte[] buffer = new byte[1024]; + System.out.println("Listening on port: " + socket.getLocalPort()); while (true) { DatagramPacket packet = new DatagramPacket(buffer, buffer.length); @@ -94,6 +100,7 @@ public class DERMSServer implements DERMSInterface { byte[] buffer = message.getBytes(); DatagramPacket request = new DatagramPacket(buffer, buffer.length, address, port); + System.out.println("Sending message: " + message + " to " + host + ":" + port); socket.send(request); @@ -195,7 +202,7 @@ public class DERMSServer implements DERMSInterface { for (Map.Entry server : serverNames.entrySet()) { callables.add(() -> Arrays.asList(sendMessage(String.format("LIST_RESOURCE_AVAILABILITY %s", resourceName), "localhost", server.getValue()).split(","))); } - ExecutorService executor = Executors.newFixedThreadPool(5); + ExecutorService executor = Executors.newFixedThreadPool(2); List>> results; try { results = executor.invokeAll(callables); diff --git a/src/main/java/derms/replica1/DERMSServerPublisher.java b/src/main/java/derms/replica1/DERMSServerPublisher.java index 3edf16c..44ee879 100644 --- a/src/main/java/derms/replica1/DERMSServerPublisher.java +++ b/src/main/java/derms/replica1/DERMSServerPublisher.java @@ -4,13 +4,13 @@ import javax.xml.ws.Endpoint; public class DERMSServerPublisher { public static void main(String[] args) { - try { - Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL")); - Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE")); - Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE")); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - System.out.println("DERMS Web Service is published at http://localhost:8387/ws/derms"); +// try { +// Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL")); +// Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE")); +// Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE")); +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// System.out.println("DERMS Web Service is published at http://localhost:8387/ws/derms"); } } \ No newline at end of file diff --git a/src/main/java/derms/replica1/Replica1.java b/src/main/java/derms/replica1/Replica1.java index a59ed4c..6863d5c 100644 --- a/src/main/java/derms/replica1/Replica1.java +++ b/src/main/java/derms/replica1/Replica1.java @@ -7,9 +7,12 @@ import derms.Response; import derms.replica2.DermsLogger; import derms.util.ThreadPool; +import javax.xml.ws.Endpoint; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Logger; @@ -20,12 +23,15 @@ public class Replica1 implements Replica { private final ExecutorService pool; private final Logger log; private final InetAddress localAddr; - private final ResponderClient responderClient; - private final CoordinatorClient coordinatorClient; +// private final ResponderClient responderClient; +// private final CoordinatorClient coordinatorClient; private final String responderClientID = "MTL"; private final String coordinatorClientID = "MTLC1111"; private final ReplicaManager replicaManager; private DERMSServer server; + private final ConcurrentHashMap portsMap; + private final Random r = new Random(); + public Replica1(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -35,13 +41,17 @@ public class Replica1 implements Replica { } catch (UnknownHostException e) { throw new RuntimeException(e); } - responderClient = new ResponderClient(responderClientID); - coordinatorClient = new CoordinatorClient(coordinatorClientID); +// responderClient = new ResponderClient(responderClientID); +// coordinatorClient = new CoordinatorClient(coordinatorClientID); try { this.log = DermsLogger.getLogger(getClass()); } catch (IOException e) { throw new RuntimeException(e); } + portsMap = new ConcurrentHashMap<>(); + portsMap.put("MTL", r.nextInt(60000-8000) + 8000); + portsMap.put("QUE", r.nextInt(60000-8000) + 8000); + portsMap.put("SHE", r.nextInt(60000-8000) + 8000); } @Override @@ -52,20 +62,26 @@ public class Replica1 implements Replica { @Override public void startProcess() { try { - server = new DERMSServer("MTL"); + server = new DERMSServer("MTL", portsMap); } catch (InterruptedException e) { throw new RuntimeException(e); } try { - new DERMSServer("SHE"); + new DERMSServer("SHE", portsMap); } catch (InterruptedException e) { throw new RuntimeException(e); } try { - new DERMSServer("QUE"); + new DERMSServer("QUE", portsMap); } catch (InterruptedException e) { throw new RuntimeException(e); } +// try { +// Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL")); +// Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE")); +// Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE")); +// } catch (InterruptedException e) {} + alive = true; log.info(getClass().getSimpleName() + " started."); log.config("Local address is "+localAddr.toString()); -- cgit v1.2.3