diff options
| author | ShazaAhmed <ShazaMamdouh@aucegypt.edu> | 2024-12-03 13:26:06 -0500 |
|---|---|---|
| committer | ShazaAhmed <ShazaMamdouh@aucegypt.edu> | 2024-12-03 13:27:17 -0500 |
| commit | b7f7d9872b5820521b77f4a3175c5d1c1a0c4058 (patch) | |
| tree | 81fc7c283e327cca153544d7e3600933f54a5132 | |
| parent | a0a6fcd7bd2dd2fb736477bcbd3d034b38565fba (diff) | |
| parent | b0214bbc7e7a7ee6aac9dca2610060ac0f88dc77 (diff) | |
| download | soen423-b7f7d9872b5820521b77f4a3175c5d1c1a0c4058.zip | |
Merge branch 'test'
24 files changed, 1269 insertions, 338 deletions
@@ -3,7 +3,11 @@ *.bbl *.bcf *.blg -*.log +QUEServer.log +MTLServer.log +SHEServer.log +server.log +SystemTest.log *.run.xml *.toc javadoc diff --git a/SystemTest.log b/SystemTest.log new file mode 100644 index 0000000..0b9da65 --- /dev/null +++ b/SystemTest.log @@ -0,0 +1,3 @@ +[2024-12-03 12:45:40] REPLICA 1: {BYZANTINE: FALSE} +[2024-12-03 12:45:40] REPLICA 1: {CRASH: FALSE} +[2024-12-03 12:46:00] [FAILED: Fail: No response from any server] diff --git a/TestExpected.log b/TestExpected.log new file mode 100644 index 0000000..1cea96b --- /dev/null +++ b/TestExpected.log @@ -0,0 +1,3 @@ +[2024-12-03 07:48:36] REPLICA 1: {BYZANTINE: FALSE} +[2024-12-03 07:48:36] REPLICA 1: {CRASH: FALSE} +[2024-12-03 08:53:10] [SUCCESS: OK]
\ No newline at end of file diff --git a/TestExpectedByz.log b/TestExpectedByz.log new file mode 100644 index 0000000..b5cabee --- /dev/null +++ b/TestExpectedByz.log @@ -0,0 +1,5 @@ +[2024-12-03 09:22:35] REPLICA 1: {BYZANTINE: TRUE} +[2024-12-03 09:22:35] REPLICA 1: {CRASH: FALSE} +[2024-12-03 09:22:35] REPLICA 1: {BYZANTINE: DETECTED} +[2024-12-03 09:22:35] REPLICA 1: {RESTARTED} +[2024-12-03 09:22:56] [SUCCESS: OK]
\ No newline at end of file diff --git a/TestExpectedCombined.log b/TestExpectedCombined.log new file mode 100644 index 0000000..9d955ad --- /dev/null +++ b/TestExpectedCombined.log @@ -0,0 +1,10 @@ +[2024-12-03 09:27:08] REPLICA 1: {BYZANTINE: TRUE} +[2024-12-03 09:27:08] REPLICA 1: {CRASH: FALSE} +[2024-12-03 09:27:08] REPLICA 1: {BYZANTINE: DETECTED} +[2024-12-03 09:27:08] REPLICA 1: {RESTARTED} +[2024-12-03 09:27:08] REPLICA 3: {BYZANTINE: FALSE} +[2024-12-03 09:27:08] REPLICA 3: {CRASH: TRUE} +[2024-12-03 09:27:08] REPLICA 3: {CRASH: DETECTED} +[2024-12-03 09:27:08] REPLICA 3: {RESTARTED} +[2024-12-03 09:27:28] [SUCCESS: OK] +[2024-12-03 09:27:28] [SUCCESS: OK]
\ No newline at end of file diff --git a/TestExpectedCrash.log b/TestExpectedCrash.log new file mode 100644 index 0000000..90a0640 --- /dev/null +++ b/TestExpectedCrash.log @@ -0,0 +1,5 @@ +[2024-12-03 09:22:35] REPLICA 1: {BYZANTINE: FALSE} +[2024-12-03 09:22:35] REPLICA 1: {CRASH: TRUE} +[2024-12-03 09:22:35] REPLICA 1: {CRASH: DETECTED} +[2024-12-03 09:22:35] REPLICA 1: {RESTARTED} +[2024-12-03 09:22:56] [SUCCESS: OK]
\ No newline at end of file diff --git a/src/main/java/derms/Replica.java b/src/main/java/derms/Replica.java index eae0014..a717064 100644 --- a/src/main/java/derms/Replica.java +++ b/src/main/java/derms/Replica.java @@ -2,7 +2,7 @@ package derms; public interface Replica { boolean isAlive(); - void startProcess(); + void startProcess(int byzantine, int crash); void processRequest(Request request); void restart(); int getId(); diff --git a/src/main/java/derms/Replica4.java b/src/main/java/derms/Replica4.java deleted file mode 100644 index ed7e90b..0000000 --- a/src/main/java/derms/Replica4.java +++ /dev/null @@ -1,108 +0,0 @@ -//package derms; -// -////import derms.Replica3pkg.ResponderClient; -//import java.io.*; -//import java.util.*; -// -//import derms.Replica4pkg.RemoteServer; -// -//public class Replica4 implements Replica { -// -// private ReplicaManager replicaManager; -// private RemoteServer remoteServer; -// private boolean alive = true; -// -// public Replica4(ReplicaManager replicaManager){ -// this.replicaManager = replicaManager; -// } -// -// @Override -// public boolean isAlive() { -// return alive; -// } -// -// @Override -// public void startProcess() { -// this.remoteServer = new RemoteServer(); -// System.out.println("[Replica 4] Process started."); -// } -// -// @Override -// public void processRequest(Request request) { -// ResponderClient responderClient; -// CoordinatorClient coordinatorClient; -// String responseMessage; -// switch (request.getFunction()) { -// case "addResource": -// responderClient = new ResponderClient(request.getClientID()); -// responseMessage = responderClient.addResource(request.getResourceID(), request.getResourceType(), request.getDuration()); -// break; -// case "removeResource": -// responderClient = new ResponderClient(request.getClientID()); -// responseMessage = responderClient.removeResource(request.getResourceID(), request.getDuration()); -// break; -// case "listResourceAvailability": -// responderClient = new ResponderClient(request.getClientID()); -// responseMessage = responderClient.listResourceAvailability(request.getResourceType()); -// break; -// case "requestResource": -// coordinatorClient = new CoordinatorClient(request.getClientID()); -// responseMessage = coordinatorClient.requestResource(request.getResourceID(), request.getDuration()); -// break; -// case "findResource": -// coordinatorClient = new CoordinatorClient(request.getClientID()); -// responseMessage = coordinatorClient.findResource(request.getResourceType()); -// break; -// case "returnResource": -// coordinatorClient = new CoordinatorClient(request.getClientID()); -// responseMessage = coordinatorClient.returnResource(request.getResourceID()); -// break; -// case "swapResource": -// coordinatorClient = new CoordinatorClient(request.getClientID()); -// responseMessage = coordinatorClient.swapResource( -// request.getClientID(), -// request.getOldResourceID(), -// request.getOldResourceType(), -// request.getResourceID(), -// request.getResourceType() -// ); -// break; -// default: -// responseMessage = "Unrecognized function: " + request.getFunction(); -// log("Unrecognized function in request: " + request.getFunction()); -// break; -// } -// -// Response response = new Response(request.getSequenceNumber(), responseMessage); -// log("Replica " + 4 + " processed request: " + request + ", response: " + response); -// replicaManager.sendResponseToFE(response); -// } -// -// @Override -// public void restart() { -// shutDown(); -// startProcess(); -// } -// -// public void shutDown(){ -// this.remoteServer.stopServers(); -// } -// -// @Override -// public int getId() { -// return 4; -// } -// -// public synchronized void log(String message) { -// String logMessage = new Date() + " - " + message; -// System.out.println(logMessage); -// -// try (FileWriter fw = new FileWriter("Replica4_log.txt", true); -// BufferedWriter bw = new BufferedWriter(fw); -// PrintWriter out = new PrintWriter(bw)) { -// out.println(logMessage); -// } catch (IOException e) { -// e.printStackTrace(); -// } -// } -//}
\ No newline at end of file diff --git a/src/main/java/derms/Replica4pkg/RemoteServer.java b/src/main/java/derms/Replica4pkg/RemoteServer.java index a572dbe..6bc7411 100644 --- a/src/main/java/derms/Replica4pkg/RemoteServer.java +++ b/src/main/java/derms/Replica4pkg/RemoteServer.java @@ -5,25 +5,25 @@ import javax.xml.ws.Endpoint; import java.util.*; public class RemoteServer { - private List<Endpoint> endpoints = new ArrayList<>(); - - public RemoteServer() { - try { - Map<String, Integer> UDPPorts = new HashMap<>(); - UDPPorts.put("MTL", 4000); - UDPPorts.put("QUE", 5000); - UDPPorts.put("SHE", 6000); - - String[] serverNames = {"MTL", "QUE", "SHE"}; - int i = 0; - for (String serverName : serverNames) { - int UDPPort = UDPPorts.get(serverName); - Server server = new Server(); - server.initServer(serverName, UDPPort, UDPPorts); - int port = 8080 + i; - String url = "http://localhost:" + port + "/DERMS/" + serverName; - Endpoint endpoint = Endpoint.publish(url, server); - endpoints.add(endpoint); // Keep track of the Endpoint + private static List<Endpoint> endpoints = new ArrayList<>(); + + public static void main(String[] args) { + try { + Map<String, Integer> UDPPorts = new HashMap<>(); + UDPPorts.put("MTL", 4000); + UDPPorts.put("QUE", 5000); + UDPPorts.put("SHE", 6000); + + String[] serverNames = {"MTL", "QUE", "SHE"}; + int i = 0; + for (String serverName : serverNames) { + int UDPPort = UDPPorts.get(serverName); + Server server = new Server(); + server.initServer(serverName, UDPPort, UDPPorts); + int port = 8080 + i; + String url = "http://localhost:" + port + "/DERMS/" + serverName; + Endpoint endpoint = Endpoint.publish(url, server); + endpoints.add(endpoint); // Keep track of the Endpoint i++; System.out.println(serverName + " Server ready and waiting ..."); } diff --git a/src/main/java/derms/Replica4pkg/Replica4.java b/src/main/java/derms/Replica4pkg/Replica4.java new file mode 100644 index 0000000..91bc303 --- /dev/null +++ b/src/main/java/derms/Replica4pkg/Replica4.java @@ -0,0 +1,578 @@ +package derms.Replica4pkg; + +import java.io.*; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.util.*; + +import javax.jws.WebMethod; + +import derms.Replica; +import derms.ReplicaManager; +import derms.Request; +import derms.Response; +import derms.util.TestLogger; +import derms.util.ThreadPool; + + + +public class Replica4 implements Replica { + + private ReplicaManager replicaManager; + private boolean alive = true; + private boolean byzFailure; + + private String serverName; + private int udpPort; + private Map<String, Map<String, Resource>> resourceMap; //resourceName -> (resourceID -> Resource) + private Map<String, Set<String>> coordinatorResources; //coordinatorID -> Set<resourceID> + private Map<String, Map<String, Integer>> coordinatorResourceDurations; //coordinatorID -> (resourceID -> allocatedDuration) + private Map<String, Integer> serverUdpPorts; //serverName -> UDP port + + public Replica4(ReplicaManager replicaManager){ + this.replicaManager = replicaManager; + this.resourceMap = new HashMap<>(); + this.coordinatorResources = new HashMap<>(); + this.coordinatorResourceDurations = new HashMap<>(); + startUDPListener(); + } + + @Override + public boolean isAlive() { + return alive; + } + + @Override + public void startProcess(int byzantine, int crash) { + // [TEST] Detect crash + if (crash == 1) { + alive = false; + } else { + alive = true; + } + + // [TEST] Detect byzantine failure + if (byzantine == 1) { + byzFailure = true; + } else { + byzFailure = false; + } + + System.out.println("[Replica 4] Process started."); + } + + @Override + public void processRequest(Request request) { + log(request.toString()); + + // [TEST] Simulate byzantine failure (return incorrect value) + if (byzFailure == true) { + Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false); + replicaManager.sendResponseToFE(response); + return; + } + + String status = ""; + try { + switch (request.getFunction()) { + case "addResource": + status = addResource(request.getResourceID(), request.getResourceType(), request.getDuration()); + break; + case "removeResource": + status = removeResource(request.getResourceID(), request.getDuration()); + break; + case "listResourceAvailability": + status = listResourceAvailability(request.getResourceType()); + break; + case "requestResource": + status = requestResource(request.getClientID(), request.getResourceID(), request.getDuration()); + break; + case "findResource": + status = findResource(request.getClientID(), request.getResourceType()); + break; + case "returnResource": + status = returnResource(request.getClientID(), request.getResourceID()); + break; + case "swapResource": + status = swapResource(request.getClientID(), request.getOldResourceID(), request.getOldResourceType(), request.getResourceID(), request.getResourceType()); + break; + default: + status = "Failure: unknown function '" + request.getFunction() + "'"; + } + } catch (Exception e) { + log(e.getMessage()); + status = "Failure: " + request.getFunction() + ": " + e.getMessage(); + } + + Response response = new Response(request, replicaManager.getReplicaId(), status, false); // TODO: isSuccess flag + log("Processed request " + request + "; response: " + response); + replicaManager.sendResponseToFE(response); + } + + @Override + public void restart() { + shutdown(); + + // [TEST] Restart process without byzantine failure or crash + TestLogger.log("REPLICA 2: {RESTARTED}"); + startProcess(0, 0); + } + + public void shutdown() { + log("Shutting down..."); + alive = false; + log("Finished shutting down."); + } + + @Override + public int getId() { + return 4; + } + + public synchronized void log(String message) { + String logMessage = new Date() + " - " + message; + System.out.println(logMessage); + + try (FileWriter fw = new FileWriter("Replica4_log.txt", true); + BufferedWriter bw = new BufferedWriter(fw); + PrintWriter out = new PrintWriter(bw)) { + out.println(logMessage); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @WebMethod(exclude = true) + public void initReplica4(String serverName, int udpPort, Map<String, Integer> serverUdpPorts) { + this.serverName = serverName; + this.udpPort = udpPort; + this.serverUdpPorts = serverUdpPorts; + } + + //Start UDP Listener + private void startUDPListener() { + new Thread(() -> { + try (DatagramSocket socket = new DatagramSocket(udpPort)) { + byte[] buffer = new byte[4096]; + System.out.println(serverName + " UDP Server started on port " + udpPort); + + while (true) { + DatagramPacket request = new DatagramPacket(buffer, buffer.length); + socket.receive(request); + new Thread(() -> handleUDPRequest(request)).start(); + } + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + } + + //Handle UDP Request + private void handleUDPRequest(DatagramPacket requestPacket) { + try { + String requestData = new String(requestPacket.getData(), 0, requestPacket.getLength()); + String[] parts = requestData.split(":"); + String methodName = parts[0]; + String[] params = Arrays.copyOfRange(parts, 1, parts.length); + + String responseData = ""; + + switch (methodName) { + case "listResourceAvailability": + responseData = handleListResourceAvailability(params[0]); + break; + case "requestResource": + responseData = handleLocalResourceRequest(params[0], params[1], Integer.parseInt(params[2])); + break; + case "findResource": + responseData = handleFindResource(params[0], params[1]); + break; + case "returnResource": + responseData = handleReturnResource(params[0], params[1]); + break; + case "swapRequestResource": + responseData = handleSwapRequestResource(params[0], params[1], Integer.parseInt(params[2])); + break; + case "swapReturnResource": + responseData = handleSwapReturnResource(params[0], params[1]); + break; + case "checkCoordinatorResource": + responseData = checkCoordinatorHasResource(params[0], params[1]); + break; + case "getCoordinatorResourceDuration": + int duration = getCoordinatorResourceDuration(params[0], params[1]); + responseData = String.valueOf(duration); + break; + default: + responseData = "Invalid method"; + } + + //Send response + byte[] responseBytes = responseData.getBytes(); + DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length, requestPacket.getAddress(), requestPacket.getPort()); + DatagramSocket socket = new DatagramSocket(); + socket.send(responsePacket); + socket.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Operation Handlers + private synchronized String handleListResourceAvailability(String resourceName) { + StringBuilder result = new StringBuilder(); + resourceName = resourceName.toUpperCase(); + Map<String, Resource> resources = resourceMap.get(resourceName); + + if (resources != null) { + for (Resource resource : resources.values()) { + if (resource.getDuration() > 0) { + result.append(resource.getResourceName()).append(" - ").append(serverName).append(" ").append(resource.getDuration()).append("\n"); + } + } + } + return result.toString(); + } + + private synchronized String handleLocalResourceRequest(String coordinatorID, String resourceID, int duration) { + for (Map<String, Resource> resources : resourceMap.values()) { + Resource resource = resources.get(resourceID); + if (resource != null) { + if (resource.getDuration() >= duration) { + resource.subDuration(duration); + coordinatorResources.computeIfAbsent(coordinatorID, k -> new HashSet<>()).add(resourceID); + coordinatorResourceDurations.computeIfAbsent(coordinatorID, k -> new HashMap<>()).put(resourceID, duration); + + log("Resource " + resourceID + " allocated to " + coordinatorID + " for duration " + duration); + return "Resource " + resourceID + " allocated successfully."; + } else { + return "Insufficient duration for resource " + resourceID; + } + } + } + return "Resource " + resourceID + " not found."; + } + + private synchronized String handleFindResource(String coordinatorID, String resourceName) { + StringBuilder result = new StringBuilder(); + resourceName = resourceName.toUpperCase(); + Set<String> resources = coordinatorResources.get(coordinatorID); + if (resources != null) { + Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID); + if (resourceDurations != null) { + for (String resourceID : resources) { + Resource resource = getResourceByID(resourceID); + if (resource != null && resource.getResourceName().equals(resourceName)) { + Integer allocatedDuration = resourceDurations.get(resourceID); + if (allocatedDuration != null) { + result.append(resource.getResourceName()).append(" - ").append(resourceID).append(" ").append(allocatedDuration).append("\n"); + } + } + } + } + } + return result.toString(); + } + + private synchronized String handleReturnResource(String coordinatorID, String resourceID) { + Set<String> resources = coordinatorResources.get(coordinatorID); + if (resources != null && resources.contains(resourceID)) { + resources.remove(resourceID); + if (resources.isEmpty()) { + coordinatorResources.remove(coordinatorID); + } + Resource resource = getResourceByID(resourceID); + if (resource != null) { + Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID); + if (resourceDurations != null) { + Integer allocatedDuration = resourceDurations.remove(resourceID); + if (allocatedDuration != null) { + resource.addDuration(allocatedDuration); + } + if (resourceDurations.isEmpty()) { + coordinatorResourceDurations.remove(coordinatorID); + } + } + log("Coordinator " + coordinatorID + " returned resource " + resourceID); + return "Resource " + resourceID + " returned successfully."; + } else { + return "Resource " + resourceID + " not found."; + } + } else { + return "You do not occupy resource " + resourceID + "."; + } + } + + //Helper Methods + private Resource getResourceByID(String resourceID) { + for (Map<String, Resource> resources : resourceMap.values()) { + if (resources.containsKey(resourceID)) { + return resources.get(resourceID); + } + } + return null; + } + + private synchronized String handleSwapRequestResource(String coordinatorID, String resourceID, int duration) { + //Attempt to allocate the resource to the coordinator + for (Map<String, Resource> resources : resourceMap.values()) { + Resource resource = resources.get(resourceID); + if (resource != null) { + if (resource.getDuration() >= duration) { + resource.subDuration(duration); + coordinatorResources.computeIfAbsent(coordinatorID, k -> new HashSet<>()).add(resourceID); + coordinatorResourceDurations.computeIfAbsent(coordinatorID, k -> new HashMap<>()).put(resourceID, duration); + log("Resource " + resourceID + " temporarily allocated to " + coordinatorID + " for swapping, duration " + duration); + return "Success"; + } else { + return "Insufficient duration for resource " + resourceID; + } + } + } + return "Resource " + resourceID + " not found."; + } + + private synchronized String handleSwapReturnResource(String coordinatorID, String resourceID) { + Set<String> resources = coordinatorResources.get(coordinatorID); + if (resources != null && resources.contains(resourceID)) { + resources.remove(resourceID); + if (resources.isEmpty()) { + coordinatorResources.remove(coordinatorID); + } + Resource resource = getResourceByID(resourceID); + if (resource != null) { + Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID); + if (resourceDurations != null) { + Integer allocatedDuration = resourceDurations.remove(resourceID); + if (allocatedDuration != null) { + resource.addDuration(allocatedDuration); + } + if (resourceDurations.isEmpty()) { + coordinatorResourceDurations.remove(coordinatorID); + } + } + log("Coordinator " + coordinatorID + " resource " + resourceID + " allocation undone during swap"); + return "Success"; + } else { + return "Resource " + resourceID + " not found."; + } + } else { + return "Coordinator did not acquire resource " + resourceID; + } + } + + private String sendUDPRequest(String serverName, String methodName, String... params) { + String response = ""; + try { + InetAddress host = InetAddress.getByName("localhost"); + int port = serverUdpPorts.get(serverName); + String requestData = methodName + ":" + String.join(":", params); + + byte[] requestBytes = requestData.getBytes(); + DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length, host, port); + + DatagramSocket socket = new DatagramSocket(); + socket.send(requestPacket); + + byte[] buffer = new byte[4096]; + DatagramPacket responsePacket = new DatagramPacket(buffer, buffer.length); + socket.receive(responsePacket); + + response = new String(responsePacket.getData(), 0, responsePacket.getLength()); + socket.close(); + } catch (Exception e) { + e.printStackTrace(); + } + return response; + } + + //Implement ResponderInterface Methods + public synchronized String addResource(String resourceID, String resourceName, int duration) { + resourceName = resourceName.toUpperCase(); + Map<String, Resource> resources = resourceMap.computeIfAbsent(resourceName, k -> new HashMap<>()); + Resource resource = resources.get(resourceID); + + if (resource != null) { + if (duration > resource.getDuration()) { + resource.setDuration(duration); + log("Updated duration of resource " + resourceID); + return "Resource " + resourceID + " duration updated."; + } else { + log("Resource " + resourceID + " already exists with greater or equal duration."); + return "Resource " + resourceID + " already exists with greater or equal duration."; + } + } else { + resource = new Resource(resourceName, duration); + resources.put(resourceID, resource); + log("Added new resource " + resourceID); + return "Resource " + resourceID + " added successfully."; + } + } + + public synchronized String removeResource(String resourceID, int duration) { + for (Map<String, Resource> resources : resourceMap.values()) { + Resource resource = resources.get(resourceID); + if (resource != null) { + if (duration >= resource.getDuration()) { + resources.remove(resourceID); + log("Resource " + resourceID + " completely removed."); + return "Resource " + resourceID + " completely removed."; + } else { + resource.subDuration(duration); + log("Decreased duration of resource " + resourceID); + return "Resource " + resourceID + " duration decreased by " + duration + "."; + } + } + } + log("Resource " + resourceID + " not found."); + return "Resource " + resourceID + " not found."; + } + + public String listResourceAvailability(String resourceName) { + resourceName = resourceName.toUpperCase(); + StringBuilder result = new StringBuilder(); + + synchronized (this) { + Map<String, Resource> resources = resourceMap.get(resourceName); + if (resources != null) { + for (Resource resource : resources.values()) { + if (resource.getDuration() > 0) { + result.append(resource.getResourceName()).append(" - ").append(serverName).append(" ").append(resource.getDuration()).append("\n"); + } + } + } + } + + for (String otherServer : serverUdpPorts.keySet()) { + if (!otherServer.equals(serverName)) { + String response = sendUDPRequest(otherServer, "listResourceAvailability", resourceName); + result.append(response); + } + } + + return result.toString(); + } + + public String requestResource(String coordinatorID, String resourceID, int duration) { + String resourceServer = resourceID.substring(0, 3).toUpperCase(); + String response; + + if (resourceServer.equals(serverName)) { + response = handleLocalResourceRequest(coordinatorID, resourceID, duration); + } else { + response = sendUDPRequest(resourceServer, "requestResource", coordinatorID, resourceID, String.valueOf(duration)); + } + + log("Coordinator " + coordinatorID + " requested resource " + resourceID + ": " + response); + return response; + } + + public String findResource(String coordinatorID, String resourceName) { + resourceName = resourceName.toUpperCase(); + StringBuilder result = new StringBuilder(); + + for (String otherServer : serverUdpPorts.keySet()) { + String response; + if (otherServer.equals(serverName)) { + response = handleFindResource(coordinatorID, resourceName); + } else { + response = sendUDPRequest(otherServer, "findResource", coordinatorID, resourceName); + } + result.append(response); + } + + return result.toString(); + } + + public String returnResource(String coordinatorID, String resourceID) { + String resourceServer = resourceID.substring(0, 3).toUpperCase(); + String response; + + if (resourceServer.equals(serverName)) { + response = handleReturnResource(coordinatorID, resourceID); + } else { + response = sendUDPRequest(resourceServer, "returnResource", coordinatorID, resourceID); + } + + log("Coordinator " + coordinatorID + " returned resource " + resourceID + ": " + response); + return response; + } + + private synchronized int getCoordinatorResourceDuration(String coordinatorID, String resourceID) { + Map<String, Integer> resourceDurations = coordinatorResourceDurations.get(coordinatorID); + if (resourceDurations != null && resourceDurations.containsKey(resourceID)) { + return resourceDurations.get(resourceID); + } + return 0; + } + + private synchronized String checkCoordinatorHasResource(String coordinatorID, String resourceID) { + Set<String> resources = coordinatorResources.get(coordinatorID); + if (resources != null && resources.contains(resourceID)) { + return "true"; + } + return "false"; + } + + public String swapResource(String coordinatorID, String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) { + String oldResourceServer = oldResourceID.substring(0, 3).toUpperCase(); + String newResourceServer = newResourceID.substring(0, 3).toUpperCase(); + + //Check if the coordinator has the old resource + String checkOldResourceResponse; + if (oldResourceServer.equals(serverName)) { + checkOldResourceResponse = checkCoordinatorHasResource(coordinatorID, oldResourceID); + } else { + checkOldResourceResponse = sendUDPRequest(oldResourceServer, "checkCoordinatorResource", coordinatorID, oldResourceID); + } + + if (!checkOldResourceResponse.equals("true")) { + return "Coordinator has not acquired the old resource."; + } + + //Get the duration allocated to the old resource + int duration; + if (oldResourceServer.equals(serverName)) { + duration = getCoordinatorResourceDuration(coordinatorID, oldResourceID); + } else { + String durationStr = sendUDPRequest(oldResourceServer, "getCoordinatorResourceDuration", coordinatorID, oldResourceID); + duration = Integer.parseInt(durationStr); + } + + //Attempt to acquire the new resource + String requestResponse; + if (newResourceServer.equals(serverName)) { + requestResponse = handleLocalResourceRequest(coordinatorID, newResourceID, duration); + } else { + requestResponse = sendUDPRequest(newResourceServer, "requestResource", coordinatorID, newResourceID, String.valueOf(duration)); + } + + if (!requestResponse.contains("allocated successfully")) { + return "Failed to acquire new resource: " + requestResponse; + } + + //Return the old resource + String returnResponse; + if (oldResourceServer.equals(serverName)) { + returnResponse = handleReturnResource(coordinatorID, oldResourceID); + } else { + returnResponse = sendUDPRequest(oldResourceServer, "returnResource", coordinatorID, oldResourceID); + } + + if (!returnResponse.contains("returned successfully")) { + //Undo the allocation of the new resource + if (newResourceServer.equals(serverName)) { + handleReturnResource(coordinatorID, newResourceID); + } else { + sendUDPRequest(newResourceServer, "returnResource", coordinatorID, newResourceID); + } + return "Failed to return old resource: " + returnResponse; + } + + log("Coordinator " + coordinatorID + " swapped resource " + oldResourceID + " with " + newResourceID); + + return "Resource swap successful."; + } +} + diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 6ff5de2..a3008de 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -10,6 +10,7 @@ import derms.net.runicast.ReliableUnicastSender; import derms.net.tomulticast.TotalOrderMulticastSender; import derms.replica1.Replica1; import derms.replica2.Replica2; +import derms.util.*; import java.io.IOException; import java.net.InetAddress; @@ -22,7 +23,7 @@ import java.util.Objects; import java.util.logging.Logger; public class ReplicaManager { - public static final String usage = "Usage: java ReplicaManager <replicaId> <city> <frontEndIP>"; + public static final String usage = "Usage: java ReplicaManager <replicaId> <city> <replicaManagerIP> <frontEndIP> <byzantine(0 or 1)> <crash(0 or 1)>"; private final int replicaId; private final String city; private Replica replica; @@ -32,13 +33,13 @@ public class ReplicaManager { private ReliableUnicastSender<Response> unicastSender; private TotalOrderMulticastReceiver multicastReceiver; - public ReplicaManager(int replicaId, String city, InetAddress frontEndIP) throws IOException { + public ReplicaManager(int replicaId, String city, InetAddress replicaManagerIP, InetAddress frontEndIP, int byzantine, int crash) throws IOException { this.replicaId = replicaId; this.city = city; this.log = Logger.getLogger(getClass().getName()); initUnicastSender(frontEndIP); - initReplica(); - initMulticastReceiver(); + initReplica(byzantine, crash); + initMulticastReceiver(replicaManagerIP); startHeartbeatThread(); } @@ -48,7 +49,7 @@ public class ReplicaManager { unicastSender = new ReliableUnicastSender<>(frontEndAddress); } - private void initReplica() throws IOException { + private void initReplica(int byzantine, int crash) throws IOException { switch (replicaId) { case 1: replica = new Replica1(this); @@ -63,14 +64,27 @@ public class ReplicaManager { replica = new derms.replica2.Replica2(city, this); break; } - replica.startProcess(); + + // [TEST] Logging + if (byzantine == 0) { + TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: FALSE}"); + } else { + TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: TRUE}"); + } + + if (crash == 0) { + TestLogger.log("REPLICA " + replicaId + ": {CRASH: FALSE}"); + } else { + TestLogger.log("REPLICA " + replicaId + ": {CRASH: TRUE}"); + } + + replica.startProcess(byzantine, crash); } - private void initMulticastReceiver() throws IOException { + private void initMulticastReceiver(InetAddress replicaManagerIP) throws IOException { InetSocketAddress group = Config.group; - InetAddress localAddress = InetAddress.getLocalHost(); // Your local address - NetworkInterface netInterface = NetworkInterface.getByInetAddress(localAddress); - multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress, netInterface); + NetworkInterface netInterface = NetworkInterface.getByInetAddress(replicaManagerIP); + multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, replicaManagerIP, netInterface); new Thread(() -> { while (true) { @@ -97,8 +111,12 @@ public class ReplicaManager { new Thread(() -> { while (true) { if (!replica.isAlive()) { + // [TEST] Logging + TestLogger.log("REPLICA " + replicaId + ": {CRASH: DETECTED}"); + informFrontEndRmIsDown(replica.getId()); replica.restart(); + //TestLogger.log("REPLICA " + replicaId + ": {RESTARTED}"); } try { Thread.sleep(5000); // Example 5 seconds. @@ -119,8 +137,13 @@ public class ReplicaManager { public void handleByzantineFailure() { log.severe("Byzantine failure detected in Replica " + replica.getId()); + + // [TEST] Logging + TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: DETECTED}"); + replica.restart(); informFrontEndRmHasBug(replica.getId()); + //TestLogger.log("REPLICA " + replicaId + ": {RESTARTED}"); } private void informFrontEndRmIsDown(int replicaId) { @@ -129,6 +152,7 @@ public class ReplicaManager { out.writeObject("RM_DOWN:" + replicaId); } catch (IOException e) { log.severe("Failed to inform FE that RM is down: " + e.getMessage()); + TestLogger.log("[FAILED TO INFORM FE (RM IS DOWN)]"); } } @@ -138,11 +162,12 @@ public class ReplicaManager { out.writeObject("RM_BUG:" + replicaId); } catch (IOException e) { log.severe("Failed to inform FE that RM has a bug: " + e.getMessage()); + TestLogger.log("[FAILED TO INFORM FE (RM HAS A BUG)]"); } } public static void main(String[] args) { - if (args.length < 3) { + if (args.length < 4) { System.err.println(usage); System.exit(1); } @@ -150,11 +175,15 @@ public class ReplicaManager { try { int replicaId = Integer.parseInt(args[0]); String city = args[1]; - InetAddress frontEndIP = InetAddress.getByName(args[2]); - ReplicaManager replicaManager = new ReplicaManager(replicaId, city, frontEndIP); + InetAddress replicaManagerIP = InetAddress.getByName(args[2]); + InetAddress frontEndIP = InetAddress.getByName(args[3]); + int byzantine = Integer.parseInt(args[4]); + int crash = Integer.parseInt(args[5]); + ReplicaManager replicaManager = new ReplicaManager(replicaId, city, replicaManagerIP, frontEndIP, byzantine, crash); System.out.println("ReplicaManager " + replicaId + " is running."); } catch (IOException e) { System.err.println("Failed to start ReplicaManager: " + e.getMessage()); + TestLogger.log("[FAILED TO START RM]"); e.printStackTrace(); } } diff --git a/src/main/java/derms/client/CoordinatorClient.java b/src/main/java/derms/client/CoordinatorClient.java index 570d76b..40d8a94 100644 --- a/src/main/java/derms/client/CoordinatorClient.java +++ b/src/main/java/derms/client/CoordinatorClient.java @@ -4,116 +4,28 @@ import derms.frontend.DERMSInterface; import java.net.MalformedURLException; -public class CoordinatorClient extends CLI { - public static final String usage = "Usage: java derms.client.CoordinatorClient <coordinator ID> <FE host>"; - - private final String coordinatorID; +public class CoordinatorClient { private final DERMSInterface server; + private final String coordinatorID; - private CoordinatorClient(String coordinatorID, String FEhost) throws MalformedURLException { - this.coordinatorID = coordinatorID; + public CoordinatorClient(String coordinatorID, String FEhost) throws MalformedURLException { this.server = Client.connectToServer(FEhost); - - commands.put("request", new Request()); - cmdDescriptions.add(new Description( - "request <resource ID> <duration>", - "Borrow a resource.")); - - commands.put("find", new Find()); - cmdDescriptions.add(new Description( - "find <resource name>", - "List borrowed resources.")); - - commands.put("return", new Return()); - cmdDescriptions.add(new Description( - "return <resource ID>", - "Return a borrowed resource.")); - - commands.put("swap", new Swap()); - cmdDescriptions.add(new Description( - "swap <old resource ID> <old resource type> <new resource ID> <new resource type>", - "Return the old resource and borrow the new one.")); - } - - public static void main(String[] args) { - if (args.length < 2) { - System.err.println(usage); - System.exit(1); - } - - String coordinatorID = args[0]; - String FEhost = args[1]; - - try { - (new CoordinatorClient(coordinatorID, FEhost)).run(); - } catch (MalformedURLException e) { - e.printStackTrace(); - } + this.coordinatorID = coordinatorID; } - private class Request implements Command { - @Override - public void exec(String[] args) { - if (args.length < 2) - System.out.println("invalid arguments for 'request'"); - else - request(args[0], args[1]); - } - - private void request(String resourceID, String durationStr) { - try { - int duration = Integer.parseInt(durationStr); - if (duration < 0) - throw new NumberFormatException("duration less than 0"); - String response = server.requestResource(coordinatorID, resourceID, duration); - System.out.println(response); - } catch (NumberFormatException e) { - System.out.println("invalid duration: " + e.getMessage()); - } - } + public String requestResource(String resourceID, int duration) { + return server.requestResource(coordinatorID, resourceID, duration); } - private class Find implements Command { - @Override - public void exec(String[] args) { - if (args.length < 1) - System.out.println("invalid arguments for 'find'"); - else find(args[0]); - } - - private void find(String resourceID) { - String response = server.findResource(coordinatorID, resourceID); - System.out.println(response); - } + public String findResource(String resourceName) { + return server.findResource(coordinatorID, resourceName); } - private class Return implements Command { - @Override - public void exec(String[] args) { - if (args.length < 1) - System.out.println("invalid arguments for 'return'"); - else - returnResource(args[0]); - } - - private void returnResource(String resourceID) { - String response = server.returnResource(coordinatorID, resourceID); - System.out.println(response); - } + public String returnResource(String resourceID) { + return server.returnResource(coordinatorID, resourceID); } - private class Swap implements Command { - @Override - public void exec(String[] args) { - if (args.length < 4) - System.out.println("invalid arguments for 'swap'"); - else - swap(args[0], args[1], args[2], args[3]); - } - - private void swap(String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) { - String response = server.swapResource(coordinatorID, oldResourceID, oldResourceType, newResourceID, newResourceType); - System.out.println(response); - } + public String swapResource(String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) { + return server.swapResource(coordinatorID, oldResourceID, oldResourceType, newResourceID, newResourceType); } } diff --git a/src/main/java/derms/client/CoordinatorClientCLI.java b/src/main/java/derms/client/CoordinatorClientCLI.java new file mode 100644 index 0000000..bddc6e6 --- /dev/null +++ b/src/main/java/derms/client/CoordinatorClientCLI.java @@ -0,0 +1,119 @@ +package derms.client; + +import derms.frontend.DERMSInterface; + +import java.net.MalformedURLException; + +public class CoordinatorClientCLI extends CLI { + public static final String usage = "Usage: java derms.client.CoordinatorClient <coordinator ID> <FE host>"; + + private final String coordinatorID; + private final DERMSInterface server; + + private CoordinatorClientCLI(String coordinatorID, String FEhost) throws MalformedURLException { + this.coordinatorID = coordinatorID; + this.server = Client.connectToServer(FEhost); + + commands.put("request", new Request()); + cmdDescriptions.add(new CLI.Description( + "request <resource ID> <duration>", + "Borrow a resource.")); + + commands.put("find", new Find()); + cmdDescriptions.add(new CLI.Description( + "find <resource name>", + "List borrowed resources.")); + + commands.put("return", new Return()); + cmdDescriptions.add(new CLI.Description( + "return <resource ID>", + "Return a borrowed resource.")); + + commands.put("swap", new Swap()); + cmdDescriptions.add(new CLI.Description( + "swap <old resource ID> <old resource type> <new resource ID> <new resource type>", + "Return the old resource and borrow the new one.")); + } + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println(usage); + System.exit(1); + } + + String coordinatorID = args[0]; + String FEhost = args[1]; + + try { + (new CoordinatorClientCLI(coordinatorID, FEhost)).run(); + } catch (MalformedURLException e) { + e.printStackTrace(); + } + } + + private class Request implements CLI.Command { + @Override + public void exec(String[] args) { + if (args.length < 2) + System.out.println("invalid arguments for 'request'"); + else + request(args[0], args[1]); + } + + private void request(String resourceID, String durationStr) { + try { + int duration = Integer.parseInt(durationStr); + if (duration < 0) + throw new NumberFormatException("duration less than 0"); + String response = server.requestResource(coordinatorID, resourceID, duration); + System.out.println(response); + } catch (NumberFormatException e) { + System.out.println("invalid duration: " + e.getMessage()); + } + } + } + + private class Find implements CLI.Command { + @Override + public void exec(String[] args) { + if (args.length < 1) + System.out.println("invalid arguments for 'find'"); + else find(args[0]); + } + + private void find(String resourceID) { + String response = server.findResource(coordinatorID, resourceID); + System.out.println(response); + } + } + + private class Return implements CLI.Command { + @Override + public void exec(String[] args) { + if (args.length < 1) + System.out.println("invalid arguments for 'return'"); + else + returnResource(args[0]); + } + + private void returnResource(String resourceID) { + String response = server.returnResource(coordinatorID, resourceID); + System.out.println(response); + } + } + + private class Swap implements CLI.Command { + @Override + public void exec(String[] args) { + if (args.length < 4) + System.out.println("invalid arguments for 'swap'"); + else + swap(args[0], args[1], args[2], args[3]); + } + + private void swap(String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) { + String response = server.swapResource(coordinatorID, oldResourceID, oldResourceType, newResourceID, newResourceType); + System.out.println(response); + } + } +} diff --git a/src/main/java/derms/client/ResponderClient.java b/src/main/java/derms/client/ResponderClient.java index 23361d7..6834c76 100644 --- a/src/main/java/derms/client/ResponderClient.java +++ b/src/main/java/derms/client/ResponderClient.java @@ -1,103 +1,33 @@ package derms.client; import derms.frontend.DERMSInterface; +import derms.util.TestLogger; import java.net.MalformedURLException; +import java.util.Objects; -public class ResponderClient extends CLI { - public static final String usage = "Usage: java derms.client.ResponderClient <FE host>"; - +public class ResponderClient { private final DERMSInterface server; - private ResponderClient(String FEhost) throws MalformedURLException { + public ResponderClient(String FEhost) throws MalformedURLException { server = Client.connectToServer(FEhost); - - commands.put("add", new Add()); - cmdDescriptions.add(new Description( - "add <resource ID> <resource type> <duration>", - "Add ad resource to the server")); - - commands.put("remove", new Remove()); - cmdDescriptions.add(new Description( - "remove <resource ID> <duration>", - "Decrease the duration of a resource. If duration is negative, the resource is removed entirely.")); - - commands.put("list", new List()); - cmdDescriptions.add(new Description( - "list <resource name>", - "List available resources")); - } - - public static void main(String[] args) { - if (args.length < 1) { - System.err.println(usage); - System.exit(1); - } - - String FEhost = args[0]; - - try { - (new ResponderClient(FEhost)).run(); - } catch (MalformedURLException e) { - e.printStackTrace(); - } } - private class Add implements Command { - @Override - public void exec(String[] args) { - if (args.length < 3) - System.out.println("invalid arguments for 'add'"); - else - add(args[0], args[1], args[2]); - } - - private void add(String resourceID, String resourceName, String durationStr) { - try { - int duration = Integer.parseInt(durationStr); - if (duration < 0) { - throw new NumberFormatException("duration less than 0"); - } - String response = server.addResource(resourceID, resourceName, duration); - System.out.println(response); - } catch (NumberFormatException e) { - System.out.println("invalid duration: " + durationStr); - } + public String addResource(String resourceID, String resourceName, int duration) { + String res = server.addResource(resourceID, resourceName, duration); + if (res.contains("Fail")) { + TestLogger.log("[FAILED: " + res + "]"); + } else { + TestLogger.log("[SUCCESS: " + res + "]"); } + return res; } - private class Remove implements Command { - @Override - public void exec(String[] args) { - if (args.length < 2) - System.out.println("invalid arguments for 'remove'"); - else - remove(args[0], args[1]); - } - - private void remove(String resourceID, String durationStr) { - try { - int duration = Integer.parseInt(durationStr); - String response = server.removeResource(resourceID, duration); - System.out.println(response); - } catch (NumberFormatException e) { - System.out.println("invalid duration: " + durationStr); - } - } + public String removeResource(String resourceID, int duration) { + return server.removeResource(resourceID, duration); } - private class List implements Command { - @Override - public void exec(String[] args) { - if (args.length < 1) - System.out.println("invalid arguments for 'list'"); - else - list(args[0]); - } - - private void list(String resourceName) { - String response = server.listResourceAvailability(resourceName); - System.out.println(response); - } + public String listResourceAvailability(String resourceName) { + return server.listResourceAvailability(resourceName); } } diff --git a/src/main/java/derms/client/ResponderClientCLI.java b/src/main/java/derms/client/ResponderClientCLI.java new file mode 100644 index 0000000..de046d5 --- /dev/null +++ b/src/main/java/derms/client/ResponderClientCLI.java @@ -0,0 +1,104 @@ +package derms.client; + +import derms.frontend.DERMSInterface; +import derms.util.TestLogger; + +import java.net.MalformedURLException; + +public class ResponderClientCLI extends CLI { + public static final String usage = "Usage: java derms.client.ResponderClient <FE host>"; + + private final DERMSInterface server; + + private ResponderClientCLI(String FEhost) throws MalformedURLException { + server = Client.connectToServer(FEhost); + + commands.put("add", new Add()); + cmdDescriptions.add(new CLI.Description( + "add <resource ID> <resource type> <duration>", + "Add ad resource to the server")); + + commands.put("remove", new Remove()); + cmdDescriptions.add(new CLI.Description( + "remove <resource ID> <duration>", + "Decrease the duration of a resource. If duration is negative, the resource is removed entirely.")); + + commands.put("list", new List()); + cmdDescriptions.add(new CLI.Description( + "list <resource name>", + "List available resources")); + } + + public static void main(String[] args) { + if (args.length < 1) { + System.err.println(usage); + System.exit(1); + } + + String FEhost = args[0]; + + try { + (new ResponderClientCLI(FEhost)).run(); + } catch (MalformedURLException e) { + e.printStackTrace(); + } + } + + private class Add implements CLI.Command { + @Override + public void exec(String[] args) { + if (args.length < 3) + System.out.println("invalid arguments for 'add'"); + else + add(args[0], args[1], args[2]); + } + + private void add(String resourceID, String resourceName, String durationStr) { + try { + int duration = Integer.parseInt(durationStr); + if (duration < 0) { + throw new NumberFormatException("duration less than 0"); + } + String response = server.addResource(resourceID, resourceName, duration); + System.out.println(response); + } catch (NumberFormatException e) { + System.out.println("invalid duration: " + durationStr); + } + } + } + + private class Remove implements CLI.Command { + @Override + public void exec(String[] args) { + if (args.length < 2) + System.out.println("invalid arguments for 'remove'"); + else + remove(args[0], args[1]); + } + + private void remove(String resourceID, String durationStr) { + try { + int duration = Integer.parseInt(durationStr); + String response = server.removeResource(resourceID, duration); + System.out.println(response); + } catch (NumberFormatException e) { + System.out.println("invalid duration: " + durationStr); + } + } + } + + private class List implements CLI.Command { + @Override + public void exec(String[] args) { + if (args.length < 1) + System.out.println("invalid arguments for 'list'"); + else + list(args[0]); + } + + private void list(String resourceName) { + String response = server.listResourceAvailability(resourceName); + System.out.println(response); + } + } +} diff --git a/src/main/java/derms/frontend/DERMSServerImpl.java b/src/main/java/derms/frontend/DERMSServerImpl.java index 1877cef..2a5dfa9 100644 --- a/src/main/java/derms/frontend/DERMSServerImpl.java +++ b/src/main/java/derms/frontend/DERMSServerImpl.java @@ -5,10 +5,6 @@ package derms.frontend; //import model.Resource; import java.util.*; import java.util.stream.Collectors; -import java.util.Optional; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -23,6 +19,7 @@ import javax.jws.soap.SOAPBinding.Style; //import interfaces.DERMSInterface; import derms.Request; import derms.Response; +import derms.util.TestLogger; @WebService(endpointInterface = "derms.frontend.DERMSInterface") public class DERMSServerImpl implements DERMSInterface { diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java index f29459c..b3e100f 100644 --- a/src/main/java/derms/frontend/FE.java +++ b/src/main/java/derms/frontend/FE.java @@ -10,6 +10,7 @@ import derms.Request; import derms.Response; import derms.net.runicast.ReliableUnicastReceiver; import derms.net.runicast.ReliableUnicastSender; +import derms.util.TestLogger; import java.util.ArrayList; import java.util.List; @@ -51,6 +52,7 @@ public class FE { System.out.println("Rm:" + RmNumber + "has bug"); // sendMulticastFaultMessageToRms(errorMessage); sendUnicastToSequencer(errorMessage); + //TestLogger.log("FE: {BYZANTINE: INFORM REPLICA" + RmNumber + "}"); } @Override @@ -60,6 +62,7 @@ public class FE { System.out.println("Rm:" + RmNumber + "is down"); // sendMulticastFaultMessageToRms(errorMessage); sendUnicastToSequencer(errorMessage); + //TestLogger.log("FE: {CRASH: INFORM REPLICA" + RmNumber + "}"); } @Override diff --git a/src/main/java/derms/replica1/DERMSServerPublisher.java b/src/main/java/derms/replica1/DERMSServerPublisher.java index 44ee879..c94cb34 100644 --- a/src/main/java/derms/replica1/DERMSServerPublisher.java +++ b/src/main/java/derms/replica1/DERMSServerPublisher.java @@ -3,8 +3,27 @@ package derms.replica1; import javax.xml.ws.Endpoint;
public class DERMSServerPublisher {
+
+ private static Endpoint[] endpoints = new Endpoint[3];
+
public static void main(String[] args) {
// try {
+// endpoints[0] = Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL"));
+// endpoints[1] = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
+// endpoints[2] = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
+// } catch (InterruptedException e) {
+// throw new RuntimeException(e);
+// }
+ }
+
+ public static void stop() {
+ for (Endpoint endpoint : endpoints) {
+ if (endpoint != null && endpoint.isPublished()) {
+ endpoint.stop();
+ System.out.println("DERMS Server is stopped.");
+ }
+ }
+// try {
// Endpoint.publish("http://localhost:8387/ws/derms", new DERMSServer("MTL"));
// Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
// Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
diff --git a/src/main/java/derms/replica1/Replica1.java b/src/main/java/derms/replica1/Replica1.java index 6863d5c..1520b73 100644 --- a/src/main/java/derms/replica1/Replica1.java +++ b/src/main/java/derms/replica1/Replica1.java @@ -5,6 +5,7 @@ import derms.ReplicaManager; import derms.Request; import derms.Response; import derms.replica2.DermsLogger; +import derms.util.TestLogger; import derms.util.ThreadPool; import javax.xml.ws.Endpoint; @@ -32,6 +33,7 @@ public class Replica1 implements Replica { private final ConcurrentHashMap<String, Integer> portsMap; private final Random r = new Random(); + private boolean byzFailure; public Replica1(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -60,7 +62,21 @@ public class Replica1 implements Replica { } @Override - public void startProcess() { + public void startProcess(int byzantine, int crash) { + // [TEST] Detect crash + if (crash == 1) { + alive = false; + } else { + alive = true; + } + + // [TEST] Detect byzantine failure + if (byzantine == 1) { + byzFailure = true; + } else { + byzFailure = false; + } + try { server = new DERMSServer("MTL", portsMap); } catch (InterruptedException e) { @@ -89,6 +105,13 @@ public class Replica1 implements Replica { @Override public void processRequest(Request request) { + // [TEST] Simulate byzantine failure (return incorrect value) + if (byzFailure == true) { + Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false); + replicaManager.sendResponseToFE(response); + return; + } + log.info(request.toString()); String status = ""; @@ -137,7 +160,10 @@ public class Replica1 implements Replica { ThreadPool.shutdown(pool, log); alive = false; log.info("Finished shutting down."); - startProcess(); + + // [TEST] Restart process without byzantine failure or crash + TestLogger.log("REPLICA 1: {RESTARTED}"); + startProcess(0, 0); } @Override diff --git a/src/main/java/derms/replica2/Replica2.java b/src/main/java/derms/replica2/Replica2.java index 3c9f764..cf21d74 100644 --- a/src/main/java/derms/replica2/Replica2.java +++ b/src/main/java/derms/replica2/Replica2.java @@ -1,6 +1,7 @@ package derms.replica2; import derms.*; +import derms.util.TestLogger; import derms.util.ThreadPool; import sun.reflect.generics.reflectiveObjects.NotImplementedException; @@ -24,6 +25,7 @@ public class Replica2 implements Replica { private final ReplicaManager replicaManager; private final ExecutorService pool; private boolean alive = false; + private boolean byzFailure; public Replica2(City city, ReplicaManager replicaManager) throws IOException { this.city = city; @@ -57,7 +59,21 @@ public class Replica2 implements Replica { public boolean isAlive() { return alive; } @Override - public void startProcess() { + public void startProcess(int byzantine, int crash) { + // [TEST] Detect crash + if (crash == 1) { + alive = false; + } else { + alive = true; + } + + // [TEST] Detect byzantine failure + if (byzantine == 1) { + byzFailure = true; + } else { + byzFailure = false; + } + try { pool.execute(new ResourceAvailability.Server(localAddr, resources)); } catch (IOException e) { @@ -105,7 +121,7 @@ public class Replica2 implements Replica { log.info("Running"); log.config("Local address is "+localAddr.toString()); - alive = true; + //alive = true; log.info(getClass().getSimpleName() + " started."); } @@ -113,6 +129,13 @@ public class Replica2 implements Replica { public void processRequest(Request request) { log.info(request.toString()); + // [TEST] Simulate byzantine failure (return incorrect value) + if (byzFailure == true) { + Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false); + replicaManager.sendResponseToFE(response); + return; + } + String status = ""; try { switch (request.getFunction()) { @@ -153,7 +176,10 @@ public class Replica2 implements Replica { @Override public void restart() { shutdown(); - startProcess(); + + // [TEST] Restart process without byzantine failure or crash + TestLogger.log("REPLICA 2: {RESTARTED}"); + startProcess(0, 0); } @Override diff --git a/src/main/java/derms/replica3/Replica3.java b/src/main/java/derms/replica3/Replica3.java index 5c6a113..456c4f1 100644 --- a/src/main/java/derms/replica3/Replica3.java +++ b/src/main/java/derms/replica3/Replica3.java @@ -13,6 +13,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import derms.replica3.Logger; +import derms.util.TestLogger; public class Replica3 implements Replica{ static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); @@ -27,6 +28,7 @@ public class Replica3 implements Replica{ // private final Logger log; private boolean alive; + private boolean byzFailure; private final ReplicaManager replicaManager; public Replica3(City city, ReplicaManager replicaManager) throws IOException { @@ -42,6 +44,7 @@ public class Replica3 implements Replica{ // log.config("Local address is "+localAddr.toString()); this.alive = true; + this.byzFailure = false; } public Replica3(String city, ReplicaManager replicaManager) throws IOException { @@ -52,7 +55,21 @@ public class Replica3 implements Replica{ public boolean isAlive() { return alive; } @Override - public void startProcess() { + public void startProcess(int byzantine, int crash) { + // [TEST] Detect crash + if (crash == 1) { + alive = false; + } else { + alive = true; + } + + // [TEST] Detect byzantine failure + if (byzantine == 1) { + byzFailure = true; + } else { + byzFailure = false; + } + // TODO // log.info(getClass().getSimpleName() + " started."); System.out.println("process started"); @@ -61,6 +78,14 @@ public class Replica3 implements Replica{ @Override public void processRequest(Request request) { // log.info(request.toString()); + + // [TEST] Simulate byzantine failure (return incorrect value) + if (byzFailure == true) { + Response response = new Response(request, replicaManager.getReplicaId(), "BYZANTINE FAILURE", false); + replicaManager.sendResponseToFE(response); + return; + } + System.out.println("process request and good"); String status = ""; try { @@ -104,7 +129,10 @@ public class Replica3 implements Replica{ public void restart() { // TODO shutdown(); - startProcess(); + + // [TEST] Restart process without byzantine failure or crash + TestLogger.log("REPLICA 3: {RESTARTED}"); + startProcess(0, 0); } @Override @@ -112,6 +140,7 @@ public class Replica3 implements Replica{ private void shutdown() { // TODO + alive = false; } public synchronized String addResource(String resourceID, String resourceName, int duration) { diff --git a/src/main/java/derms/util/LogComparator.java b/src/main/java/derms/util/LogComparator.java new file mode 100644 index 0000000..da3736c --- /dev/null +++ b/src/main/java/derms/util/LogComparator.java @@ -0,0 +1,27 @@ +package derms.util; + +import java.io.*; +import java.nio.file.*; + +public class LogComparator { + public static boolean compareLineCounts(String actualFilePath, String expectedFilePath) throws IOException { + long actualLineCount = Files.lines(Paths.get(actualFilePath)).count(); + long expectedLineCount = Files.lines(Paths.get(expectedFilePath)).count(); + System.out.println("XXXXXXXXX ACTUAL LINE: " + actualLineCount); + System.out.println("XXXXXXXXX EXPECTED: " + expectedLineCount); + return actualLineCount == expectedLineCount; + } +public static boolean containsSuccess(String filePath) throws IOException { + return Files.lines(Paths.get(filePath)).anyMatch(line -> line.contains("SUCCESS")); +} + +public static boolean compareFiles(String actualFilePath, String expectedFilePath) throws IOException { + boolean lineCountsMatch = compareLineCounts(actualFilePath, expectedFilePath); + boolean actualContainsSuccess = containsSuccess(actualFilePath); + System.out.println("XXXXXXXXX ACTUAL SUCCESS: " + actualContainsSuccess); + boolean expectedContainsSuccess = containsSuccess(expectedFilePath); + System.out.println("XXXXXXXXX EXPECTED SUCCESS: " + expectedContainsSuccess); + + return lineCountsMatch && actualContainsSuccess && expectedContainsSuccess; +} +}
\ No newline at end of file diff --git a/src/main/java/derms/util/TestLogger.java b/src/main/java/derms/util/TestLogger.java new file mode 100644 index 0000000..7bb34db --- /dev/null +++ b/src/main/java/derms/util/TestLogger.java @@ -0,0 +1,27 @@ +package derms.util; + +import java.io.*; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class TestLogger { + private static final String LOG_FILE = "SystemTest.log"; + + public static void log(String message) { + try (BufferedWriter writer = new BufferedWriter(new FileWriter(LOG_FILE, true))) { + String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + writer.write("[" + timestamp + "] " + message); + writer.newLine(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static void clearLog() { + try (BufferedWriter writer = new BufferedWriter(new FileWriter(LOG_FILE))) { + // Clear the file by overwriting it with nothing + } catch (IOException e) { + e.printStackTrace(); + } + } +}
\ No newline at end of file diff --git a/src/test/java/derms/test/SystemTest.java b/src/test/java/derms/test/SystemTest.java new file mode 100644 index 0000000..818b4e9 --- /dev/null +++ b/src/test/java/derms/test/SystemTest.java @@ -0,0 +1,183 @@ +package derms.test; + +import derms.ReplicaManager; +import derms.Sequencer; +import derms.client.ResponderClient; +import derms.frontend.FE; +import derms.replica1.DERMSServerPublisher; + +import org.junit.jupiter.api.*; +import java.io.*; +import java.net.MalformedURLException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.nio.file.*; +import java.util.*; +import derms.util.*; +import derms.replica3.*; + +import static org.junit.jupiter.api.Assertions.*; + +class SystemTest { + + private static final String TEST_LOG_PATH = "SystemTest.log"; + private static final String EXPECTED_LOG_PATH_NORM = "TestExpected.log"; + private static final String EXPECTED_LOG_PATH_BYZ = "TestExpectedByz.log"; + private static final String EXPECTED_LOG_PATH_CRASH = "TestExpectedCrash.log"; + private static final String EXPECTED_LOG_PATH_COMBINED = "TestExpectedCombined.log"; + + // [TODO] + // input IP and NET config + private static String IP = "172.16.62.225"; + + @BeforeEach + void clearLogFile() throws IOException { + TestLogger.clearLog(); + } + + @BeforeAll + static void runMains() throws IOException { + String[] argsFE = {IP, IP}; + + Thread feThread = new Thread(() -> { + try { + FE.main(argsFE); + } catch (Exception e) { + e.printStackTrace(); + } + }); + feThread.start(); + + Thread sequencerThread = new Thread(() -> { + try { + InetAddress ip = InetAddress.getByName(IP); + NetworkInterface netIfc = NetworkInterface.getByInetAddress(ip); + Sequencer sequencer = new Sequencer(ip, netIfc); + sequencer.run(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + sequencerThread.start(); + } + + @AfterEach + void stopServers() { + // Stop the DERMSServerPublisher + DERMSServerPublisher.stop(); + } + + @Test + void testNormal() throws IOException { + // Replica 1 + String[] argsRM = {"1", "MTL", IP, IP, "0", "0"}; + + // [TODO] + // Run the main function of the desired replica, for example: + DERMSServerPublisher.main(new String[0]); + + ReplicaManager.main(argsRM); + ResponderClient responderClient = new ResponderClient(IP); + responderClient.addResource("MTL1001", "ambulance", 10); + + // Compare the number of lines in the log files, to determine if they match or not + assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_NORM)); + } + + @Test + void testByzantine() throws IOException { + // Replica 1 + String[] argsRM = {"1", "MTL", IP, IP, "1", "0"}; + + // [TODO] + // Run the main function of the desired replica, for example: + DERMSServerPublisher.main(new String[0]); + + ReplicaManager.main(argsRM); + ResponderClient responderClient = new ResponderClient(IP); + responderClient.addResource("MTL1001", "ambulance", 10); + + // Compare the number of lines in the log files, to determine if they match or not + assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_BYZ)); + } + + @Test + void testCrash() throws IOException { + // Replica 1 + String[] argsRM = {"1", "MTL", IP, IP, "0", "1"}; + + // [TODO] + // Run the main function of the desired replica, for example: + DERMSServerPublisher.main(new String[0]); + + ReplicaManager.main(argsRM); + ResponderClient responderClient = new ResponderClient(IP); + responderClient.addResource("MTL1001", "ambulance", 10); + + // Compare the number of lines in the log files, to determine if they match or not + assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_CRASH)); + } + + @Test + void testCombined() throws IOException { + // Replica 1 and 2 + String[] argsRM1 = {"1", "MTL", IP, IP, "1", "0"}; + String[] argsRM3 = {"3", "MTL", IP, IP, "0", "1"}; + + // [TODO] + // Run the main function of the desired TWO replicas, for example: + DERMSServerPublisher.main(new String[0]); + MTLServer.main(new String[0]); + QUEServer.main(new String[0]); + SHEServer.main(new String[0]); + + ReplicaManager.main(argsRM1); + ReplicaManager.main(argsRM3); + + Thread thread1 = new Thread(() -> { + ResponderClient responderClient = null; + try { + responderClient = new ResponderClient(IP); + } catch (MalformedURLException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + if (responderClient != null) { + responderClient.addResource("MTL1001", "ambulance", 10); + } + } + }); + + Thread thread2 = new Thread(() -> { + ResponderClient responderClient2 = null; + try { + responderClient2 = new ResponderClient(IP); + } catch (MalformedURLException e) { + e.printStackTrace(); + } finally { + if (responderClient2 != null) { + responderClient2.addResource("MTL1002", "ambulance", 11); + } + } + }); + + thread1.start(); + thread2.start(); + + try { + thread1.join(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + try { + thread2.join(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + // Compare the number of lines in the log files, to determine if they match or not + assertTrue(LogComparator.compareFiles(TEST_LOG_PATH, EXPECTED_LOG_PATH_COMBINED)); + } +}
\ No newline at end of file |