From 53bd785304f4ecb62f46e19f02183a858aebe027 Mon Sep 17 00:00:00 2001 From: ShazaAli Date: Sun, 1 Dec 2024 08:23:40 -0500 Subject: running all. error comm between FE Replica --- src/main/java/derms/Replica1.java | 160 +++++++++++++---------- src/main/java/derms/ReplicaManager.java | 89 ++++++++----- src/main/java/derms/ReplicaRunner.java | 22 ++-- src/main/java/derms/Response.java | 5 + src/main/java/derms/Sequencer.java | 1 + src/main/java/derms/client/ResponderClient.java | 8 +- src/main/java/derms/frontend/FE.java | 9 +- src/main/java/derms/replica2/City.java | 4 +- src/main/java/derms/replica2/Replica2.java | 6 +- src/main/java/derms/replica2/Resource.java | 2 +- src/main/java/derms/replica2/ReturnResource.java | 2 +- src/main/java/derms/replica2/SwapResource.java | 2 +- 12 files changed, 187 insertions(+), 123 deletions(-) (limited to 'src/main/java/derms') diff --git a/src/main/java/derms/Replica1.java b/src/main/java/derms/Replica1.java index 1df25a2..ad8b7a0 100644 --- a/src/main/java/derms/Replica1.java +++ b/src/main/java/derms/Replica1.java @@ -8,90 +8,114 @@ import java.net.InetSocketAddress; import java.util.logging.Logger; public class Replica1 implements Replica { - private final ReliableUnicastSender unicastSender; - ReplicaManager replicaManager; - private final Logger log; - private boolean alive = true; - - public Replica1( InetSocketAddress frontEndAddress) throws IOException { - this.unicastSender = new ReliableUnicastSender(frontEndAddress); - this.log = Logger.getLogger(getClass().getName()); - } - - @Override public boolean isAlive() { - return alive; + return false; } @Override public void startProcess() { - // Simulate the replica process starting. - log.info("Replica " + 1 + " started."); + } @Override public void processRequest(Request request) { - // Process the request and create a response. - endpoint1 = Endpoint.publish("http://localhost:8080/ws/derms", new DERMSServer("MTL")); - endpoint2 = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE")); - endpoint3 = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE")); - - String responseMessage; - switch (request.getFunction()) { - case "addResource": - responderClient = new ResponderClient(request.getClientID()); - responseMessage = responderClient.addResource(request.getResourceID(), request.getResourceType(), request.getDuration()); - break; - case "removeResource": - responderClient = new ResponderClient(request.getClientID()); - responseMessage = responderClient.removeResource(request.getResourceID(), request.getDuration()); - break; - case "listResourceAvailability": - responderClient = new ResponderClient(request.getClientID()); - responseMessage = responderClient.listResourceAvailability(request.getResourceType()); - break; - case "requestResource": - coordinatorClient = new CoordinatorClient(request.getClientID()); - responseMessage = coordinatorClient.requestResource(request.getClientID(), request.getResourceID(), request.getDuration()); - break; - case "findResource": - coordinatorClient = new CoordinatorClient(request.getClientID()); - responseMessage = coordinatorClient.findResource(request.getClientID(), request.getResourceType()); - break; - case "returnResource": - coordinatorClient = new CoordinatorClient(request.getClientID()); - responseMessage = coordinatorClient.returnResource(request.getClientID(), request.getResourceID()); - break; - case "swapResource": - coordinatorClient = new CoordinatorClient(request.getClientID()); - responseMessage = coordinatorClient.swapResource( - request.getClientID(), - request.getOldResourceID(), - request.getOldResourceType(), - request.getResourceID(), - request.getResourceType() - ); - break; - default: - responseMessage = "Unrecognized function: " + request.getFunction(); - log.severe("Unrecognized function in request: " + request.getFunction()); - break; - } - - Response response = new Response(request.getSequenceNumber(), responseMessage); - log.info("Replica " + 1 + " processed request: " + request + ", response: " + response); - replicaManager.sendResponseToFE(response); } @Override public void restart() { - // Restart the replica process. - log.warning("Replica " + 1 + " is restarting..."); - startProcess(); + } @Override - public int getId() { return 1; } + public int getId() { + return 0; + } +// private final ReliableUnicastSender unicastSender; +// ReplicaManager replicaManager; +// private final Logger log; +// private boolean alive = true; +// +// public Replica1( InetSocketAddress frontEndAddress) throws IOException { +// this.unicastSender = new ReliableUnicastSender(frontEndAddress); +// this.log = Logger.getLogger(getClass().getName()); +// } +// +// +// @Override +// public boolean isAlive() { +// return alive; +// } +// +// @Override +// public void startProcess() { +// // Simulate the replica process starting. +// log.info("Replica " + 1 + " started."); +// } +// +// @Override +// public void processRequest(Request request) { +// // Process the request and create a response. +// endpoint1 = Endpoint.publish("http://localhost:8080/ws/derms", new DERMSServer("MTL")); +// endpoint2 = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE")); +// endpoint3 = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE")); +// +// +// String responseMessage; +// switch (request.getFunction()) { +// case "addResource": +// responderClient = new ResponderClient(request.getClientID()); +// responseMessage = responderClient.addResource(request.getResourceID(), request.getResourceType(), request.getDuration()); +// break; +// case "removeResource": +// responderClient = new ResponderClient(request.getClientID()); +// responseMessage = responderClient.removeResource(request.getResourceID(), request.getDuration()); +// break; +// case "listResourceAvailability": +// responderClient = new ResponderClient(request.getClientID()); +// responseMessage = responderClient.listResourceAvailability(request.getResourceType()); +// break; +// case "requestResource": +// coordinatorClient = new CoordinatorClient(request.getClientID()); +// responseMessage = coordinatorClient.requestResource(request.getClientID(), request.getResourceID(), request.getDuration()); +// break; +// case "findResource": +// coordinatorClient = new CoordinatorClient(request.getClientID()); +// responseMessage = coordinatorClient.findResource(request.getClientID(), request.getResourceType()); +// break; +// case "returnResource": +// coordinatorClient = new CoordinatorClient(request.getClientID()); +// responseMessage = coordinatorClient.returnResource(request.getClientID(), request.getResourceID()); +// break; +// case "swapResource": +// coordinatorClient = new CoordinatorClient(request.getClientID()); +// responseMessage = coordinatorClient.swapResource( +// request.getClientID(), +// request.getOldResourceID(), +// request.getOldResourceType(), +// request.getResourceID(), +// request.getResourceType() +// ); +// break; +// default: +// responseMessage = "Unrecognized function: " + request.getFunction(); +// log.severe("Unrecognized function in request: " + request.getFunction()); +// break; +// } +// +// Response response = new Response(request.getSequenceNumber(), responseMessage); +// log.info("Replica " + 1 + " processed request: " + request + ", response: " + response); +// replicaManager.sendResponseToFE(response); +// } +// +// @Override +// public void restart() { +// // Restart the replica process. +// log.warning("Replica " + 1 + " is restarting..."); +// startProcess(); +// } +// +// @Override +// public int getId() { return 1; } } \ No newline at end of file diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 2bab635..0a727c1 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -2,9 +2,6 @@ package derms; import derms.frontend.FEInterface; import derms.Replica1; -import derms.Replica2; -import derms.Replica3; -import derms.Replica4; import derms.Request; import derms.Response; @@ -12,63 +9,63 @@ import derms.net.MessagePayload; import derms.net.tomulticast.TotalOrderMulticastReceiver; import derms.net.runicast.ReliableUnicastSender; import derms.net.tomulticast.TotalOrderMulticastSender; - +import derms.replica2.Replica2; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.Socket; +import java.io.ObjectOutputStream; +import java.io.ObjectInputStream; import java.util.logging.Logger; public class ReplicaManager { private final int replicaId; private Replica replica; private Response response; - private final FEInterface frontEnd; private final Logger log; - private ReliableUnicastSender unicastSender; - + private ReliableUnicastSender unicastSender; private TotalOrderMulticastReceiver multicastReceiver; + private final InetSocketAddress frontEndAddress; - public ReplicaManager(int replicaId, FEInterface frontEnd) throws IOException { + public ReplicaManager(int replicaId) throws IOException { this.replicaId = replicaId; - this.frontEnd = frontEnd; this.log = Logger.getLogger(getClass().getName()); + this.frontEndAddress = new InetSocketAddress("localhost", 1999); initReplica(); initMulticastReceiver(); startHeartbeatThread(); } private void initReplica() throws IOException { - InetSocketAddress frontEndAddress = new InetSocketAddress("localhost", 1999); - switch (replicaId) - case 1: - replica = new Replica1(frontEndAddress); - break; - case 2: - replica = new Replica2(); - break; - case 3: - replica = new Replica3( frontEndAddress); - break; - case 4: - replica = new Replica4(frontEndAddress); - break; + switch (replicaId) { + case 1: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + case 2: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + case 3: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + case 4: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + } replica.startProcess(); } - - private void initMulticastReceiver() throws IOException { - InetSocketAddress group = new InetSocketAddress("230.0.0.0", 4446); // Example multicast group and port for receiving requests + InetSocketAddress group = Config.group; InetAddress localAddress = InetAddress.getLocalHost(); // Your local address multicastReceiver = new TotalOrderMulticastReceiver(group, localAddress); - new Thread(() -> { while (true) { try { MessagePayload receivedPayload = multicastReceiver.receive(); Request request = (Request) receivedPayload; + log.info("Received request: " + request); replica.processRequest(request); } catch (InterruptedException e) { log.severe("Failed to receive request: " + e.getMessage()); @@ -80,12 +77,11 @@ public class ReplicaManager { }).start(); } - private void startHeartbeatThread() { new Thread(() -> { while (true) { if (!replica.isAlive()) { - frontEnd.informRmIsDown(replica.getId()); + informFrontEndRmIsDown(replica.getId()); replica.restart(); } try { @@ -104,12 +100,45 @@ public class ReplicaManager { log.severe("Failed to send response to FE: " + e.getMessage()); } } + public void handleByzantineFailure() { log.severe("Byzantine failure detected in Replica " + replica.getId()); replica.restart(); - frontEnd.informRmHasBug(replica.getId()); + informFrontEndRmHasBug(replica.getId()); } + private void informFrontEndRmIsDown(int replicaId) { + try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort()); + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) { + out.writeObject("RM_DOWN:" + replicaId); + } catch (IOException e) { + log.severe("Failed to inform FE that RM is down: " + e.getMessage()); + } + } + private void informFrontEndRmHasBug(int replicaId) { + try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort()); + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) { + out.writeObject("RM_BUG:" + replicaId); + } catch (IOException e) { + log.severe("Failed to inform FE that RM has a bug: " + e.getMessage()); + } + } + + public static void main(String[] args) { + if (args.length != 1) { + System.err.println("Usage: java ReplicaManager "); + System.exit(1); + } + int replicaId = Integer.parseInt(args[0]); + + try { + ReplicaManager replicaManager = new ReplicaManager(replicaId); + System.out.println("ReplicaManager " + replicaId + " is running."); + } catch (IOException e) { + System.err.println("Failed to start ReplicaManager: " + e.getMessage()); + e.printStackTrace(); + } + } } \ No newline at end of file diff --git a/src/main/java/derms/ReplicaRunner.java b/src/main/java/derms/ReplicaRunner.java index a17fbf1..2372090 100644 --- a/src/main/java/derms/ReplicaRunner.java +++ b/src/main/java/derms/ReplicaRunner.java @@ -13,16 +13,16 @@ public class ReplicaRunner { int replicaId = Integer.parseInt(args[0]); System.out.println("Starting ReplicaManager for Replica " + replicaId); - try { - FEInterface frontEnd = new FE(); // Assume FE implements FEInterface - ReplicaManager replicaManager = new ReplicaManager(replicaId, frontEnd); - - // Simulate receiving and handling client requests - // Add logic to listen for client requests and forward them to replicaManager - - } catch (IOException e) { - System.err.println("Failed to start ReplicaManager: " + e.getMessage()); - e.printStackTrace(); - } +// try { +// FEInterface frontEnd = new FE(); // Assume FE implements FEInterface +// ReplicaManager replicaManager = new ReplicaManager(replicaId, frontEnd); +// +// // Simulate receiving and handling client requests +// // Add logic to listen for client requests and forward them to replicaManager +// +// } catch (IOException e) { +// System.err.println("Failed to start ReplicaManager: " + e.getMessage()); +// e.printStackTrace(); +// } } } \ No newline at end of file diff --git a/src/main/java/derms/Response.java b/src/main/java/derms/Response.java index ba2c05c..f9bcb02 100644 --- a/src/main/java/derms/Response.java +++ b/src/main/java/derms/Response.java @@ -31,6 +31,11 @@ public class Response implements MessagePayload { setDuration(Integer.parseInt(messageParts[9])); } + public Response(int sequenceID, String status) { + setSequenceID(sequenceID); + setResponse(status); + } + @Override public int hash() { return sequenceID + response.hashCode() + rmNumber + function.hashCode() diff --git a/src/main/java/derms/Sequencer.java b/src/main/java/derms/Sequencer.java index a432c2c..530e0c1 100644 --- a/src/main/java/derms/Sequencer.java +++ b/src/main/java/derms/Sequencer.java @@ -69,6 +69,7 @@ public class Sequencer implements Runnable { for (;;) { try { Request req = in.receive(); + System.out.println("Seq: received: " + req); out.send(req); } catch (InterruptedException e) { log.info("Shutting down."); diff --git a/src/main/java/derms/client/ResponderClient.java b/src/main/java/derms/client/ResponderClient.java index 78fc327..0144a95 100644 --- a/src/main/java/derms/client/ResponderClient.java +++ b/src/main/java/derms/client/ResponderClient.java @@ -10,10 +10,10 @@ public class ResponderClient extends Client { } public static void main(String[] args) { -// if (args.length < 1) { -// System.err.println("Missing argument 'FE host'"); -// System.exit(1); -// } + if (args.length < 1) { + System.err.println("Missing argument 'FE host'"); + System.exit(1); + } // String FEhost = args[0]; String FEhost = "localhost"; diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java index ca5374d..58d934b 100644 --- a/src/main/java/derms/frontend/FE.java +++ b/src/main/java/derms/frontend/FE.java @@ -15,9 +15,9 @@ import java.util.concurrent.atomic.AtomicInteger; //import constants.Constants; public class FE { - private static final String sequencerIP = "localhost"; + private static String sequencerIP = "localhost"; private static ReliableUnicastSender sequencerSock; - private static final String RM_Multicast_group_address = "230.1.1.10"; + private static final String RM_Multicast_group_address = Config.group.toString(); private static final int FE_PORT = 1999; private static final int RM_Multicast_Port = 1234; public static String FE_Address = "http://localhost:8067/"+DERMSInterface.class.getSimpleName(); @@ -28,6 +28,7 @@ public class FE { public static void main(String[] args) { try { + sequencerIP = args[0]; System.out.println("Connecting to sequencer (" + sequencerIP + ":" + Config.sequencerInPort + ")..."); sequencerSock = new ReliableUnicastSender( @@ -54,7 +55,9 @@ public class FE { @Override public int sendRequestToSequencer(Request myRequest) { - return sendUnicastToSequencer(myRequest); + int r = sendUnicastToSequencer(myRequest); + System.out.println("request: " + myRequest + " returned " + r); + return r; } @Override diff --git a/src/main/java/derms/replica2/City.java b/src/main/java/derms/replica2/City.java index 74535ae..a5a2f4c 100644 --- a/src/main/java/derms/replica2/City.java +++ b/src/main/java/derms/replica2/City.java @@ -2,7 +2,7 @@ package derms.replica2; import java.io.Serializable; -class City implements Serializable { +public class City implements Serializable { static final int codeLen = 3; private String code; @@ -13,7 +13,7 @@ class City implements Serializable { this.code = code; } - City() { + public City() { this("XXX"); } diff --git a/src/main/java/derms/replica2/Replica2.java b/src/main/java/derms/replica2/Replica2.java index 25609cc..666ee1b 100644 --- a/src/main/java/derms/replica2/Replica2.java +++ b/src/main/java/derms/replica2/Replica2.java @@ -1,6 +1,7 @@ package derms.replica2; import derms.Replica; +import derms.ReplicaManager; import derms.Request; import derms.Response; import sun.reflect.generics.reflectiveObjects.NotImplementedException; @@ -23,14 +24,15 @@ public class Replica2 implements Replica { private final ResponderServer responderServer; private final CoordinatorServer coordinatorServer; private boolean alive; + private final ReplicaManager replicaManager; - public Replica2(City city) throws IOException { + public Replica2(City city, ReplicaManager replicaManager) throws IOException { this.city = city; this.localAddr = InetAddress.getLocalHost(); this.resources = new Resources(); this.servers = new Servers(); this.log = DermsLogger.getLogger(getClass()); - + this.replicaManager = replicaManager; try { this.responderServer = new ResponderServer(city, resources, servers); } catch (IOException e) { diff --git a/src/main/java/derms/replica2/Resource.java b/src/main/java/derms/replica2/Resource.java index 31d40bc..404cfb0 100644 --- a/src/main/java/derms/replica2/Resource.java +++ b/src/main/java/derms/replica2/Resource.java @@ -20,7 +20,7 @@ class Resource implements Serializable { } Resource(ResourceID id, ResourceType type, int duration) { - this(id, type, duration, false, new CoordinatorID(), -1); + this(id, type, duration, false, new CoordinatorID("XXX", (short) 1), -1); } Resource() { diff --git a/src/main/java/derms/replica2/ReturnResource.java b/src/main/java/derms/replica2/ReturnResource.java index 6c42b8a..6adac50 100644 --- a/src/main/java/derms/replica2/ReturnResource.java +++ b/src/main/java/derms/replica2/ReturnResource.java @@ -169,7 +169,7 @@ class ReturnResource { request.resourceID+" is not borrowed by "+request.coordinatorID); } resource.isBorrowed = false; - resource.borrower = new CoordinatorID(); + resource.borrower = request.coordinatorID; resource.borrowDuration = -1; return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id); } diff --git a/src/main/java/derms/replica2/SwapResource.java b/src/main/java/derms/replica2/SwapResource.java index cc65f29..fbd2cf9 100644 --- a/src/main/java/derms/replica2/SwapResource.java +++ b/src/main/java/derms/replica2/SwapResource.java @@ -219,7 +219,7 @@ class SwapResource { private void returnOldResource(Resource r) { r.isBorrowed = false; - r.borrower = new CoordinatorID(); + r.borrower = request.cid; r.borrowDuration = -1; } } -- cgit v1.2.3