diff options
| author | Vijaykumar Patel <vijay.patel@aheeva.com> | 2024-12-02 15:56:23 -0500 |
|---|---|---|
| committer | Vijaykumar Patel <vijay.patel@aheeva.com> | 2024-12-02 15:56:23 -0500 |
| commit | 1ac7511cf337bc64f3a02ebbfd9c3214fa61a946 (patch) | |
| tree | 1ea9aad630db0036a83273b203b0aed5fb77f7dd | |
| parent | 687b126f39c44a083d424fc53fa3843bbdf91473 (diff) | |
| download | soen423-1ac7511cf337bc64f3a02ebbfd9c3214fa61a946.zip | |
Replica3 added
| -rw-r--r-- | src/main/java/derms/ReplicaManager.java | 2 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/City.java | 40 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/Constants.java | 17 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/Logger.java | 60 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/MTLServer.java | 113 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/PortConstants.java | 24 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/QUEServer.java | 111 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/Replica3.java | 589 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/Resource.java | 32 | ||||
| -rw-r--r-- | src/main/java/derms/replica3/SHEServer.java | 112 |
10 files changed, 1099 insertions, 1 deletions
diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 414cdf0..71c22e2 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -54,7 +54,7 @@ public class ReplicaManager { replica = new derms.replica2.Replica2(new derms.replica2.City(), this); break; case 3: - replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + replica = new derms.replica3.Replica3(new derms.replica3.City(), this); break; case 4: replica = new derms.replica2.Replica2(new derms.replica2.City(), this); diff --git a/src/main/java/derms/replica3/City.java b/src/main/java/derms/replica3/City.java new file mode 100644 index 0000000..89fd8ce --- /dev/null +++ b/src/main/java/derms/replica3/City.java @@ -0,0 +1,40 @@ +package derms.replica3; + +import java.io.Serializable; + +public class City implements Serializable { + static final int codeLen = 3; + + private String code; + + City(String code) throws IllegalArgumentException { + if (code.length() != codeLen) + throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters"); + this.code = code; + } + + public City() { + this("XXX"); + } + public String getCode(){ + return code; + } + @Override + public String toString() { + return code; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || this.getClass() != obj.getClass()) { + return false; + } + City other = (City) obj; + return this.code.equals(other.code); + } + + @Override + public int hashCode() { + return code.hashCode(); + } +} diff --git a/src/main/java/derms/replica3/Constants.java b/src/main/java/derms/replica3/Constants.java new file mode 100644 index 0000000..f142a4f --- /dev/null +++ b/src/main/java/derms/replica3/Constants.java @@ -0,0 +1,17 @@ +package derms.replica3; + +public class Constants { + public static final String MONTREAL = "MTL"; + public static final String QUEBEC = "QUE"; + public static final String SHERBROOKE = "SHE"; + + //Address for Servers +// public static final String MONTREAL_ADDRESS = "http://localhost:8085/service/" + Constants.MONTREAL; +// public static final String QUEBEC_ADDRESS = "http://localhost:8090/service/" + Constants.QUEBEC; +// public static final String SHERBROOKE_ADDRESS = "http://localhost:8091/service/" + Constants.SHERBROOKE; +// +// //WSDL URLS +// public static final String MONTREAL_WSDL_URL = Constants.MONTREAL_ADDRESS + "?wsdl"; +// public static final String QUEBEC_WSDL_URL = Constants.QUEBEC_ADDRESS + "?wsdl"; +// public static final String SHERBROOKE_WSDL_URL = Constants.SHERBROOKE_ADDRESS + "?wsdl"; +} diff --git a/src/main/java/derms/replica3/Logger.java b/src/main/java/derms/replica3/Logger.java new file mode 100644 index 0000000..15124c5 --- /dev/null +++ b/src/main/java/derms/replica3/Logger.java @@ -0,0 +1,60 @@ +package derms.replica3; + + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.lang.reflect.Field; +import java.text.DateFormat; +import java.util.Date; + +public class Logger { + private FileWriter fileWriter = null; + private BufferedWriter bufferedWriter = null; + private PrintWriter printWriter = null; + + public Logger(final String activityLoggerFile) throws IOException { + fileWriter = new FileWriter(activityLoggerFile, true); + bufferedWriter = new BufferedWriter(fileWriter); + printWriter = new PrintWriter(bufferedWriter); + } + + public synchronized void log( String action, String status, String res) { + try { + final String dataLog = DateFormat.getDateTimeInstance().format(new Date()) + " [" + + action + "] : [" + status + "] - " + res; + + printWriter.println(dataLog); + System.out.println(dataLog); + bufferedWriter.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public synchronized void clientLog(final String userId, final String action, final String message) { + try { + final String dataLog = DateFormat.getDateTimeInstance().format(new Date()) + " [" + userId + "] [" + + action + "] - " + message; + printWriter.println(dataLog); + System.out.println(dataLog); + bufferedWriter.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public synchronized void clientLog(final String userId, final String action) { + try { + final String dataLog = DateFormat.getDateTimeInstance().format(new Date()) + " [" + userId + "] " + " [" + + action + "] "; + printWriter.println(dataLog); + System.out.println(dataLog); + bufferedWriter.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/derms/replica3/MTLServer.java b/src/main/java/derms/replica3/MTLServer.java new file mode 100644 index 0000000..9e7ea5c --- /dev/null +++ b/src/main/java/derms/replica3/MTLServer.java @@ -0,0 +1,113 @@ +package derms.replica3; + +//import logger.Logger; + + +import javax.xml.ws.Endpoint; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; + +import derms.replica3.Constants; +import derms.replica3.Constants; + +public class MTLServer { + public static void main(String[] args) { + + try{ + final Logger activityLogger = new Logger("MTLServer.log"); + int udpPort = PortConstants.getUdpPort("MTL"); + City ct = new City("MTL"); + final Replica3 ServerImpl = new Replica3(ct, null); +// Endpoint endpoint = Endpoint.publish(Constants.MONTREAL_ADDRESS, ServerImpl); + + /*Start UDP server for MTL*/ + new Thread(() -> { + startUdpServer(activityLogger, ServerImpl, udpPort); + }).start(); + + System.out.println("#=== MTL Server is started ===#"); + + } catch (Exception e) { + System.err.println("ERROR: " + e); + e.printStackTrace(System.out); + } + } + + private static void startUdpServer(final Logger activityLogger, final Replica3 server, int udpPort) { + DatagramSocket socket = null; + try { + socket = new DatagramSocket(udpPort); +// activityLogger.log(MessageTypeConstants.INFO, String.format(UdpServerMessages.UDP_SERVER_STARTED, server.getServerName())); + while (true) { + try { + final byte[] data = new byte[1000]; + final DatagramPacket packet = new DatagramPacket(data, data.length); + socket.receive(packet); + System.out.println("request received" + udpPort); + new Thread(() -> { + processRequest(activityLogger, server, packet); + }).start(); + } catch (IOException e) { +// activityLogger.log("MessageTypeConstants.ERROR", e.getMessage()); + } + } + } catch (SocketException e1) { +// activityLogger.log(MessageTypeConstants.ERROR, e1.getMessage()); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + private static void processRequest(final Logger activityLogger, final Replica3 server, final DatagramPacket packet) { + byte[] response; + DatagramSocket socket = null; + final String request = new String(packet.getData(), StandardCharsets.UTF_8).trim(); + final String[] packetData = request.split("-"); + final String sourceServer = packetData[0].trim(); + final String action = packetData[1].trim(); + final String resourceName = packetData[2].trim(); + String resourceID = ""; + int duration = 0; + + System.out.println("action:" + action); + + if(packetData.length == 4) { + resourceID = packetData[2].trim(); + duration = Integer.parseInt(packetData[3]); + } + + try { + socket = new DatagramSocket(); + if("GET_AVAILABLE_RESOURCES".equalsIgnoreCase(action)) { + response = sanitizeXml(server.getAvailableResources(resourceName).toString()).getBytes(StandardCharsets.UTF_8); + socket.send(new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort())); + }else if("GET_RESOURCE".equalsIgnoreCase(action)) { + response = server.getRequestedResource(resourceID, duration).toString().getBytes(); + socket.send(new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort())); + } + +// activityLogger.log("", action, "Response sent"); + } catch (IOException e) { +// activityLogger.log("", "EXCEPTION", e.getMessage()); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + private static String sanitizeXml(String input) { + if (input == null) { + return ""; // or handle as needed + } + + return input.replaceAll("[\\x00]", ""); + } + +} diff --git a/src/main/java/derms/replica3/PortConstants.java b/src/main/java/derms/replica3/PortConstants.java new file mode 100644 index 0000000..e8ce489 --- /dev/null +++ b/src/main/java/derms/replica3/PortConstants.java @@ -0,0 +1,24 @@ +package derms.replica3; + +public class PortConstants { + + public static final int MTL_PORT = 12012; + public static final int QUE_PORT = 12013; + public static final int SHE_PORT = 12014; + + public static final int MTL_UDP_PORT = 13013; + public static final int QUE_UDP_PORT = 13014; + public static final int SHE_UDP_PORT = 13015; + + public static int getUdpPort(final String serverLocation) { + if(Constants.MONTREAL.equalsIgnoreCase(serverLocation)) { + return MTL_UDP_PORT; + } else if(Constants.QUEBEC.equalsIgnoreCase(serverLocation)) { + return QUE_UDP_PORT; + } else if(Constants.SHERBROOKE.equalsIgnoreCase(serverLocation)) { + return SHE_UDP_PORT; + } + return 0; + } + +} diff --git a/src/main/java/derms/replica3/QUEServer.java b/src/main/java/derms/replica3/QUEServer.java new file mode 100644 index 0000000..c03d329 --- /dev/null +++ b/src/main/java/derms/replica3/QUEServer.java @@ -0,0 +1,111 @@ +package derms.replica3; + +//import logger.Logger; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; + +import javax.xml.ws.Endpoint; +// +//import constants.Constants; +//import constants.PortConstants; +public class QUEServer { + public static void main(String[] args) { + + try{ + final Logger activityLogger = new Logger("QUEServer.log"); + int udpPort = PortConstants.getUdpPort("QUE"); + City ct = new City("QUE"); + /* Create servant and register it with the ORB */ + final Replica3 ServerImpl = new Replica3(ct, null); +// Endpoint endpoint = Endpoint.publish(Constants.QUEBEC_ADDRESS, ServerImpl); + + /*Start UDP server for QUE*/ + new Thread(() -> { + startUdpServer(activityLogger, ServerImpl, udpPort); + }).start(); + + System.out.println("#=== QUE Server is started ===#"); + + } catch (Exception e) { + System.err.println("ERROR: " + e); + e.printStackTrace(System.out); + } + } + + private static void startUdpServer(final Logger activityLogger, final Replica3 server, int udpPort) { + DatagramSocket socket = null; + try { + socket = new DatagramSocket(udpPort); + while (true) { + try { + final byte[] data = new byte[1000]; + final DatagramPacket packet = new DatagramPacket(data, data.length); + socket.receive(packet); + System.out.println("request received" + udpPort); + + new Thread(() -> { + processRequest(activityLogger, server, packet); + }).start(); + } catch (IOException e) { + activityLogger.log("", "EXCEPTION", e.getMessage()); + } + } + } catch (SocketException e1) { + activityLogger.log("", "EXCEPTION", e1.getMessage()); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + private static void processRequest(final Logger activityLogger, final Replica3 server, final DatagramPacket packet) { + byte[] response; + DatagramSocket socket = null; + final String request = new String(packet.getData(), StandardCharsets.UTF_8).trim(); + final String[] packetData = request.split("-"); + final String sourceServer = packetData[0].trim(); + final String action = packetData[1].trim(); + final String resourceName = packetData[2].trim(); + String resourceID = ""; + int duration = 0; + + if(packetData.length == 4) { + resourceID = packetData[2].trim(); + duration = Integer.parseInt(packetData[3]); + } + + try { + socket = new DatagramSocket(); + + if("GET_AVAILABLE_RESOURCES".equalsIgnoreCase(action)) { + response = sanitizeXml(server.getAvailableResources(resourceName).toString()).getBytes(StandardCharsets.UTF_8); + socket.send(new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort())); + }else if("GET_RESOURCE".equalsIgnoreCase(action)) { + response = server.getRequestedResource(resourceID, duration).toString().getBytes(); + socket.send(new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort())); + } + + activityLogger.log("", action, "Response sent"); + } catch (IOException e) { + activityLogger.log("", "EXCEPTION", e.getMessage()); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + private static String sanitizeXml(String input) { + if (input == null) { + return ""; // or handle as needed + } + + return input.replaceAll("[\\x00]", ""); + } + +} diff --git a/src/main/java/derms/replica3/Replica3.java b/src/main/java/derms/replica3/Replica3.java new file mode 100644 index 0000000..9994d82 --- /dev/null +++ b/src/main/java/derms/replica3/Replica3.java @@ -0,0 +1,589 @@ +package derms.replica3; + +import derms.Replica; +import derms.ReplicaManager; +import derms.Request; +import derms.Response; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import derms.replica3.Logger; + +public class Replica3 implements Replica{ + static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); + + private String serverID; + private HashMap<String, HashMap<String, Resource>> resources; + private HashMap<String, List<Resource>> coordinatorResources ; + private HashMap<String, List<String>> resourceWaitingQueues; + private List<String> serverList = Arrays.asList(Constants.MONTREAL, Constants.QUEBEC, Constants.SHERBROOKE); + private Logger activityLogger; +// final InetAddress localAddr; +// private final Logger log; + + private boolean alive; + private final ReplicaManager replicaManager; + + public Replica3(City city, ReplicaManager replicaManager) throws IOException { + this.serverID = city.getCode(); + this.resources = new HashMap<>(); + coordinatorResources = new HashMap<>(); + resourceWaitingQueues = new HashMap<>(); +// this.log = DermsLogger.getLogger(getClass()); + this.replicaManager = replicaManager; + this.activityLogger = new Logger( serverID + "Server.log"); + +// log.info("Running"); +// log.config("Local address is "+localAddr.toString()); + + this.alive = true; + } + + @Override + public boolean isAlive() { return alive; } + + @Override + public void startProcess() { + // TODO +// log.info(getClass().getSimpleName() + " started."); + System.out.println("process started"); + } + + @Override + public void processRequest(Request request) { +// log.info(request.toString()); + System.out.println("process request and good"); + String status = ""; + try { + switch (request.getFunction()) { + case "addResource": + status = addResource(request.getResourceID(), request.getResourceType(), request.getDuration()); + break; + case "removeResource": + status = removeResource(request.getResourceID(), request.getDuration()); + break; + case "listResourceAvailability": + status = listResourceAvailability(request.getResourceType()); + break; + case "requestResource": + status = requestResource(request.getClientID(), request.getResourceID(), request.getDuration()); + break; + case "findResource": + status = findResource(request.getClientID(), request.getResourceType()); + break; + case "returnResource": + status = returnResource(request.getClientID(), request.getResourceID()); + break; + case "swapResource": + status = swapResource(request.getClientID(),request.getOldResourceID(), request.getOldResourceType(), request.getResourceID(), request.getResourceType()); + break; + default: + status = "Failure: unknown function '" + request.getFunction() + "'"; + } + } catch (Exception e) { +// log.warning(e.getMessage()); + status = "Failure: " + request.getFunction() + ": " + e.getMessage(); + } + + Response response = new Response(request.getSequenceNumber(), status); +// log.info("Processed request " + request + "; response: " + response); + replicaManager.sendResponseToFE(response); + } + + + @Override + public void restart() { + // TODO + shutdown(); + startProcess(); + } + + @Override + public int getId() { return 3; } + + private void shutdown() { + // TODO + } + + public synchronized String addResource(String resourceID, String resourceName, int duration) { + // Check if the resource name already exists + String message = ""; +// boolean isIdValid = ValidationService.isResourceIDValid(resourceID); +// boolean isNameValid = ValidationService.isResourceNameValid(resourceName); +// +// if(!(isIdValid && isNameValid)) { +// message = "Input is not valid for ID: " + resourceID + " " + resourceName + " " + duration; +// activityLogger.log(" ADD RESOURCE (" + resourceID + ", " + resourceName + ", " + duration + ") " , " FAILED ", message); +// return message; +// } + + HashMap<String, Resource> resourceMap = resources.get(resourceName); + if (resourceMap == null) { + resourceMap = new HashMap<>(); // Create a new inner HashMap if it doesn't exist + resources.put(resourceName, resourceMap); + } + + Resource resource = resourceMap.get(resourceID); + if (resource != null) { + // If the resource already exists, update its duration + if (duration > resource.getDuration()) { + resource.setDuration(duration); + } + message = "Resource already exist! So duration is updated: " + resourceID + " : " + resourceName + " : " + duration; + } else { + // Add the new resource + resource = new Resource(resourceID, resourceName, duration); + resourceMap.put(resourceID, resource); + message = "New Resource of " + resourceName +" is added: " + resourceID + " : " + resourceName + " : " + duration; + } + +// if(resourceWaitingQueues.containsKey(resourceID)) { +// List<String> coordinators = resourceWaitingQueues.get(resourceID); +// for(int i=0; i<coordinators.size(); i++) { +// if(coordinators.get(i).split("-").length > 1) { +// String crID = coordinators.get(i).split("-")[0]; +// int dur = Integer.parseInt(coordinators.get(i).split("-")[1]); +// Resource res = resourceMap.get(resourceID); +// +// if(res.getDuration() >= dur) { +//// activityLogger.log(" ADD RESOURCE - Resource allocation to waiting coordinator", " COMPLETED ", "Resource " + +//// resourceID + " is allocated to waiting coordinator " + crID); +// res.setDuration(res.getDuration() - dur); +// resourceMap.put(resourceID, res); +// coordinators.remove(i); +// break; +// } +// } +// +// } +// } +// activityLogger.log(" ADD RESOURCE (" + resourceID + ", " + resourceName + ", " + duration + ") " , " COMPLETED ", message); + + return message; + } + + public synchronized String removeResource(String resourceID, int duration) { + String message = ""; + +// boolean isIdValid = ValidationService.isResourceIDValid(resourceID); +// +// if(!isIdValid ) { +// message = "Input is not valid for ID: " + resourceID + " " + duration; +// activityLogger.log(" ADD RESOURCE (" + resourceID + ", " + duration + ") " , " FAILED ", message); +// return message; +// } + + for (HashMap<String, Resource> resourceMap : resources.values()) { + Resource resource = resourceMap.get(resourceID); + + if (resource != null) { + if (duration >= resource.getDuration()) { + resourceMap.remove(resourceID); // Remove if duration is fully used + message = "Resource of ID: " + resourceID + " with duration " + resource.getDuration() + " is successfully removed."; + activityLogger.log(" REMOVE RESOURCE (" + resourceID + ", " + duration + ") " , " COMPLETED ", message); + return message; + } else { + resource.setDuration(resource.getDuration() - duration); // Reduce the duration + message = "Duration of the Resource of ID: " + resourceID + " with duration " + duration + " is successfully reduced."; + activityLogger.log(" REMOVE RESOURCE (" + resourceID + ", " + duration + ") " , " COMPLETED ", message); + return message; + } + } + } + message = "Resource with ID " + resourceID + " is not found!"; + activityLogger.log(" REMOVE RESOURCE (" + resourceID + ", " + duration + ") " , " FAILED ", message); + + return "Resource not found"; + } + + public synchronized String listResourceAvailability(String resourceName) { + String message = ""; + +// boolean isNameValid = ValidationService.isResourceNameValid(resourceName); +// +// if(!isNameValid ) { +// message = "Input is not valid for name: " + resourceName; +// activityLogger.log(" LIST RESOURCE AVAILABILITY (" + resourceName + ") " , " FAILED ", message); +// return message; +// } + + final StringBuilder result = new StringBuilder(); + + result.append(resourceName.toUpperCase()); + CountDownLatch latch = new CountDownLatch(serverList.size() - 1); + + for(String server: serverList) { + System.out.println(serverID + " " + server); + + if(serverID.equalsIgnoreCase(server)) { + result.append(getAvailableResources(resourceName)); + System.out.println("listResourceAvailability same server:" + result); + }else { + System.out.println("listResourceAvailability request region:" + result); + new Thread(() -> { + System.out.println("listResourceAvailability Thread:" + result); + final String count = sendResourceAvailabilityRequest(server, resourceName); + System.out.println(count); + if(count != null) + synchronized(result) { + result.append(count); + } + latch.countDown(); + }).start(); + } + } + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + activityLogger.log(" LIST RESOURCE AVAILABILITY (" + resourceName + ") " , " COMPLETED ", result.toString()); + System.out.println("printing: " + result.toString()); + return result.toString().trim(); + } + + public String getAvailableResources(String resourceName) { + String result = ""; + + HashMap<String, Resource> resourceMap = resources.get(resourceName); + if (resourceMap == null) { + return result; + } + + for (Resource resource : resourceMap.values()) { + result += " " + resource.getResourceID() + "-" + resource.getDuration(); + } + + return result; + } + + private String sendResourceAvailabilityRequest(final String location, String resourceName) { + System.out.println("sendResourceAvailabilityRequest:"); + String recordCount = ""; + try { + final InetAddress inetAddress = InetAddress.getByName("localhost"); + activityLogger.log(location, " UPD MESSAGE SENT ", "UPD Message sent for Resource Availability"); + + final DatagramSocket socket = new DatagramSocket(); + byte[] data = (serverID + "-GET_AVAILABLE_RESOURCES" + "-" + resourceName ).toString().getBytes(); + + final DatagramPacket packet = new DatagramPacket(data, data.length, inetAddress, PortConstants.getUdpPort(location)) ; + socket.send(packet); + data = new byte[1000]; + DatagramPacket responsePacket = new DatagramPacket(data, data.length); + socket.receive(responsePacket); + recordCount = " " + new String(responsePacket.getData(), 0, responsePacket.getLength()).trim(); + socket.close(); + } catch (Exception e) { + activityLogger.log("", "EXCEPTION", e.getMessage()); + } + return recordCount; + } + + public synchronized String requestResource(String coordinatorID, String resourceID, int duration) { + // Iterate through the outer HashMap to find the resource by its ID + String message = ""; + final StringBuilder result = new StringBuilder(); + +// boolean isUserIdValid = ValidationService.isUserIDValid(coordinatorID); +// boolean isIdValid = ValidationService.isResourceIDValid(resourceID); +// +// if(!(isIdValid && isUserIdValid)) { +// message = "Input is not valid for ID: " + resourceID + " " + resourceID + " " + duration; +// activityLogger.log(" ADD RESOURCE (" + resourceID + ", " + resourceID + ", " + duration + ") " , " FAILED ", message); +// return message; +// } + + CountDownLatch latch = new CountDownLatch(1); + + for (HashMap<String, Resource> resourceMap : resources.values()) { + Resource resource = resourceMap.get(resourceID); + if (resource != null && resource.getDuration() >= duration) { + // If resource is found and has sufficient duration, reduce it + + resource.setDuration(resource.getDuration() - duration); + if(coordinatorResources.containsKey(coordinatorID)) { + Resource rs = new Resource(resourceID, resource.getResourceName(), duration); + coordinatorResources.get(coordinatorID).add(rs); + }else { + Resource rs = new Resource(resourceID, resource.getResourceName(), duration); + List<Resource> lst = new ArrayList<Resource>(); + lst.add(rs); + System.out.println(lst); + coordinatorResources.put(coordinatorID, lst); + System.out.println(coordinatorResources.keySet()); + } + message = "Coordinator of ID " + coordinatorID + " borrowed resource of ID " + resourceID + " " + "from same server"; + + activityLogger.log(" REQUEST RESOURCE (" + coordinatorID + ", " + resourceID + ", " + duration + ") " , " COMPLETED ", message); + return message; + } else if (resource != null) { + message = "Insufficient resource duration for resource of ID " + resourceID + " " + " in the same server" ; + activityLogger.log(" REQUEST RESOURCE (" + coordinatorID + ", " + resourceID + ", " + duration + ") " , " FAILED ", message); + if(resourceWaitingQueues.containsKey(resourceID)) { + resourceWaitingQueues.get(resourceID).add(coordinatorID + "-" + duration); + }else { + List<String> lst = new ArrayList<String>(); + lst.add(coordinatorID + "-" + duration); + resourceWaitingQueues.put(resourceID, lst); + } + return message; + } + } + + for(String server: serverList) { + if(!server.equalsIgnoreCase(serverID) && resourceID.substring(0,3).equalsIgnoreCase(server)) { + new Thread(() -> { + final String count = sendResourceRequest(server, resourceID, duration); + if(count != null) { + if(count.trim().length() > 0) { + if(coordinatorResources.containsKey(coordinatorID)) { + Resource rs = new Resource(resourceID, "", duration); + coordinatorResources.get(coordinatorID).add(rs); + }else { + List<Resource> lst = new ArrayList<Resource>(); + Resource rs = new Resource(resourceID, "", duration); + lst.add(rs); + coordinatorResources.put(coordinatorID, lst); + } + }else { + if(resourceWaitingQueues.containsKey(resourceID)) { + resourceWaitingQueues.get(resourceID).add(coordinatorID); + }else { + List<String> lst = new ArrayList<String>(); + lst.add(coordinatorID); + resourceWaitingQueues.put(resourceID, lst); + } + + } + synchronized(result) { + result.append(count); + } + } + latch.countDown(); + }).start(); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + System.out.println(coordinatorResources.toString()); + + // Request to other servers for getting specified resource + if(result.toString().trim().length() == 0) { + message = "Requested Resource of ID " + resourceID + " by coordinator " + coordinatorID + " is not found! so Coordinator is added to waiting queue!"; + activityLogger.log(" REQUEST RESOURCE (" + coordinatorID + ", " + resourceID + ", " + duration + ") " , " COMPLETED ", message); + return message; + } + + activityLogger.log(" REQUEST RESOURCE (" + coordinatorID + ", " + resourceID + ", " + duration + ") " , " COMPLETED ", message); + return result.toString(); + + } + + private String sendResourceRequest(final String location, String resourceID, int duration) { + + String recordCount = ""; + try { + final InetAddress inetAddress = InetAddress.getByName("localhost"); +// activityLogger.log(location, " UPD MESSAGE SENT ", "UPD Message sent for Request Resource"); + final DatagramSocket socket = new DatagramSocket(); + byte[] data = (serverID + "-GET_RESOURCE" + "-" + resourceID + "-" + duration).toString().getBytes(); + + final DatagramPacket packet = new DatagramPacket(data, data.length, inetAddress, PortConstants.getUdpPort(location)) ; + socket.send(packet); + System.out.println("inside the request udp: "+ location + " "+ data.toString()); + data = new byte[1000]; + + DatagramPacket responsePacket = new DatagramPacket(data, data.length); + socket.receive(responsePacket); + recordCount = " " + new String(responsePacket.getData(), 0, responsePacket.getLength()).trim(); + System.out.println("after received the request udp: "+ location + " "+ data.toString()); + + socket.close(); + } catch (Exception e) { + } + return recordCount; + } + + public String getRequestedResource(String id, int duration) { + String message = ""; + for (HashMap<String, Resource> resourceMap : resources.values()) { + Resource resource = resourceMap.get(id); + + if (resource != null) { + if(resource.getDuration() >= duration) { + resource.setDuration(resource.getDuration() - duration); + message = "Resource of ID: " + id + " is borrowed successfully and duration is updated to " + resource.getDuration() + " in " + serverID + "server" ; + activityLogger.log("REQUEST RESOURCE", " COMPLETED " , message); + return message; + } + } + } + message = "Resource of ID: " + id + " is not available to borrow!"; + activityLogger.log("REQUEST RESOURCE", " FAILED " , message); + return ""; + + } + + public String findResource(String coordinatorID, String resourceName){ + // Retrieve the inner HashMap for the resource name (e.g., "AMBULANCE") + final StringBuilder result = new StringBuilder(); + String message = ""; + +// boolean isUserIdValid = ValidationService.isUserIDValid(coordinatorID); +// boolean isNameValid = ValidationService.isResourceNameValid(resourceName); +// +// if(!(isNameValid && isUserIdValid)) { +// message = "Input is not valid for : " + coordinatorID + " " + resourceName; +// activityLogger.log(" FIND RESOURCE (" + coordinatorID + ", " + resourceName + ") " , " FAILED ", message); +// return message; +// } + + result.append(resourceName.toUpperCase()); + int startLen = result.length(); + + HashMap<String, Resource> specifiedResources = resources.get(resourceName); + if(specifiedResources != null) { + if(coordinatorResources.containsKey(coordinatorID)) { + for(Resource res: coordinatorResources.get(coordinatorID)) { + System.out.println("resID: " + res.getResourceID()); + if(specifiedResources.containsKey(res.getResourceID())) { + result.append(" " + res.getResourceID() + "-" + res.getDuration()); + } + } + } + } + + + CountDownLatch latch = new CountDownLatch(serverList.size() - 1); + final StringBuilder response = new StringBuilder(); + for(String server: serverList) { + + if(!serverID.equalsIgnoreCase(server)) { + new Thread(() -> { + final String count = sendResourceAvailabilityRequest(server, resourceName); + if(count != null) + synchronized(result) { + response.append(count); + } + latch.countDown(); + }).start(); + } + } + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + String responseString = response.toString().trim(); + String[] resources = responseString.split(" "); + if (resources != null) { + for (String resource : resources) { + // Split each resource by '-' + String[] parts = resource.split("-"); + + if (parts.length == 2) { + String resourceID = parts[0]; + String count = parts[1]; + + if(coordinatorResources.containsKey(coordinatorID)) { + for(Resource res: coordinatorResources.get(coordinatorID)) { + if(resourceID.equalsIgnoreCase(res.getResourceID())) { + result.append(" " + resourceID + "-" + res.getDuration()); + } + } + } + + } + } + } + if(startLen == result.length()) { + message = "Resources of " + resourceName+ " occupied by coordinator " + coordinatorID + " are not found!"; + activityLogger.log(" FIND RESOURCE (" + coordinatorID + ") " , " COMPLETED ", message); + return message; + } + + message = "Resources of " + resourceName+ " occupied by coordinator " + coordinatorID + " are returned successfully!"; + activityLogger.log(" FIND RESOURCE (" + coordinatorID + ") " , " COMPLETED ", message); + System.out.println(result); + + return result.toString(); + } + + public synchronized String returnResource(String coordinatorID, String resourceID){ + // Iterate through the outer HashMap to find the resource by its ID + String message = ""; +// boolean isUserIdValid = ValidationService.isUserIDValid(coordinatorID); +// boolean isIDValid = ValidationService.isResourceIDValid(resourceID); +// +// if(!(isIDValid && isUserIdValid)) { +// message = "Input is not valid for : " + coordinatorID + " " + resourceID; +// activityLogger.log(" RETURN RESOURCE (" + coordinatorID + ", " + resourceID + ") " , " FAILED ", message); +// return message; +// } + +// for (HashMap<String, Resource> resourceMap : resources.values()) { +// Resource resource = resourceMap.get(resourceID); +// if (resource != null) { + // Perform any logic to return the resource + if(coordinatorResources.containsKey(coordinatorID)) { + for(Resource res: coordinatorResources.get(coordinatorID)) { + if(res.getResourceID().equalsIgnoreCase(resourceID)) { + message = "Resource " + resourceID + "-" + res.getDuration() + " returned successfully to coordinator " + coordinatorID; + activityLogger.log(" RETURN RESOURCE (" + coordinatorID + ", " + resourceID + ") " , " COMPLETED ", message); + return message; + } + } + } +// } +// } + message = "Resource of ID: " + resourceID + " not found."; + activityLogger.log(" RETURN RESOURCE (" + coordinatorID + ", " + resourceID + ") " , " FAILED ", message); + return message; + } + + public synchronized String swapResource(String coordinatorID, String oldResourceID, String oldResourceType, String newResourceID, String newResourceType) { + String message = ""; + int durationRequest = 0; + String returnedResource = returnResource(coordinatorID, oldResourceID); + if(returnedResource.contains("not found")) { + message = "Coordinator of ID: " + coordinatorID + " didn't acquired Resource of ID: " + oldResourceID + "."; +// activityLogger.log(" SWAP RESOURCE (" + coordinatorID + ", " + oldResourceID + ") " , " FAILED ", message); + return message; + } + if(coordinatorResources.containsKey(coordinatorID)) { + for(Resource res: coordinatorResources.get(coordinatorID)) { + if(res.getResourceID().equals(oldResourceID)) { + durationRequest = res.getDuration(); + coordinatorResources.get(coordinatorID).remove(res); + break; + } + } + } + + String requestedResource = requestResource(coordinatorID, newResourceID, durationRequest); + if(requestedResource.contains("not found")) { + message = "Coordinator of ID: " + coordinatorID + " couldn't acquire Resource of ID: " + newResourceID + "."; + activityLogger.log(" SWAP RESOURCE (" + coordinatorID + ", " + oldResourceID + ") " , " FAILED ", message); + return message; + } + message = "Coordinator of ID: " + coordinatorID + " Successfully swapped old resource " + oldResourceID + " with new resource " + newResourceID; +// activityLogger.log(" SWAP RESOURCE (" + coordinatorID + ", " + oldResourceID + ", " + oldResourceType + ", " + newResourceID + ", " + newResourceType + ", "+ ") ", +// " COMPLETED ", message); + + return message; + } + +} diff --git a/src/main/java/derms/replica3/Resource.java b/src/main/java/derms/replica3/Resource.java new file mode 100644 index 0000000..d4fc424 --- /dev/null +++ b/src/main/java/derms/replica3/Resource.java @@ -0,0 +1,32 @@ +package derms.replica3; + +import java.io.Serializable; + +public class Resource implements Serializable { + private String resourceID; + private String resourceName; + private int duration; + + public Resource(String resourceID, String resourceName, int duration) { + this.resourceID = resourceID; + this.resourceName = resourceName; + this.duration = duration; + } + + public String getResourceID() { + return resourceID; + } + + public String getResourceName() { + return resourceName; + } + + public int getDuration() { + return duration; + } + + public void setDuration(int duration) { + this.duration = duration; + } +} + diff --git a/src/main/java/derms/replica3/SHEServer.java b/src/main/java/derms/replica3/SHEServer.java new file mode 100644 index 0000000..9d29a70 --- /dev/null +++ b/src/main/java/derms/replica3/SHEServer.java @@ -0,0 +1,112 @@ +package derms.replica3; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; + +import javax.xml.ws.Endpoint; + +//import constants.Constants; +//import constants.PortConstants; + +public class SHEServer { + public static void main(String[] args) { + + try{ +// final ActivityLoggerService activityLogger = new ActivityLoggerService(FileConstants.SERVER_LOG_FILE_PATH + LocationConstants.DOLLARD + "/" + FileConstants.ACTIVITY_LOG); + + final Logger activityLogger = new Logger("SHEServer.log"); + int udpPort = PortConstants.getUdpPort("SHE"); + City ct = new City("SHE"); + + final Replica3 ServerImpl = new Replica3(ct, null); +// Endpoint endpoint = Endpoint.publish(Constants.SHERBROOKE_ADDRESS, ServerImpl); + + /*Start UDP server for SHE*/ + new Thread(() -> { + startUdpServer(activityLogger, ServerImpl, udpPort); + }).start(); + + System.out.println("#=== SHE Server is started ===#"); + + + } catch (Exception e) { + System.err.println("ERROR: " + e); + e.printStackTrace(System.out); + } + } + + private static void startUdpServer(final Logger activityLogger, final Replica3 server, int udpPort) { + DatagramSocket socket = null; + try { + socket = new DatagramSocket(udpPort); + while (true) { + try { + final byte[] data = new byte[1000]; + final DatagramPacket packet = new DatagramPacket(data, data.length); + socket.receive(packet); + System.out.println("inside the received packet she"); + new Thread(() -> { + processRequest(activityLogger, server, packet); + }).start(); + } catch (IOException e) { + activityLogger.log("", "EXCEPTION", e.getMessage()); + } + } + } catch (SocketException e1) { + activityLogger.log("", "EXCEPTION", e1.getMessage()); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + private static void processRequest(final Logger activityLogger, final Replica3 server, final DatagramPacket packet) { + byte[] response; + DatagramSocket socket = null; + final String request = new String(packet.getData(), StandardCharsets.UTF_8).trim(); + final String[] packetData = request.split("-"); + final String sourceServer = packetData[0].trim(); + final String action = packetData[1].trim(); + final String resourceName = packetData[2].trim(); + String resourceID = ""; + int duration = 0; + System.out.println(" action:" + action); + if(packetData.length == 4) { + resourceID = packetData[2].trim(); + duration = Integer.parseInt(packetData[3]); + } + + try { + socket = new DatagramSocket(); + + if("GET_AVAILABLE_RESOURCES".equalsIgnoreCase(action)) { + response = sanitizeXml(server.getAvailableResources(resourceName).toString()).getBytes(StandardCharsets.UTF_8); + socket.send(new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort())); + }else if("GET_RESOURCE".equalsIgnoreCase(action)) { + System.out.println("inside action:" + action); + response = server.getRequestedResource(resourceID, duration).toString().getBytes(); + socket.send(new DatagramPacket(response, response.length, packet.getAddress(), packet.getPort())); + } + + } catch (IOException e) { + activityLogger.log("", "EXCEPTION", e.getMessage()); + } finally { + if (socket != null) { + socket.close(); + } + } + } + + private static String sanitizeXml(String input) { + if (input == null) { + return ""; // or handle as needed + } + + return input.replaceAll("[\\x00]", ""); + } + +} |