summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/derms/replica1/DERMSServer.java21
-rw-r--r--src/main/java/derms/replica1/DERMSServerPublisher.java14
-rw-r--r--src/main/java/derms/replica1/Replica1.java30
-rw-r--r--src/main/java/derms/replica3/Replica3.java23
4 files changed, 59 insertions, 29 deletions
diff --git a/src/main/java/derms/replica1/DERMSServer.java b/src/main/java/derms/replica1/DERMSServer.java
index 585a946..a53d86d 100644
--- a/src/main/java/derms/replica1/DERMSServer.java
+++ b/src/main/java/derms/replica1/DERMSServer.java
@@ -23,11 +23,7 @@ public class DERMSServer implements DERMSInterface {
private static List<String> cities = Arrays.asList("MTL", "QUE", "SHE");
private static List<String> resourceNames = Arrays.asList("AMBULANCE", "FIRETRUCK", "PERSONNEL");
private final Random r = new Random();
- private final Map<String, Integer> portsMap = new HashMap<String, Integer>() {{
- put("MTL", r.nextInt(60000-8000) + 8000);
- put("QUE", r.nextInt(60000-8000) + 8000);
- put("SHE", r.nextInt(60000-8000) + 8000);
- }};
+ private ConcurrentHashMap<String, Integer> portsMap = null;
public DERMSServer() {
// Default constructor to support JAX-WS
@@ -35,11 +31,20 @@ public class DERMSServer implements DERMSInterface {
// this.serverID = "MTL";
// resources = new HashMap<>();
// new Thread(this::listenForMessages).start();
+// portsMap = new ConcurrentHashMap<>();
+// portsMap.put("MTL", r.nextInt(60000-8000) + 8000);
+// portsMap.put("QUE", r.nextInt(60000-8000) + 8000);
+// portsMap.put("SHE", r.nextInt(60000-8000) + 8000);
}
- public DERMSServer(String serverID) throws InterruptedException {
+ public DERMSServer(String serverID, ConcurrentHashMap<String, Integer> m) throws InterruptedException {
this.serverID = serverID;
resources = new HashMap<>();
+// portsMap = new ConcurrentHashMap<>();
+// portsMap.put("MTL", r.nextInt(60000-8000) + 8000);
+// portsMap.put("QUE", r.nextInt(60000-8000) + 8000);
+// portsMap.put("SHE", r.nextInt(60000-8000) + 8000);
+ portsMap = m;
new Thread(this::listenForMessages).start();
Thread.sleep(10);
}
@@ -48,6 +53,7 @@ public class DERMSServer implements DERMSInterface {
try (DatagramSocket socket = new DatagramSocket(getServerPortsFromCentralRepo().get(serverID))) {
this.serverPort.set(socket.getLocalPort());
byte[] buffer = new byte[1024];
+ System.out.println("Listening on port: " + socket.getLocalPort());
while (true) {
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
@@ -94,6 +100,7 @@ public class DERMSServer implements DERMSInterface {
byte[] buffer = message.getBytes();
DatagramPacket request = new DatagramPacket(buffer, buffer.length, address, port);
+ System.out.println("Sending message: " + message + " to " + host + ":" + port);
socket.send(request);
@@ -195,7 +202,7 @@ public class DERMSServer implements DERMSInterface {
for (Map.Entry<String, Integer> server : serverNames.entrySet()) {
callables.add(() -> Arrays.asList(sendMessage(String.format("LIST_RESOURCE_AVAILABILITY %s", resourceName), "localhost", server.getValue()).split(",")));
}
- ExecutorService executor = Executors.newFixedThreadPool(5);
+ ExecutorService executor = Executors.newFixedThreadPool(2);
List<Future<List<String>>> results;
try {
results = executor.invokeAll(callables);
diff --git a/src/main/java/derms/replica1/DERMSServerPublisher.java b/src/main/java/derms/replica1/DERMSServerPublisher.java
index 917c189..c94cb34 100644
--- a/src/main/java/derms/replica1/DERMSServerPublisher.java
+++ b/src/main/java/derms/replica1/DERMSServerPublisher.java
@@ -7,13 +7,13 @@ public class DERMSServerPublisher {
private static Endpoint[] endpoints = new Endpoint[3];
public static void main(String[] args) {
- try {
- endpoints[0] = Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL"));
- endpoints[1] = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
- endpoints[2] = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+// try {
+// endpoints[0] = Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL"));
+// endpoints[1] = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
+// endpoints[2] = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
+// } catch (InterruptedException e) {
+// throw new RuntimeException(e);
+// }
}
public static void stop() {
diff --git a/src/main/java/derms/replica1/Replica1.java b/src/main/java/derms/replica1/Replica1.java
index c9e080d..1520b73 100644
--- a/src/main/java/derms/replica1/Replica1.java
+++ b/src/main/java/derms/replica1/Replica1.java
@@ -8,9 +8,12 @@ import derms.replica2.DermsLogger;
import derms.util.TestLogger;
import derms.util.ThreadPool;
+import javax.xml.ws.Endpoint;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
@@ -21,12 +24,15 @@ public class Replica1 implements Replica {
private final ExecutorService pool;
private final Logger log;
private final InetAddress localAddr;
- private final ResponderClient responderClient;
- private final CoordinatorClient coordinatorClient;
+// private final ResponderClient responderClient;
+// private final CoordinatorClient coordinatorClient;
private final String responderClientID = "MTL";
private final String coordinatorClientID = "MTLC1111";
private final ReplicaManager replicaManager;
private DERMSServer server;
+ private final ConcurrentHashMap<String, Integer> portsMap;
+ private final Random r = new Random();
+
private boolean byzFailure;
public Replica1(ReplicaManager replicaManager) {
@@ -37,13 +43,17 @@ public class Replica1 implements Replica {
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
- responderClient = new ResponderClient(responderClientID);
- coordinatorClient = new CoordinatorClient(coordinatorClientID);
+// responderClient = new ResponderClient(responderClientID);
+// coordinatorClient = new CoordinatorClient(coordinatorClientID);
try {
this.log = DermsLogger.getLogger(getClass());
} catch (IOException e) {
throw new RuntimeException(e);
}
+ portsMap = new ConcurrentHashMap<>();
+ portsMap.put("MTL", r.nextInt(60000-8000) + 8000);
+ portsMap.put("QUE", r.nextInt(60000-8000) + 8000);
+ portsMap.put("SHE", r.nextInt(60000-8000) + 8000);
}
@Override
@@ -68,21 +78,27 @@ public class Replica1 implements Replica {
}
try {
- server = new DERMSServer("MTL");
+ server = new DERMSServer("MTL", portsMap);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
- new DERMSServer("SHE");
+ new DERMSServer("SHE", portsMap);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
- new DERMSServer("QUE");
+ new DERMSServer("QUE", portsMap);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
+// try {
+// Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL"));
+// Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
+// Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
+// } catch (InterruptedException e) {}
+ alive = true;
log.info(getClass().getSimpleName() + " started.");
log.config("Local address is "+localAddr.toString());
}
diff --git a/src/main/java/derms/replica3/Replica3.java b/src/main/java/derms/replica3/Replica3.java
index 9e262e3..456c4f1 100644
--- a/src/main/java/derms/replica3/Replica3.java
+++ b/src/main/java/derms/replica3/Replica3.java
@@ -145,6 +145,7 @@ public class Replica3 implements Replica{
public synchronized String addResource(String resourceID, String resourceName, int duration) {
// Check if the resource name already exists
+ Resource res = new Resource(resourceID, resourceName, duration);
String message = "";
// boolean isIdValid = ValidationService.isResourceIDValid(resourceID);
// boolean isNameValid = ValidationService.isResourceNameValid(resourceName);
@@ -172,7 +173,8 @@ public class Replica3 implements Replica{
// Add the new resource
resource = new Resource(resourceID, resourceName, duration);
resourceMap.put(resourceID, resource);
- message = "New Resource of " + resourceName +" is added: " + resourceID + " : " + resourceName + " : " + duration;
+// message = "New Resource of " + resourceName +" is added: " + resourceID + " : " + resourceName + " : " + duration;
+ message = "Successfully added resource " + res;
}
// if(resourceWaitingQueues.containsKey(resourceID)) {
@@ -217,12 +219,14 @@ public class Replica3 implements Replica{
if (resource != null) {
if (duration >= resource.getDuration()) {
resourceMap.remove(resourceID); // Remove if duration is fully used
- message = "Resource of ID: " + resourceID + " with duration " + resource.getDuration() + " is successfully removed.";
+// message = "Resource of ID: " + resourceID + " with duration " + resource.getDuration() + " is successfully removed.";
+ message = "Successfully removed resource " + resourceID;
activityLogger.log(" REMOVE RESOURCE (" + resourceID + ", " + duration + ") " , " COMPLETED ", message);
return message;
} else {
resource.setDuration(resource.getDuration() - duration); // Reduce the duration
- message = "Duration of the Resource of ID: " + resourceID + " with duration " + duration + " is successfully reduced.";
+// message = "Duration of the Resource of ID: " + resourceID + " with duration " + duration + " is successfully reduced.";
+ message = "Successfully removed resource " + resourceID;
activityLogger.log(" REMOVE RESOURCE (" + resourceID + ", " + duration + ") " , " COMPLETED ", message);
return message;
}
@@ -351,8 +355,8 @@ public class Replica3 implements Replica{
coordinatorResources.put(coordinatorID, lst);
System.out.println(coordinatorResources.keySet());
}
- message = "Coordinator of ID " + coordinatorID + " borrowed resource of ID " + resourceID + " " + "from same server";
-
+// message = "Coordinator of ID " + coordinatorID + " borrowed resource of ID " + resourceID + " " + "from same server";
+ message = "Successfully borrowed " + resourceID;
activityLogger.log(" REQUEST RESOURCE (" + coordinatorID + ", " + resourceID + ", " + duration + ") " , " COMPLETED ", message);
return message;
} else if (resource != null) {
@@ -418,7 +422,8 @@ public class Replica3 implements Replica{
}
activityLogger.log(" REQUEST RESOURCE (" + coordinatorID + ", " + resourceID + ", " + duration + ") " , " COMPLETED ", message);
- return result.toString();
+ message = "Successfully borrowed " + resourceID;
+ return message;
}
@@ -571,7 +576,8 @@ public class Replica3 implements Replica{
if(coordinatorResources.containsKey(coordinatorID)) {
for(Resource res: coordinatorResources.get(coordinatorID)) {
if(res.getResourceID().equalsIgnoreCase(resourceID)) {
- message = "Resource " + resourceID + "-" + res.getDuration() + " returned successfully to coordinator " + coordinatorID;
+// message = "Resource " + resourceID + "-" + res.getDuration() + " returned successfully to coordinator " + coordinatorID;
+ message = "Successfully returned resource " + resourceID;
activityLogger.log(" RETURN RESOURCE (" + coordinatorID + ", " + resourceID + ") " , " COMPLETED ", message);
return message;
}
@@ -609,7 +615,8 @@ public class Replica3 implements Replica{
activityLogger.log(" SWAP RESOURCE (" + coordinatorID + ", " + oldResourceID + ") " , " FAILED ", message);
return message;
}
- message = "Coordinator of ID: " + coordinatorID + " Successfully swapped old resource " + oldResourceID + " with new resource " + newResourceID;
+// message = "Coordinator of ID: " + coordinatorID + " Successfully swapped old resource " + oldResourceID + " with new resource " + newResourceID;
+ message = "Successfully swapped " + oldResourceID + " for " + newResourceID;
// activityLogger.log(" SWAP RESOURCE (" + coordinatorID + ", " + oldResourceID + ", " + oldResourceType + ", " + newResourceID + ", " + newResourceType + ", "+ ") ",
// " COMPLETED ", message);