summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore6
-rw-r--r--SystemTest.log3
-rw-r--r--TestExpected.log3
-rw-r--r--TestExpectedByz.log5
-rw-r--r--TestExpectedCombined.log10
-rw-r--r--TestExpectedCrash.log5
-rw-r--r--src/main/java/derms/Replica.java2
-rw-r--r--src/main/java/derms/Replica4.java108
-rw-r--r--src/main/java/derms/Replica4pkg/RemoteServer.java38
-rw-r--r--src/main/java/derms/Replica4pkg/Replica4.java578
-rw-r--r--src/main/java/derms/ReplicaManager.java55
-rw-r--r--src/main/java/derms/client/CoordinatorClient.java112
-rw-r--r--src/main/java/derms/client/CoordinatorClientCLI.java119
-rw-r--r--src/main/java/derms/client/ResponderClient.java100
-rw-r--r--src/main/java/derms/client/ResponderClientCLI.java104
-rw-r--r--src/main/java/derms/frontend/DERMSServerImpl.java5
-rw-r--r--src/main/java/derms/frontend/FE.java3
-rw-r--r--src/main/java/derms/replica1/DERMSServerPublisher.java19
-rw-r--r--src/main/java/derms/replica1/Replica1.java30
-rw-r--r--src/main/java/derms/replica2/Replica2.java32
-rw-r--r--src/main/java/derms/replica3/Replica3.java33
-rw-r--r--src/main/java/derms/util/LogComparator.java27
-rw-r--r--src/main/java/derms/util/TestLogger.java27
-rw-r--r--src/test/java/derms/test/SystemTest.java183
24 files changed, 1269 insertions, 338 deletions
diff --git a/.gitignore b/.gitignore
index 6828b80..4357efe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,7 +3,11 @@
*.bbl
*.bcf
*.blg
-*.log
+QUEServer.log
+MTLServer.log
+SHEServer.log
+server.log
+SystemTest.log
*.run.xml
*.toc
javadoc
diff --git a/SystemTest.log b/SystemTest.log
new file mode 100644
index 0000000..0b9da65
--- /dev/null
+++ b/SystemTest.log
@@ -0,0 +1,3 @@
+[2024-12-03 12:45:40] REPLICA 1: {BYZANTINE: FALSE}
+[2024-12-03 12:45:40] REPLICA 1: {CRASH: FALSE}
+[2024-12-03 12:46:00] [FAILED: Fail: No response from any server]
diff --git a/TestExpected.log b/TestExpected.log
new file mode 100644
index 0000000..1cea96b
--- /dev/null
+++ b/TestExpected.log
@@ -0,0 +1,3 @@
+[2024-12-03 07:48:36] REPLICA 1: {BYZANTINE: FALSE}
+[2024-12-03 07:48:36] REPLICA 1: {CRASH: FALSE}
+[2024-12-03 08:53:10] [SUCCESS: OK] \ No newline at end of file
diff --git a/TestExpectedByz.log b/TestExpectedByz.log
new file mode 100644
index 0000000..b5cabee
--- /dev/null
+++ b/TestExpectedByz.log
@@ -0,0 +1,5 @@
+[2024-12-03 09:22:35] REPLICA 1: {BYZANTINE: TRUE}
+[2024-12-03 09:22:35] REPLICA 1: {CRASH: FALSE}
+[2024-12-03 09:22:35] REPLICA 1: {BYZANTINE: DETECTED}
+[2024-12-03 09:22:35] REPLICA 1: {RESTARTED}
+[2024-12-03 09:22:56] [SUCCESS: OK] \ No newline at end of file
diff --git a/TestExpectedCombined.log b/TestExpectedCombined.log
new file mode 100644
index 0000000..9d955ad
--- /dev/null
+++ b/TestExpectedCombined.log
@@ -0,0 +1,10 @@
+[2024-12-03 09:27:08] REPLICA 1: {BYZANTINE: TRUE}
+[2024-12-03 09:27:08] REPLICA 1: {CRASH: FALSE}
+[2024-12-03 09:27:08] REPLICA 1: {BYZANTINE: DETECTED}
+[2024-12-03 09:27:08] REPLICA 1: {RESTARTED}
+[2024-12-03 09:27:08] REPLICA 3: {BYZANTINE: FALSE}
+[2024-12-03 09:27:08] REPLICA 3: {CRASH: TRUE}
+[2024-12-03 09:27:08] REPLICA 3: {CRASH: DETECTED}
+[2024-12-03 09:27:08] REPLICA 3: {RESTARTED}
+[2024-12-03 09:27:28] [SUCCESS: OK]
+[2024-12-03 09:27:28] [SUCCESS: OK] \ No newline at end of file
diff --git a/TestExpectedCrash.log b/TestExpectedCrash.log
new file mode 100644
index 0000000..90a0640
--- /dev/null
+++ b/TestExpectedCrash.log
@@ -0,0 +1,5 @@
+[2024-12-03 09:22:35] REPLICA 1: {BYZANTINE: FALSE}
+[2024-12-03 09:22:35] REPLICA 1: {CRASH: TRUE}
+[2024-12-03 09:22:35] REPLICA 1: {CRASH: DETECTED}
+[2024-12-03 09:22:35] REPLICA 1: {RESTARTED}
+[2024-12-03 09:22:56] [SUCCESS: OK] \ No newline at end of file
diff --git a/src/main/java/derms/Replica.java b/src/main/java/derms/Replica.java
index eae0014..a717064 100644
--- a/src/main/java/derms/Replica.java
+++ b/src/main/java/derms/Replica.java
@@ -2,7 +2,7 @@ package derms;
public interface Replica {
boolean isAlive();
- void startProcess();
+ void startProcess(int byzantine, int crash);
void processRequest(Request request);
void restart();
int getId();
diff --git a/src/main/java/derms/Replica4.java b/src/main/java/derms/Replica4.java
deleted file mode 100644
index ed7e90b..0000000
--- a/src/main/java/derms/Replica4.java
+++ /dev/null
@@ -1,108 +0,0 @@
-//package derms;
-//
-////import derms.Replica3pkg.ResponderClient;
-//import java.io.*;
-//import java.util.*;
-//
-//import derms.Replica4pkg.RemoteServer;
-//
-//public class Replica4 implements Replica {
-//
-// private ReplicaManager replicaManager;
-// private RemoteServer remoteServer;
-// private boolean alive = true;
-//
-// public Replica4(ReplicaManager replicaManager){
-// this.replicaManager = replicaManager;
-// }
-//
-// @Override
-// public boolean isAlive() {
-// return alive;
-// }
-//
-// @Override
-// public void startProcess() {
-// this.remoteServer = new RemoteServer();
-// System.out.println("[Replica 4] Process started.");
-// }
-//
-// @Override
-// public void processRequest(Request request) {
-// ResponderClient responderClient;
-// CoordinatorClient coordinatorClient;
-// String responseMessage;
-// switch (request.getFunction()) {
-// case "addResource":
-// responderClient = new ResponderClient(request.getClientID());
-// responseMessage = responderClient.addResource(request.getResourceID(), request.getResourceType(), request.getDuration());
-// break;
-// case "removeResource":
-// responderClient = new ResponderClient(request.getClientID());
-// responseMessage = responderClient.removeResource(request.getResourceID(), request.getDuration());
-// break;
-// case "listResourceAvailability":
-// responderClient = new ResponderClient(request.getClientID());
-// responseMessage = responderClient.listResourceAvailability(request.getResourceType());
-// break;
-// case "requestResource":
-// coordinatorClient = new CoordinatorClient(request.getClientID());
-// responseMessage = coordinatorClient.requestResource(request.getResourceID(), request.getDuration());
-// break;
-// case "findResource":
-// coordinatorClient = new CoordinatorClient(request.getClientID());
-// responseMessage = coordinatorClient.findResource(request.getResourceType());
-// break;
-// case "returnResource":
-// coordinatorClient = new CoordinatorClient(request.getClientID());
-// responseMessage = coordinatorClient.returnResource(request.getResourceID());
-// break;
-// case "swapResource":
-// coordinatorClient = new CoordinatorClient(request.getClientID());
-// responseMessage = coordinatorClient.swapResource(
-// request.getClientID(),
-// request.getOldResourceID(),
-// request.getOldResourceType(),
-// request.getResourceID(),
-// request.getResourceType()
-// );
-// break;
-// default:
-// responseMessage = "Unrecognized function: " + request.getFunction();
-// log("Unrecognized function in request: " + request.getFunction());
-// break;
-// }
-//
-// Response response = new Response(request.getSequenceNumber(), responseMessage);
-// log("Replica " + 4 + " processed request: " + request + ", response: " + response);
-// replicaManager.sendResponseToFE(response);
-// }
-//
-// @Override
-// public void restart() {
-// shutDown();
-// startProcess();
-// }
-//
-// public void shutDown(){
-// this.remoteServer.stopServers();
-// }
-//
-// @Override
-// public int getId() {
-// return 4;
-// }
-//
-// public synchronized void log(String message) {
-// String logMessage = new Date() + " - " + message;
-// System.out.println(logMessage);
-//
-// try (FileWriter fw = new FileWriter("Replica4_log.txt", true);
-// BufferedWriter bw = new BufferedWriter(fw);
-// PrintWriter out = new PrintWriter(bw)) {
-// out.println(logMessage);
-// } catch (IOException e) {
-// e.printStackTrace();
-// }
-// }
-//} \ No newline at end of file
diff --git a/src/main/java/derms/Replica4pkg/RemoteServer.java b/src/main/java/derms/Replica4pkg/RemoteServer.java
index a572dbe..6bc7411 100644
--- a/src/main/java/derms/Replica4pkg/RemoteServer.java
+++ b/src/main/java/derms/Replica4pkg/RemoteServer.java
@@ -5,25 +5,25 @@ import javax.xml.ws.Endpoint;
import java.util.*;
public class RemoteServer {
- private List<Endpoint> endpoints = new ArrayList<>();
-
- public RemoteServer() {
- try {
- Map<String, Integer> UDPPorts = new HashMap<>();
- UDPPorts.put("MTL", 4000);
- UDPPorts.put("QUE", 5000);
- UDPPorts.put("SHE", 6000);
-
- String[] serverNames = {"MTL", "QUE", "SHE"};
- int i = 0;
- for (String serverName : serverNames) {
- int UDPPort = UDPPorts.get(serverName);
- Server server = new Server();
- server.initServer(serverName, UDPPort, UDPPorts);
- int port = 8080 + i;
- String url = "http://localhost:" + port + "/DERMS/" + serverName;
- Endpoint endpoint = Endpoint.publish(url, server);
- endpoints.add(endpoint); // Keep track of the Endpoint
+ private static List<Endpoint> endpoints = new ArrayList<>();
+
+ public static void main(String[] args) {
+ try {
+ Map<String, Integer> UDPPorts = new HashMap<>();
+ UDPPorts.put("MTL", 4000);
+ UDPPorts.put("QUE", 5000);
+ UDPPorts.put("SHE", 6000);
+
+ String[] serverNames = {"MTL", "QUE", "SHE"};
+ int i = 0;
+ for (String serverName : serverNames) {
+ int UDPPort = UDPPorts.get(serverName);
+ Server server = new Server();
+ server.initServer(serverName, UDPPort, UDPPorts);
+ int port = 8080 + i;
+ String url = "http://localhost:" + port + "/DERMS/" + serverName;
+ Endpoint endpoint = Endpoint.publish(url, server);
+ endpoints.add(endpoint); // Keep track of the Endpoint
i++;
System.out.println(serverName + " Server ready and waiting ...");
}
diff --git a/src/main/java/derms/Replica4pkg/Replica4.java b/src/main/java/derms/Replica4pkg/Replica4.java
new file mode 100644
index 0000000..91bc303
--- /dev/null
+++ b/src/main/java/derms/Replica4pkg/Replica4.java
@@ -0,0 +1,578 @@
+package derms.Replica4pkg;
+
+import java.io.*;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.util.*;
+
+import javax.jws.WebMethod;
+
+import derms.Replica;
+import derms.ReplicaManager;
+import derms.Request;
+import derms.Response;
+import derms.util.TestLogger;
+import derms.util.ThreadPool;
+
+
+
+public class Replica4 implements Replica {
+
+ private ReplicaManager replicaManager;
+ private boolean alive = true;
+ private boolean byzFailure;
+
+ private String serverName;
+ private int udpPort;
+ private Map<String, Map<String, Resource>> resourceMap; //resourceName -> (resourceID -> Resource)
+ private Map<String, Set<String>> coordinatorResources; //coordinatorID -> Set<resourceID>
+ private Map<String, Map<String, Integer>> coordinatorResourceDurations; //coordinatorID -> (resourceID -> allocatedDuration)
+ private Map<String, Integer> serverUdpPorts; //serverName -> UDP port
+
+ public Replica4(ReplicaManager replicaManager){
+ this.replicaManager = replicaManager;
+ this.resourceMap = new HashMap<>();
+ this.coordinatorResources = new HashMap<>();
+ this.coordinatorResourceDurations = new HashMap<>();
+ startUDPListener();
+ }
+
+ @Override
+ public boolean isAlive() {
+ return alive;
+ }
+
+ @Override
+ public void startProcess(int byzantine, int crash) {
+ // [TEST] Detect crash
+ if (crash == 1) {
+ alive = false;
+ } else {
+ alive = true;
+ }
+
+ // [TEST] Detect byzantine failure
+ if (byzantine == 1) {
+ byzFailure = true;
+ } else {
+ byzFailure = false;
+ }
+
+ System.out.println("[Replica 4] Process started.");
+ }
+
+ @Override
+ public void processRequest(Request request) {
+ log(request.toString());
+
+ // [TEST] Simulate byzantine failure (return incorrect value)
+ if (byzFailure == true) {
+ Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false);
+ replicaManager.sendResponseToFE(response);
+ return;
+ }
+
+ String status = "";
+ try {
+ switch (request.getFunction()) {
+ case "addResource":
+ status = addResource(request.getResourceID(), request.getResourceType(), request.getDuration());
+ break;
+ case "removeResource":
+ status = removeResource(request.getResourceID(), request.getDuration());
+ break;
+ case "listResourceAvailability":
+ status = listResourceAvailability(request.getResourceType());
+ break;
+ case "requestResource":
+ status = requestResource(request.getClientID(), request.getResourceID(), request.getDuration());
+ break;
+ case "findResource":
+ status = findResource(request.getClientID(), request.getResourceType());
+ break;
+ case "returnResource":
+ status = returnResource(request.getClientID(), request.getResourceID());
+ break;
+ case "swapResource":
+ status = swapResource(request.getClientID(), request.getOldResourceID(), request.getOldResourceType(), request.getResourceID(), request.getResourceType());
+ break;
+ default:
+ status = "Failure: unknown function '" + request.getFunction() + "'";
+ }
+ } catch (Exception e) {
+ log(e.getMessage());
+ status = "Failure: " + request.getFunction() + ": " + e.getMessage();
+ }
+
+ Response response = new Response(request, replicaManager.getReplicaId(), status, false); // TODO: isSuccess flag
+ log("Processed request " + request + "; response: " + response);
+ replicaManager.sendResponseToFE(response);
+ }
+
+ @Override
+ public void restart() {
+ shutdown();
+
+ // [TEST] Restart process without byzantine failure or crash
+ TestLogger.log("REPLICA 2: {RESTARTED}");
+ startProcess(0, 0);
+ }
+
+ public void shutdown() {
+ log("Shutting down...");
+ alive = false;
+ log("Finished shutting down.");
+ }
+
+ @Override
+ public int getId() {
+ return 4;
+ }
+
+ public synchronized void log(String message) {
+ String logMessage = new Date() + " - " + message;
+ System.out.println(logMessage);
+
+ try (FileWriter fw = new FileWriter("Replica4_log.txt", true);
+ BufferedWriter bw = new BufferedWriter(fw);
+ PrintWriter out = new PrintWriter(bw)) {
+ out.println(logMessage);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @WebMethod(exclude = true)
+ public void initReplica4(String serverName, int udpPort, Map<String, Integer> serverUdpPorts) {
+ this.serverName = serverName;
+ this.udpPort = udpPort;
+ this.serverUdpPorts = serverUdpPorts;
+ }
+
+ //Start UDP Listener
+ private void startUDPListener() {
+ new Thread(() -> {
+ try (DatagramSocket socket = new DatagramSocket(udpPort)) {
+ byte[] buffer = new byte[4096];
+ System.out.println(serverName + " UDP Server started on port " + udpPort);
+
+ while (true) {
+ DatagramPacket request = new DatagramPacket(buffer, buffer.length);
+ socket.receive(request);
+ new Thread(() -> handleUDPRequest(request)).start();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }).start();
+ }
+
+ //Handle UDP Request
+ private void handleUDPRequest(DatagramPacket requestPacket) {
+ try {
+ String requestData = new String(requestPacket.getData(), 0, requestPacket.getLength());
+ String[] parts = requestData.split(":");
+ String methodName = parts[0];
+ String[] params = Arrays.copyOfRange(parts, 1, parts.length);
+
+ String responseData = "";
+
+ switch (methodName) {
+ case "listResourceAvailability":
+ responseData = handleListResourceAvailability(params[0]);
+ break;
+ case "requestResource":
+ responseData = handleLocalResourceRequest(params[0], params[1], Integer.parseInt(params[2]));
+ break;
+ case "findResource":
+ responseData = handleFindResource(params[0], params[1]);
+ break;
+ case "returnResource":
+ responseData = handleReturnResource(params[0], params[1]);
+ break;
+ case "swapRequestResource":
+ responseData = handleSwapRequestResource(params[0], params[1], Integer.parseInt(params[2]));
+ break;
+ case "swapReturnResource":
+ responseData = handleSwapReturnResource(params[0], params[1]);
+ break;
+ case "checkCoordinatorResource":
+ responseData = checkCoordinatorHasResource(params[0], params[1]);
+ break;
+ case "getCoordinatorResourceDuration":
+ int duration = getCoordinatorResourceDuration(params[0], params[1]);
+ responseData = String.valueOf(duration);
+ break;
+ default:
+ responseData = "Invalid method";
+ }
+
+ //Send response
+ byte[] responseBytes = responseData.getBytes();
+ DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length, requestPacket.getAddress(), requestPacket.getPort());
+ DatagramSocket socket = new DatagramSocket();
+ socket.send(responsePacket);
+ socket.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ //Operation Handlers
+ private synchronized String handleListResourceAvailability(String resourceName) {
+ StringBuilder result = new StringBuilder();
+ resourceName = resourceName.toUpperCase();
+ Map<String, Resource> resources = resourceMap.get(resourceName);
+
+ if (resources != null) {
+ for (Resource resource : resources.values()) {
+ if (resource.getDuration() > 0) {
+ result.append(resource.getResourceName()).append(" - ").append(serverName).append(" ").append(resource.getDuration()).append("\n");
+ }
+ }
+ }
+ return result.toString();
+ }
+
+ private synchronized String handleLocalResourceRequest(String coordinatorID, String resourceID, int duration) {
+ for (Map<String, Resource> resources : resourceMap.values()) {
+ Resource resource = resources.get(resourceID);
+ if (resource != null) {
+ if (resource.getDuration() >= duration) {
+ resource.subDuration(duration);
+ coordinatorResources.computeIfAbsent(coordinatorID, k -> new HashSet<>()).add(resourceID);
+ coordinatorResourceDurations.computeIfAbsent(coordinatorID, k -> new HashMap<>()).put(resourceID, duration);
+
+ log("Resource " + resourceID + " allocated to " + coordinatorID + " for duration " + duration);
+ return "Resource " + resourceID + " allocated successfully.";
+ } else {
+ return "Insufficient duration for resource " + resourceID;
+ }
+ }
+ }
+ return "Resource " + resourceID + " not found.";
+ }
+
+ private synchronized String handleFindResource(String coordinatorID, String resourceName) {
+ StringBuilder result = new StringBuilder();
+ resourceName = resourceName.toUpperCase();
+ Set<String> resources = coordinatorResources.get(coordinatorID);
+ if (resources != null) {
+ Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID);
+ if (resourceDurations != null) {
+ for (String resourceID : resources) {
+ Resource resource = getResourceByID(resourceID);
+ if (resource != null && resource.getResourceName().equals(resourceName)) {
+ Integer allocatedDuration = resourceDurations.get(resourceID);
+ if (allocatedDuration != null) {
+ result.append(resource.getResourceName()).append(" - ").append(resourceID).append(" ").append(allocatedDuration).append("\n");
+ }
+ }
+ }
+ }
+ }
+ return result.toString();
+ }
+
+ private synchronized String handleReturnResource(String coordinatorID, String resourceID) {
+ Set<String> resources = coordinatorResources.get(coordinatorID);
+ if (resources != null && resources.contains(resourceID)) {
+ resources.remove(resourceID);
+ if (resources.isEmpty()) {
+ coordinatorResources.remove(coordinatorID);
+ }
+ Resource resource = getResourceByID(resourceID);
+ if (resource != null) {
+ Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID);
+ if (resourceDurations != null) {
+ Integer allocatedDuration = resourceDurations.remove(resourceID);
+ if (allocatedDuration != null) {
+ resource.addDuration(allocatedDuration);
+ }
+ if (resourceDurations.isEmpty()) {
+ coordinatorResourceDurations.remove(coordinatorID);
+ }
+ }
+ log("Coordinator " + coordinatorID + " returned resource " + resourceID);
+ return "Resource " + resourceID + " returned successfully.";
+ } else {
+ return "Resource " + resourceID + " not found.";
+ }
+ } else {
+ return "You do not occupy resource " + resourceID + ".";
+ }
+ }
+
+ //Helper Methods
+ private Resource getResourceByID(String resourceID) {
+ for (Map<String, Resource> resources : resourceMap.values()) {
+ if (resources.containsKey(resourceID)) {
+ return resources.get(resourceID);
+ }
+ }
+ return null;
+ }
+
+ private synchronized String handleSwapRequestResource(String coordinatorID, String resourceID, int duration) {
+ //Attempt to allocate the resource to the coordinator
+ for (Map<String, Resource> resources : resourceMap.values()) {
+ Resource resource = resources.get(resourceID);
+ if (resource != null) {
+ if (resource.getDuration() >= duration) {
+ resource.subDuration(duration);
+ coordinatorResources.computeIfAbsent(coordinatorID, k -> new HashSet<>()).add(resourceID);
+ coordinatorResourceDurations.computeIfAbsent(coordinatorID, k -> new HashMap<>()).put(resourceID, duration);
+ log("Resource " + resourceID + " temporarily allocated to " + coordinatorID + " for swapping, duration " + duration);
+ return "Success";
+ } else {
+ return "Insufficient duration for resource " + resourceID;
+ }
+ }
+ }
+ return "Resource " + resourceID + " not found.";
+ }
+
+ private synchronized String handleSwapReturnResource(String coordinatorID, String resourceID) {
+ Set<String> resources = coordinatorResources.get(coordinatorID);
+ if (resources != null && resources.contains(resourceID)) {
+ resources.remove(resourceID);
+ if (resources.isEmpty()) {
+ coordinatorResources.remove(coordinatorID);
+ }
+ Resource resource = getResourceByID(resourceID);
+ if (resource != null) {
+ Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID);
+ if (resourceDurations != null) {
+ Integer allocatedDuration = resourceDurations.remove(resourceID);
+ if (allocatedDuration != null) {
+ resource.addDuration(allocatedDuration);
+ }
+ if (resourceDurations.isEmpty()) {
+ coordinatorResourceDurations.remove(coordinatorID);
+ }
+ }
+ log("Coordinator " + coordinatorID + " resource " + resourceID + " allocation undone during swap");
+ return "Success";
+ } else {
+ return "Resource " + resourceID + " not found.";
+ }
+ } else {
+ return "Coordinator did not acquire resource " + resourceID;
+ }
+ }
+
+ private String sendUDPRequest(String serverName, String methodName, String... params) {
+ String response = "";
+ try {
+ InetAddress host = InetAddress.getByName("localhost");
+ int port = serverUdpPorts.get(serverName);
+ String requestData = methodName + ":" + String.join(":", params);
+
+ byte[] requestBytes = requestData.getBytes();
+ DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length, host, port);
+
+ DatagramSocket socket = new DatagramSocket();
+ socket.send(requestPacket);
+
+ byte[] buffer = new byte[4096];
+ DatagramPacket responsePacket = new DatagramPacket(buffer, buffer.length);
+ socket.receive(responsePacket);
+
+ response = new String(responsePacket.getData(), 0, responsePacket.getLength());
+ socket.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return response;
+ }
+
+ //Implement ResponderInterface Methods
+ public synchronized String addResource(String resourceID, String resourceName, int duration) {
+ resourceName = resourceName.toUpperCase();
+ Map<String, Resource> resources = resourceMap.computeIfAbsent(resourceName, k -> new HashMap<>());
+ Resource resource = resources.get(resourceID);
+
+ if (resource != null) {
+ if (duration > resource.getDuration()) {
+ resource.setDuration(duration);
+ log("Updated duration of resource " + resourceID);
+ return "Resource " + resourceID + " duration updated.";
+ } else {
+ log("Resource " + resourceID + " already exists with greater or equal duration.");
+ return "Resource " + resourceID + " already exists with greater or equal duration.";
+ }
+ } else {
+ resource = new Resource(resourceName, duration);
+ resources.put(resourceID, resource);
+ log("Added new resource " + resourceID);
+ return "Resource " + resourceID + " added successfully.";
+ }
+ }
+
+ public synchronized String removeResource(String resourceID, int duration) {
+ for (Map<String, Resource> resources : resourceMap.values()) {
+ Resource resource = resources.get(resourceID);
+ if (resource != null) {
+ if (duration >= resource.getDuration()) {
+ resources.remove(resourceID);
+ log("Resource " + resourceID + " completely removed.");
+ return "Resource " + resourceID + " completely removed.";
+ } else {
+ resource.subDuration(duration);
+ log("Decreased duration of resource " + resourceID);
+ return "Resource " + resourceID + " duration decreased by " + duration + ".";
+ }
+ }
+ }
+ log("Resource " + resourceID + " not found.");
+ return "Resource " + resourceID + " not found.";
+ }
+
+ public String listResourceAvailability(String resourceName) {
+ resourceName = resourceName.toUpperCase();
+ StringBuilder result = new StringBuilder();
+
+ synchronized (this) {
+ Map<String, Resource> resources = resourceMap.get(resourceName);
+ if (resources != null) {
+ for (Resource resource : resources.values()) {
+ if (resource.getDuration() > 0) {
+ result.append(resource.getResourceName()).append(" - ").append(serverName).append(" ").append(resource.getDuration()).append("\n");
+ }
+ }
+ }
+ }
+
+ for (String otherServer : serverUdpPorts.keySet()) {
+ if (!otherServer.equals(serverName)) {
+ String response = sendUDPRequest(otherServer, "listResourceAvailability", resourceName);
+ result.append(response);
+ }
+ }
+
+ return result.toString();
+ }
+
+ public String requestResource(String coordinatorID, String resourceID, int duration) {
+ String resourceServer = resourceID.substring(0, 3).toUpperCase();
+ String response;
+
+ if (resourceServer.equals(serverName)) {
+ response = handleLocalResourceRequest(coordinatorID, resourceID, duration);
+ } else {
+ response = sendUDPRequest(resourceServer, "requestResource", coordinatorID, resourceID, String.valueOf(duration));
+ }
+
+ log("Coordinator " + coordinatorID + " requested resource " + resourceID + ": " + response);
+ return response;
+ }
+
+ public String findResource(String coordinatorID, String resourceName) {
+ resourceName = resourceName.toUpperCase();
+ StringBuilder result = new StringBuilder();
+
+ for (String otherServer : serverUdpPorts.keySet()) {
+ String response;
+ if (otherServer.equals(serverName)) {
+ response = handleFindResource(coordinatorID, resourceName);
+ } else {
+ response = sendUDPRequest(otherServer, "findResource", coordinatorID, resourceName);
+ }
+ result.append(response);
+ }
+
+ return result.toString();
+ }
+
+ public String returnResource(String coordinatorID, String resourceID) {
+ String resourceServer = resourceID.substring(0, 3).toUpperCase();
+ String response;
+
+ if (resourceServer.equals(serverName)) {
+ response = handleReturnResource(coordinatorID, resourceID);
+ } else {
+ response = sendUDPRequest(resourceServer, "returnResource", coordinatorID, resourceID);
+ }
+
+ log("Coordinator " + coordinatorID + " returned resource " + resourceID + ": " + response);
+ return response;
+ }
+
+ private synchronized int getCoordinatorResourceDuration(String coordinatorID, String resourceID) {
+ Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID);
+ if (resourceDurations != null && resourceDurations.containsKey(resourceID)) {
+ return resourceDurations.get(resourceID);
+ }
+ return 0;
+ }
+
+ private synchronized String checkCoordinatorHasResource(String coordinatorID, String resourceID) {
+ Set<String> resources = coordinatorResources.get(coordinatorID);
+ if (resources != null && resources.contains(resourceID)) {
+ return "true";
+ }
+ return "false";
+ }
+
+ public String swapResource(String coordinatorID, String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) {
+ String oldResourceServer = oldResourceID.substring(0, 3).toUpperCase();
+ String newResourceServer = newResourceID.substring(0, 3).toUpperCase();
+
+ //Check if the coordinator has the old resource
+ String checkOldResourceResponse;
+ if (oldResourceServer.equals(serverName)) {
+ checkOldResourceResponse = checkCoordinatorHasResource(coordinatorID, oldResourceID);
+ } else {
+ checkOldResourceResponse = sendUDPRequest(oldResourceServer, "checkCoordinatorResource", coordinatorID, oldResourceID);
+ }
+
+ if (!checkOldResourceResponse.equals("true")) {
+ return "Coordinator has not acquired the old resource.";
+ }
+
+ //Get the duration allocated to the old resource
+ int duration;
+ if (oldResourceServer.equals(serverName)) {
+ duration = getCoordinatorResourceDuration(coordinatorID, oldResourceID);
+ } else {
+ String durationStr = sendUDPRequest(oldResourceServer, "getCoordinatorResourceDuration", coordinatorID, oldResourceID);
+ duration = Integer.parseInt(durationStr);
+ }
+
+ //Attempt to acquire the new resource
+ String requestResponse;
+ if (newResourceServer.equals(serverName)) {
+ requestResponse = handleLocalResourceRequest(coordinatorID, newResourceID, duration);
+ } else {
+ requestResponse = sendUDPRequest(newResourceServer, "requestResource", coordinatorID, newResourceID, String.valueOf(duration));
+ }
+
+ if (!requestResponse.contains("allocated successfully")) {
+ return "Failed to acquire new resource: " + requestResponse;
+ }
+
+ //Return the old resource
+ String returnResponse;
+ if (oldResourceServer.equals(serverName)) {
+ returnResponse = handleReturnResource(coordinatorID, oldResourceID);
+ } else {
+ returnResponse = sendUDPRequest(oldResourceServer, "returnResource", coordinatorID, oldResourceID);
+ }
+
+ if (!returnResponse.contains("returned successfully")) {
+ //Undo the allocation of the new resource
+ if (newResourceServer.equals(serverName)) {
+ handleReturnResource(coordinatorID, newResourceID);
+ } else {
+ sendUDPRequest(newResourceServer, "returnResource", coordinatorID, newResourceID);
+ }
+ return "Failed to return old resource: " + returnResponse;
+ }
+
+ log("Coordinator " + coordinatorID + " swapped resource " + oldResourceID + " with " + newResourceID);
+
+ return "Resource swap successful.";
+ }
+}
+
diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java
index 6ff5de2..a3008de 100644
--- a/src/main/java/derms/ReplicaManager.java
+++ b/src/main/java/derms/ReplicaManager.java
@@ -10,6 +10,7 @@ import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastSender;
import derms.replica1.Replica1;
import derms.replica2.Replica2;
+import derms.util.*;
import java.io.IOException;
import java.net.InetAddress;
@@ -22,7 +23,7 @@ import java.util.Objects;
import java.util.logging.Logger;
public class ReplicaManager {
- public static final String usage = "Usage: java ReplicaManager <replicaId> <city> <frontEndIP>";
+ public static final String usage = "Usage: java ReplicaManager <replicaId> <city> <replicaManagerIP> <frontEndIP> <byzantine(0 or 1)> <crash(0 or 1)>";
private final int replicaId;
private final String city;
private Replica replica;
@@ -32,13 +33,13 @@ public class ReplicaManager {
private ReliableUnicastSender<Response> unicastSender;
private TotalOrderMulticastReceiver multicastReceiver;
- public ReplicaManager(int replicaId, String city, InetAddress frontEndIP) throws IOException {
+ public ReplicaManager(int replicaId, String city, InetAddress replicaManagerIP, InetAddress frontEndIP, int byzantine, int crash) throws IOException {
this.replicaId = replicaId;
this.city = city;
this.log = Logger.getLogger(getClass().getName());
initUnicastSender(frontEndIP);
- initReplica();
- initMulticastReceiver();
+ initReplica(byzantine, crash);
+ initMulticastReceiver(replicaManagerIP);
startHeartbeatThread();
}
@@ -48,7 +49,7 @@ public class ReplicaManager {
unicastSender = new ReliableUnicastSender<>(frontEndAddress);
}
- private void initReplica() throws IOException {
+ private void initReplica(int byzantine, int crash) throws IOException {
switch (replicaId) {
case 1:
replica = new Replica1(this);
@@ -63,14 +64,27 @@ public class ReplicaManager {
replica = new derms.replica2.Replica2(city, this);
break;
}
- replica.startProcess();
+
+ // [TEST] Logging
+ if (byzantine == 0) {
+ TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: FALSE}");
+ } else {
+ TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: TRUE}");
+ }
+
+ if (crash == 0) {
+ TestLogger.log("REPLICA " + replicaId + ": {CRASH: FALSE}");
+ } else {
+ TestLogger.log("REPLICA " + replicaId + ": {CRASH: TRUE}");
+ }
+
+ replica.startProcess(byzantine, crash);
}
- private void initMulticastReceiver() throws IOException {
+ private void initMulticastReceiver(InetAddress replicaManagerIP) throws IOException {
InetSocketAddress group = Config.group;
- InetAddress localAddress = InetAddress.getLocalHost(); // Your local address
- NetworkInterface netInterface = NetworkInterface.getByInetAddress(localAddress);
- multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress, netInterface);
+ NetworkInterface netInterface = NetworkInterface.getByInetAddress(replicaManagerIP);
+ multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, replicaManagerIP, netInterface);
new Thread(() -> {
while (true) {
@@ -97,8 +111,12 @@ public class ReplicaManager {
new Thread(() -> {
while (true) {
if (!replica.isAlive()) {
+ // [TEST] Logging
+ TestLogger.log("REPLICA " + replicaId + ": {CRASH: DETECTED}");
+
informFrontEndRmIsDown(replica.getId());
replica.restart();
+ //TestLogger.log("REPLICA " + replicaId + ": {RESTARTED}");
}
try {
Thread.sleep(5000); // Example 5 seconds.
@@ -119,8 +137,13 @@ public class ReplicaManager {
public void handleByzantineFailure() {
log.severe("Byzantine failure detected in Replica " + replica.getId());
+
+ // [TEST] Logging
+ TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: DETECTED}");
+
replica.restart();
informFrontEndRmHasBug(replica.getId());
+ //TestLogger.log("REPLICA " + replicaId + ": {RESTARTED}");
}
private void informFrontEndRmIsDown(int replicaId) {
@@ -129,6 +152,7 @@ public class ReplicaManager {
out.writeObject("RM_DOWN:" + replicaId);
} catch (IOException e) {
log.severe("Failed to inform FE that RM is down: " + e.getMessage());
+ TestLogger.log("[FAILED TO INFORM FE (RM IS DOWN)]");
}
}
@@ -138,11 +162,12 @@ public class ReplicaManager {
out.writeObject("RM_BUG:" + replicaId);
} catch (IOException e) {
log.severe("Failed to inform FE that RM has a bug: " + e.getMessage());
+ TestLogger.log("[FAILED TO INFORM FE (RM HAS A BUG)]");
}
}
public static void main(String[] args) {
- if (args.length < 3) {
+ if (args.length < 4) {
System.err.println(usage);
System.exit(1);
}
@@ -150,11 +175,15 @@ public class ReplicaManager {
try {
int replicaId = Integer.parseInt(args[0]);
String city = args[1];
- InetAddress frontEndIP = InetAddress.getByName(args[2]);
- ReplicaManager replicaManager = new ReplicaManager(replicaId, city, frontEndIP);
+ InetAddress replicaManagerIP = InetAddress.getByName(args[2]);
+ InetAddress frontEndIP = InetAddress.getByName(args[3]);
+ int byzantine = Integer.parseInt(args[4]);
+ int crash = Integer.parseInt(args[5]);
+ ReplicaManager replicaManager = new ReplicaManager(replicaId, city, replicaManagerIP, frontEndIP, byzantine, crash);
System.out.println("ReplicaManager " + replicaId + " is running.");
} catch (IOException e) {
System.err.println("Failed to start ReplicaManager: " + e.getMessage());
+ TestLogger.log("[FAILED TO START RM]");
e.printStackTrace();
}
}
diff --git a/src/main/java/derms/client/CoordinatorClient.java b/src/main/java/derms/client/CoordinatorClient.java
index 570d76b..40d8a94 100644
--- a/src/main/java/derms/client/CoordinatorClient.java
+++ b/src/main/java/derms/client/CoordinatorClient.java
@@ -4,116 +4,28 @@ import derms.frontend.DERMSInterface;
import java.net.MalformedURLException;
-public class CoordinatorClient extends CLI {
- public static final String usage = "Usage: java derms.client.CoordinatorClient <coordinator ID> <FE host>";
-
- private final String coordinatorID;
+public class CoordinatorClient {
private final DERMSInterface server;
+ private final String coordinatorID;
- private CoordinatorClient(String coordinatorID, String FEhost) throws MalformedURLException {
- this.coordinatorID = coordinatorID;
+ public CoordinatorClient(String coordinatorID, String FEhost) throws MalformedURLException {
this.server = Client.connectToServer(FEhost);
-
- commands.put("request", new Request());
- cmdDescriptions.add(new Description(
- "request <resource ID> <duration>",
- "Borrow a resource."));
-
- commands.put("find", new Find());
- cmdDescriptions.add(new Description(
- "find <resource name>",
- "List borrowed resources."));
-
- commands.put("return", new Return());
- cmdDescriptions.add(new Description(
- "return <resource ID>",
- "Return a borrowed resource."));
-
- commands.put("swap", new Swap());
- cmdDescriptions.add(new Description(
- "swap <old resource ID> <old resource type> <new resource ID> <new resource type>",
- "Return the old resource and borrow the new one."));
- }
-
- public static void main(String[] args) {
- if (args.length < 2) {
- System.err.println(usage);
- System.exit(1);
- }
-
- String coordinatorID = args[0];
- String FEhost = args[1];
-
- try {
- (new CoordinatorClient(coordinatorID, FEhost)).run();
- } catch (MalformedURLException e) {
- e.printStackTrace();
- }
+ this.coordinatorID = coordinatorID;
}
- private class Request implements Command {
- @Override
- public void exec(String[] args) {
- if (args.length < 2)
- System.out.println("invalid arguments for 'request'");
- else
- request(args[0], args[1]);
- }
-
- private void request(String resourceID, String durationStr) {
- try {
- int duration = Integer.parseInt(durationStr);
- if (duration < 0)
- throw new NumberFormatException("duration less than 0");
- String response = server.requestResource(coordinatorID, resourceID, duration);
- System.out.println(response);
- } catch (NumberFormatException e) {
- System.out.println("invalid duration: " + e.getMessage());
- }
- }
+ public String requestResource(String resourceID, int duration) {
+ return server.requestResource(coordinatorID, resourceID, duration);
}
- private class Find implements Command {
- @Override
- public void exec(String[] args) {
- if (args.length < 1)
- System.out.println("invalid arguments for 'find'");
- else find(args[0]);
- }
-
- private void find(String resourceID) {
- String response = server.findResource(coordinatorID, resourceID);
- System.out.println(response);
- }
+ public String findResource(String resourceName) {
+ return server.findResource(coordinatorID, resourceName);
}
- private class Return implements Command {
- @Override
- public void exec(String[] args) {
- if (args.length < 1)
- System.out.println("invalid arguments for 'return'");
- else
- returnResource(args[0]);
- }
-
- private void returnResource(String resourceID) {
- String response = server.returnResource(coordinatorID, resourceID);
- System.out.println(response);
- }
+ public String returnResource(String resourceID) {
+ return server.returnResource(coordinatorID, resourceID);
}
- private class Swap implements Command {
- @Override
- public void exec(String[] args) {
- if (args.length < 4)
- System.out.println("invalid arguments for 'swap'");
- else
- swap(args[0], args[1], args[2], args[3]);
- }
-
- private void swap(String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) {
- String response = server.swapResource(coordinatorID, oldResourceID, oldResourceType, newResourceID, newResourceType);
- System.out.println(response);
- }
+ public String swapResource(String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) {
+ return server.swapResource(coordinatorID, oldResourceID, oldResourceType, newResourceID, newResourceType);
}
}
diff --git a/src/main/java/derms/client/CoordinatorClientCLI.java b/src/main/java/derms/client/CoordinatorClientCLI.java
new file mode 100644
index 0000000..bddc6e6
--- /dev/null
+++ b/src/main/java/derms/client/CoordinatorClientCLI.java
@@ -0,0 +1,119 @@
+package derms.client;
+
+import derms.frontend.DERMSInterface;
+
+import java.net.MalformedURLException;
+
+public class CoordinatorClientCLI extends CLI {
+ public static final String usage = "Usage: java derms.client.CoordinatorClient <coordinator ID> <FE host>";
+
+ private final String coordinatorID;
+ private final DERMSInterface server;
+
+ private CoordinatorClientCLI(String coordinatorID, String FEhost) throws MalformedURLException {
+ this.coordinatorID = coordinatorID;
+ this.server = Client.connectToServer(FEhost);
+
+ commands.put("request", new Request());
+ cmdDescriptions.add(new CLI.Description(
+ "request <resource ID> <duration>",
+ "Borrow a resource."));
+
+ commands.put("find", new Find());
+ cmdDescriptions.add(new CLI.Description(
+ "find <resource name>",
+ "List borrowed resources."));
+
+ commands.put("return", new Return());
+ cmdDescriptions.add(new CLI.Description(
+ "return <resource ID>",
+ "Return a borrowed resource."));
+
+ commands.put("swap", new Swap());
+ cmdDescriptions.add(new CLI.Description(
+ "swap <old resource ID> <old resource type> <new resource ID> <new resource type>",
+ "Return the old resource and borrow the new one."));
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.err.println(usage);
+ System.exit(1);
+ }
+
+ String coordinatorID = args[0];
+ String FEhost = args[1];
+
+ try {
+ (new CoordinatorClientCLI(coordinatorID, FEhost)).run();
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private class Request implements CLI.Command {
+ @Override
+ public void exec(String[] args) {
+ if (args.length < 2)
+ System.out.println("invalid arguments for 'request'");
+ else
+ request(args[0], args[1]);
+ }
+
+ private void request(String resourceID, String durationStr) {
+ try {
+ int duration = Integer.parseInt(durationStr);
+ if (duration < 0)
+ throw new NumberFormatException("duration less than 0");
+ String response = server.requestResource(coordinatorID, resourceID, duration);
+ System.out.println(response);
+ } catch (NumberFormatException e) {
+ System.out.println("invalid duration: " + e.getMessage());
+ }
+ }
+ }
+
+ private class Find implements CLI.Command {
+ @Override
+ public void exec(String[] args) {
+ if (args.length < 1)
+ System.out.println("invalid arguments for 'find'");
+ else find(args[0]);
+ }
+
+ private void find(String resourceID) {
+ String response = server.findResource(coordinatorID, resourceID);
+ System.out.println(response);
+ }
+ }
+
+ private class Return implements CLI.Command {
+ @Override
+ public void exec(String[] args) {
+ if (args.length < 1)
+ System.out.println("invalid arguments for 'return'");
+ else
+ returnResource(args[0]);
+ }
+
+ private void returnResource(String resourceID) {
+ String response = server.returnResource(coordinatorID, resourceID);
+ System.out.println(response);
+ }
+ }
+
+ private class Swap implements CLI.Command {
+ @Override
+ public void exec(String[] args) {
+ if (args.length < 4)
+ System.out.println("invalid arguments for 'swap'");
+ else
+ swap(args[0], args[1], args[2], args[3]);
+ }
+
+ private void swap(String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) {
+ String response = server.swapResource(coordinatorID, oldResourceID, oldResourceType, newResourceID, newResourceType);
+ System.out.println(response);
+ }
+ }
+}
diff --git a/src/main/java/derms/client/ResponderClient.java b/src/main/java/derms/client/ResponderClient.java
index 23361d7..6834c76 100644
--- a/src/main/java/derms/client/ResponderClient.java
+++ b/src/main/java/derms/client/ResponderClient.java
@@ -1,103 +1,33 @@
package derms.client;
import derms.frontend.DERMSInterface;
+import derms.util.TestLogger;
import java.net.MalformedURLException;
+import java.util.Objects;
-public class ResponderClient extends CLI {
- public static final String usage = "Usage: java derms.client.ResponderClient <FE host>";
-
+public class ResponderClient {
private final DERMSInterface server;
- private ResponderClient(String FEhost) throws MalformedURLException {
+ public ResponderClient(String FEhost) throws MalformedURLException {
server = Client.connectToServer(FEhost);
-
- commands.put("add", new Add());
- cmdDescriptions.add(new Description(
- "add <resource ID> <resource type> <duration>",
- "Add ad resource to the server"));
-
- commands.put("remove", new Remove());
- cmdDescriptions.add(new Description(
- "remove <resource ID> <duration>",
- "Decrease the duration of a resource. If duration is negative, the resource is removed entirely."));
-
- commands.put("list", new List());
- cmdDescriptions.add(new Description(
- "list <resource name>",
- "List available resources"));
- }
-
- public static void main(String[] args) {
- if (args.length < 1) {
- System.err.println(usage);
- System.exit(1);
- }
-
- String FEhost = args[0];
-
- try {
- (new ResponderClient(FEhost)).run();
- } catch (MalformedURLException e) {
- e.printStackTrace();
- }
}
- private class Add implements Command {
- @Override
- public void exec(String[] args) {
- if (args.length < 3)
- System.out.println("invalid arguments for 'add'");
- else
- add(args[0], args[1], args[2]);
- }
-
- private void add(String resourceID, String resourceName, String durationStr) {
- try {
- int duration = Integer.parseInt(durationStr);
- if (duration < 0) {
- throw new NumberFormatException("duration less than 0");
- }
- String response = server.addResource(resourceID, resourceName, duration);
- System.out.println(response);
- } catch (NumberFormatException e) {
- System.out.println("invalid duration: " + durationStr);
- }
+ public String addResource(String resourceID, String resourceName, int duration) {
+ String res = server.addResource(resourceID, resourceName, duration);
+ if (res.contains("Fail")) {
+ TestLogger.log("[FAILED: " + res + "]");
+ } else {
+ TestLogger.log("[SUCCESS: " + res + "]");
}
+ return res;
}
- private class Remove implements Command {
- @Override
- public void exec(String[] args) {
- if (args.length < 2)
- System.out.println("invalid arguments for 'remove'");
- else
- remove(args[0], args[1]);
- }
-
- private void remove(String resourceID, String durationStr) {
- try {
- int duration = Integer.parseInt(durationStr);
- String response = server.removeResource(resourceID, duration);
- System.out.println(response);
- } catch (NumberFormatException e) {
- System.out.println("invalid duration: " + durationStr);
- }
- }
+ public String removeResource(String resourceID, int duration) {
+ return server.removeResource(resourceID, duration);
}
- private class List implements Command {
- @Override
- public void exec(String[] args) {
- if (args.length < 1)
- System.out.println("invalid arguments for 'list'");
- else
- list(args[0]);
- }
-
- private void list(String resourceName) {
- String response = server.listResourceAvailability(resourceName);
- System.out.println(response);
- }
+ public String listResourceAvailability(String resourceName) {
+ return server.listResourceAvailability(resourceName);
}
}
diff --git a/src/main/java/derms/client/ResponderClientCLI.java b/src/main/java/derms/client/ResponderClientCLI.java
new file mode 100644
index 0000000..de046d5
--- /dev/null
+++ b/src/main/java/derms/client/ResponderClientCLI.java
@@ -0,0 +1,104 @@
+package derms.client;
+
+import derms.frontend.DERMSInterface;
+import derms.util.TestLogger;
+
+import java.net.MalformedURLException;
+
+public class ResponderClientCLI extends CLI {
+ public static final String usage = "Usage: java derms.client.ResponderClient <FE host>";
+
+ private final DERMSInterface server;
+
+ private ResponderClientCLI(String FEhost) throws MalformedURLException {
+ server = Client.connectToServer(FEhost);
+
+ commands.put("add", new Add());
+ cmdDescriptions.add(new CLI.Description(
+ "add <resource ID> <resource type> <duration>",
+ "Add ad resource to the server"));
+
+ commands.put("remove", new Remove());
+ cmdDescriptions.add(new CLI.Description(
+ "remove <resource ID> <duration>",
+ "Decrease the duration of a resource. If duration is negative, the resource is removed entirely."));
+
+ commands.put("list", new List());
+ cmdDescriptions.add(new CLI.Description(
+ "list <resource name>",
+ "List available resources"));
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.err.println(usage);
+ System.exit(1);
+ }
+
+ String FEhost = args[0];
+
+ try {
+ (new ResponderClientCLI(FEhost)).run();
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private class Add implements CLI.Command {
+ @Override
+ public void exec(String[] args) {
+ if (args.length < 3)
+ System.out.println("invalid arguments for 'add'");
+ else
+ add(args[0], args[1], args[2]);
+ }
+
+ private void add(String resourceID, String resourceName, String durationStr) {
+ try {
+ int duration = Integer.parseInt(durationStr);
+ if (duration < 0) {
+ throw new NumberFormatException("duration less than 0");
+ }
+ String response = server.addResource(resourceID, resourceName, duration);
+ System.out.println(response);
+ } catch (NumberFormatException e) {
+ System.out.println("invalid duration: " + durationStr);
+ }
+ }
+ }
+
+ private class Remove implements CLI.Command {
+ @Override
+ public void exec(String[] args) {
+ if (args.length < 2)
+ System.out.println("invalid arguments for 'remove'");
+ else
+ remove(args[0], args[1]);
+ }
+
+ private void remove(String resourceID, String durationStr) {
+ try {
+ int duration = Integer.parseInt(durationStr);
+ String response = server.removeResource(resourceID, duration);
+ System.out.println(response);
+ } catch (NumberFormatException e) {
+ System.out.println("invalid duration: " + durationStr);
+ }
+ }
+ }
+
+ private class List implements CLI.Command {
+ @Override
+ public void exec(String[] args) {
+ if (args.length < 1)
+ System.out.println("invalid arguments for 'list'");
+ else
+ list(args[0]);
+ }
+
+ private void list(String resourceName) {
+ String response = server.listResourceAvailability(resourceName);
+ System.out.println(response);
+ }
+ }
+}
diff --git a/src/main/java/derms/frontend/DERMSServerImpl.java b/src/main/java/derms/frontend/DERMSServerImpl.java
index 1877cef..2a5dfa9 100644
--- a/src/main/java/derms/frontend/DERMSServerImpl.java
+++ b/src/main/java/derms/frontend/DERMSServerImpl.java
@@ -5,10 +5,6 @@ package derms.frontend;
//import model.Resource;
import java.util.*;
import java.util.stream.Collectors;
-import java.util.Optional;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -23,6 +19,7 @@ import javax.jws.soap.SOAPBinding.Style;
//import interfaces.DERMSInterface;
import derms.Request;
import derms.Response;
+import derms.util.TestLogger;
@WebService(endpointInterface = "derms.frontend.DERMSInterface")
public class DERMSServerImpl implements DERMSInterface {
diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java
index f29459c..b3e100f 100644
--- a/src/main/java/derms/frontend/FE.java
+++ b/src/main/java/derms/frontend/FE.java
@@ -10,6 +10,7 @@ import derms.Request;
import derms.Response;
import derms.net.runicast.ReliableUnicastReceiver;
import derms.net.runicast.ReliableUnicastSender;
+import derms.util.TestLogger;
import java.util.ArrayList;
import java.util.List;
@@ -51,6 +52,7 @@ public class FE {
System.out.println("Rm:" + RmNumber + "has bug");
// sendMulticastFaultMessageToRms(errorMessage);
sendUnicastToSequencer(errorMessage);
+ //TestLogger.log("FE: {BYZANTINE: INFORM REPLICA" + RmNumber + "}");
}
@Override
@@ -60,6 +62,7 @@ public class FE {
System.out.println("Rm:" + RmNumber + "is down");
// sendMulticastFaultMessageToRms(errorMessage);
sendUnicastToSequencer(errorMessage);
+ //TestLogger.log("FE: {CRASH: INFORM REPLICA" + RmNumber + "}");
}
@Override
diff --git a/src/main/java/derms/replica1/DERMSServerPublisher.java b/src/main/java/derms/replica1/DERMSServerPublisher.java
index 44ee879..c94cb34 100644
--- a/src/main/java/derms/replica1/DERMSServerPublisher.java
+++ b/src/main/java/derms/replica1/DERMSServerPublisher.java
@@ -3,8 +3,27 @@ package derms.replica1;
import javax.xml.ws.Endpoint;
public class DERMSServerPublisher {
+
+ private static Endpoint[] endpoints = new Endpoint[3];
+
public static void main(String[] args) {
// try {
+// endpoints[0] = Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL"));
+// endpoints[1] = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
+// endpoints[2] = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
+// } catch (InterruptedException e) {
+// throw new RuntimeException(e);
+// }
+ }
+
+ public static void stop() {
+ for (Endpoint endpoint : endpoints) {
+ if (endpoint != null && endpoint.isPublished()) {
+ endpoint.stop();
+ System.out.println("DERMS Server is stopped.");
+ }
+ }
+// try {
// Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL"));
// Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
// Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
diff --git a/src/main/java/derms/replica1/Replica1.java b/src/main/java/derms/replica1/Replica1.java
index 6863d5c..1520b73 100644
--- a/src/main/java/derms/replica1/Replica1.java
+++ b/src/main/java/derms/replica1/Replica1.java
@@ -5,6 +5,7 @@ import derms.ReplicaManager;
import derms.Request;
import derms.Response;
import derms.replica2.DermsLogger;
+import derms.util.TestLogger;
import derms.util.ThreadPool;
import javax.xml.ws.Endpoint;
@@ -32,6 +33,7 @@ public class Replica1 implements Replica {
private final ConcurrentHashMap<String, Integer> portsMap;
private final Random r = new Random();
+ private boolean byzFailure;
public Replica1(ReplicaManager replicaManager) {
this.replicaManager = replicaManager;
@@ -60,7 +62,21 @@ public class Replica1 implements Replica {
}
@Override
- public void startProcess() {
+ public void startProcess(int byzantine, int crash) {
+ // [TEST] Detect crash
+ if (crash == 1) {
+ alive = false;
+ } else {
+ alive = true;
+ }
+
+ // [TEST] Detect byzantine failure
+ if (byzantine == 1) {
+ byzFailure = true;
+ } else {
+ byzFailure = false;
+ }
+
try {
server = new DERMSServer("MTL", portsMap);
} catch (InterruptedException e) {
@@ -89,6 +105,13 @@ public class Replica1 implements Replica {
@Override
public void processRequest(Request request) {
+ // [TEST] Simulate byzantine failure (return incorrect value)
+ if (byzFailure == true) {
+ Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false);
+ replicaManager.sendResponseToFE(response);
+ return;
+ }
+
log.info(request.toString());
String status = "";
@@ -137,7 +160,10 @@ public class Replica1 implements Replica {
ThreadPool.shutdown(pool, log);
alive = false;
log.info("Finished shutting down.");
- startProcess();
+
+ // [TEST] Restart process without byzantine failure or crash
+ TestLogger.log("REPLICA 1: {RESTARTED}");
+ startProcess(0, 0);
}
@Override
diff --git a/src/main/java/derms/replica2/Replica2.java b/src/main/java/derms/replica2/Replica2.java
index 3c9f764..cf21d74 100644
--- a/src/main/java/derms/replica2/Replica2.java
+++ b/src/main/java/derms/replica2/Replica2.java
@@ -1,6 +1,7 @@
package derms.replica2;
import derms.*;
+import derms.util.TestLogger;
import derms.util.ThreadPool;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -24,6 +25,7 @@ public class Replica2 implements Replica {
private final ReplicaManager replicaManager;
private final ExecutorService pool;
private boolean alive = false;
+ private boolean byzFailure;
public Replica2(City city, ReplicaManager replicaManager) throws IOException {
this.city = city;
@@ -57,7 +59,21 @@ public class Replica2 implements Replica {
public boolean isAlive() { return alive; }
@Override
- public void startProcess() {
+ public void startProcess(int byzantine, int crash) {
+ // [TEST] Detect crash
+ if (crash == 1) {
+ alive = false;
+ } else {
+ alive = true;
+ }
+
+ // [TEST] Detect byzantine failure
+ if (byzantine == 1) {
+ byzFailure = true;
+ } else {
+ byzFailure = false;
+ }
+
try {
pool.execute(new ResourceAvailability.Server(localAddr, resources));
} catch (IOException e) {
@@ -105,7 +121,7 @@ public class Replica2 implements Replica {
log.info("Running");
log.config("Local address is "+localAddr.toString());
- alive = true;
+ //alive = true;
log.info(getClass().getSimpleName() + " started.");
}
@@ -113,6 +129,13 @@ public class Replica2 implements Replica {
public void processRequest(Request request) {
log.info(request.toString());
+ // [TEST] Simulate byzantine failure (return incorrect value)
+ if (byzFailure == true) {
+ Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false);
+ replicaManager.sendResponseToFE(response);
+ return;
+ }
+
String status = "";
try {
switch (request.getFunction()) {
@@ -153,7 +176,10 @@ public class Replica2 implements Replica {
@Override
public void restart() {
shutdown();
- startProcess();
+
+ // [TEST] Restart process without byzantine failure or crash
+ TestLogger.log("REPLICA 2: {RESTARTED}");
+ startProcess(0, 0);
}
@Override
diff --git a/src/main/java/derms/replica3/Replica3.java b/src/main/java/derms/replica3/Replica3.java
index 5c6a113..456c4f1 100644
--- a/src/main/java/derms/replica3/Replica3.java
+++ b/src/main/java/derms/replica3/Replica3.java
@@ -13,6 +13,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import derms.replica3.Logger;
+import derms.util.TestLogger;
public class Replica3 implements Replica{
static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555);
@@ -27,6 +28,7 @@ public class Replica3 implements Replica{
// private final Logger log;
private boolean alive;
+ private boolean byzFailure;
private final ReplicaManager replicaManager;
public Replica3(City city, ReplicaManager replicaManager) throws IOException {
@@ -42,6 +44,7 @@ public class Replica3 implements Replica{
// log.config("Local address is "+localAddr.toString());
this.alive = true;
+ this.byzFailure = false;
}
public Replica3(String city, ReplicaManager replicaManager) throws IOException {
@@ -52,7 +55,21 @@ public class Replica3 implements Replica{
public boolean isAlive() { return alive; }
@Override
- public void startProcess() {
+ public void startProcess(int byzantine, int crash) {
+ // [TEST] Detect crash
+ if (crash == 1) {
+ alive = false;
+ } else {
+ alive = true;
+ }
+
+ // [TEST] Detect byzantine failure
+ if (byzantine == 1) {
+ byzFailure = true;
+ } else {
+ byzFailure = false;
+ }
+
// TODO
// log.info(getClass().getSimpleName() + " started.");
System.out.println("process started");
@@ -61,6 +78,14 @@ public class Replica3 implements Replica{
@Override
public void processRequest(Request request) {
// log.info(request.toString());
+
+ // [TEST] Simulate byzantine failure (return incorrect value)
+ if (byzFailure == true) {
+ Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false);
+ replicaManager.sendResponseToFE(response);
+ return;
+ }
+
System.out.println("process request and good");
String status = "";
try {
@@ -104,7 +129,10 @@ public class Replica3 implements Replica{
public void restart() {
// TODO
shutdown();
- startProcess();
+
+ // [TEST] Restart process without byzantine failure or crash
+ TestLogger.log("REPLICA 3: {RESTARTED}");
+ startProcess(0, 0);
}
@Override
@@ -112,6 +140,7 @@ public class Replica3 implements Replica{
private void shutdown() {
// TODO
+ alive = false;
}
public synchronized String addResource(String resourceID, String resourceName, int duration) {
diff --git a/src/main/java/derms/util/LogComparator.java b/src/main/java/derms/util/LogComparator.java
new file mode 100644
index 0000000..da3736c
--- /dev/null
+++ b/src/main/java/derms/util/LogComparator.java
@@ -0,0 +1,27 @@
+package derms.util;
+
+import java.io.*;
+import java.nio.file.*;
+
+public class LogComparator {
+ public static boolean compareLineCounts(String actualFilePath, String expectedFilePath) throws IOException {
+ long actualLineCount = Files.lines(Paths.get(actualFilePath)).count();
+ long expectedLineCount = Files.lines(Paths.get(expectedFilePath)).count();
+ System.out.println("XXXXXXXXX ACTUAL LINE: " + actualLineCount);
+ System.out.println("XXXXXXXXX EXPECTED: " + expectedLineCount);
+ return actualLineCount == expectedLineCount;
+ }
+public static boolean containsSuccess(String filePath) throws IOException {
+ return Files.lines(Paths.get(filePath)).anyMatch(line -> line.contains("SUCCESS"));
+}
+
+public static boolean compareFiles(String actualFilePath, String expectedFilePath) throws IOException {
+ boolean lineCountsMatch = compareLineCounts(actualFilePath, expectedFilePath);
+ boolean actualContainsSuccess = containsSuccess(actualFilePath);
+ System.out.println("XXXXXXXXX ACTUAL SUCCESS: " + actualContainsSuccess);
+ boolean expectedContainsSuccess = containsSuccess(expectedFilePath);
+ System.out.println("XXXXXXXXX EXPECTED SUCCESS: " + expectedContainsSuccess);
+
+ return lineCountsMatch && actualContainsSuccess && expectedContainsSuccess;
+}
+} \ No newline at end of file
diff --git a/src/main/java/derms/util/TestLogger.java b/src/main/java/derms/util/TestLogger.java
new file mode 100644
index 0000000..7bb34db
--- /dev/null
+++ b/src/main/java/derms/util/TestLogger.java
@@ -0,0 +1,27 @@
+package derms.util;
+
+import java.io.*;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+public class TestLogger {
+ private static final String LOG_FILE = "SystemTest.log";
+
+ public static void log(String message) {
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(LOG_FILE, true))) {
+ String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+ writer.write("[" + timestamp + "] " + message);
+ writer.newLine();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void clearLog() {
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(LOG_FILE))) {
+ // Clear the file by overwriting it with nothing
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+} \ No newline at end of file
diff --git a/src/test/java/derms/test/SystemTest.java b/src/test/java/derms/test/SystemTest.java
new file mode 100644
index 0000000..818b4e9
--- /dev/null
+++ b/src/test/java/derms/test/SystemTest.java
@@ -0,0 +1,183 @@
+package derms.test;
+
+import derms.ReplicaManager;
+import derms.Sequencer;
+import derms.client.ResponderClient;
+import derms.frontend.FE;
+import derms.replica1.DERMSServerPublisher;
+
+import org.junit.jupiter.api.*;
+import java.io.*;
+import java.net.MalformedURLException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.file.*;
+import java.util.*;
+import derms.util.*;
+import derms.replica3.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class SystemTest {
+
+ private static final String TEST_LOG_PATH = "SystemTest.log";
+ private static final String EXPECTED_LOG_PATH_NORM = "TestExpected.log";
+ private static final String EXPECTED_LOG_PATH_BYZ = "TestExpectedByz.log";
+ private static final String EXPECTED_LOG_PATH_CRASH = "TestExpectedCrash.log";
+ private static final String EXPECTED_LOG_PATH_COMBINED = "TestExpectedCombined.log";
+
+ // [TODO]
+ // input IP and NET config
+ private static String IP = "172.16.62.225";
+
+ @BeforeEach
+ void clearLogFile() throws IOException {
+ TestLogger.clearLog();
+ }
+
+ @BeforeAll
+ static void runMains() throws IOException {
+ String[] argsFE = {IP, IP};
+
+ Thread feThread = new Thread(() -> {
+ try {
+ FE.main(argsFE);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ feThread.start();
+
+ Thread sequencerThread = new Thread(() -> {
+ try {
+ InetAddress ip = InetAddress.getByName(IP);
+ NetworkInterface netIfc = NetworkInterface.getByInetAddress(ip);
+ Sequencer sequencer = new Sequencer(ip, netIfc);
+ sequencer.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ sequencerThread.start();
+ }
+
+ @AfterEach
+ void stopServers() {
+ // Stop the DERMSServerPublisher
+ DERMSServerPublisher.stop();
+ }
+
+ @Test
+ void testNormal() throws IOException {
+ // Replica 1
+ String[] argsRM = {"1", "MTL", IP, IP, "0", "0"};
+
+ // [TODO]
+ // Run the main function of the desired replica, for example:
+ DERMSServerPublisher.main(new String[0]);
+
+ ReplicaManager.main(argsRM);
+ ResponderClient responderClient = new ResponderClient(IP);
+ responderClient.addResource("MTL1001", "ambulance", 10);
+
+ // Compare the number of lines in the log files, to determine if they match or not
+ assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_NORM));
+ }
+
+ @Test
+ void testByzantine() throws IOException {
+ // Replica 1
+ String[] argsRM = {"1", "MTL", IP, IP, "1", "0"};
+
+ // [TODO]
+ // Run the main function of the desired replica, for example:
+ DERMSServerPublisher.main(new String[0]);
+
+ ReplicaManager.main(argsRM);
+ ResponderClient responderClient = new ResponderClient(IP);
+ responderClient.addResource("MTL1001", "ambulance", 10);
+
+ // Compare the number of lines in the log files, to determine if they match or not
+ assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_BYZ));
+ }
+
+ @Test
+ void testCrash() throws IOException {
+ // Replica 1
+ String[] argsRM = {"1", "MTL", IP, IP, "0", "1"};
+
+ // [TODO]
+ // Run the main function of the desired replica, for example:
+ DERMSServerPublisher.main(new String[0]);
+
+ ReplicaManager.main(argsRM);
+ ResponderClient responderClient = new ResponderClient(IP);
+ responderClient.addResource("MTL1001", "ambulance", 10);
+
+ // Compare the number of lines in the log files, to determine if they match or not
+ assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_CRASH));
+ }
+
+ @Test
+ void testCombined() throws IOException {
+ // Replica 1 and 2
+ String[] argsRM1 = {"1", "MTL", IP, IP, "1", "0"};
+ String[] argsRM3 = {"3", "MTL", IP, IP, "0", "1"};
+
+ // [TODO]
+ // Run the main function of the desired TWO replicas, for example:
+ DERMSServerPublisher.main(new String[0]);
+ MTLServer.main(new String[0]);
+ QUEServer.main(new String[0]);
+ SHEServer.main(new String[0]);
+
+ ReplicaManager.main(argsRM1);
+ ReplicaManager.main(argsRM3);
+
+ Thread thread1 = new Thread(() -> {
+ ResponderClient responderClient = null;
+ try {
+ responderClient = new ResponderClient(IP);
+ } catch (MalformedURLException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally {
+ if (responderClient != null) {
+ responderClient.addResource("MTL1001", "ambulance", 10);
+ }
+ }
+ });
+
+ Thread thread2 = new Thread(() -> {
+ ResponderClient responderClient2 = null;
+ try {
+ responderClient2 = new ResponderClient(IP);
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ } finally {
+ if (responderClient2 != null) {
+ responderClient2.addResource("MTL1002", "ambulance", 11);
+ }
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+
+ try {
+ thread1.join();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ try {
+ thread2.join();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // Compare the number of lines in the log files, to determine if they match or not
+ assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_COMBINED));
+ }
+} \ No newline at end of file