summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorShazaAli <shazamamdouh@aucegypt.edu>2024-12-01 08:23:40 -0500
committerShazaAli <shazamamdouh@aucegypt.edu>2024-12-01 08:23:40 -0500
commit53bd785304f4ecb62f46e19f02183a858aebe027 (patch)
treeadbdb888329fab8f406e6eed6c9cb88508d9d1bd /src/main/java
parent66f256b05c7daa7c4cd20f20758b8413a9329500 (diff)
downloadsoen423-53bd785304f4ecb62f46e19f02183a858aebe027.zip
running all. error comm between FE Replica
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/derms/Replica1.java160
-rw-r--r--src/main/java/derms/ReplicaManager.java89
-rw-r--r--src/main/java/derms/ReplicaRunner.java22
-rw-r--r--src/main/java/derms/Response.java5
-rw-r--r--src/main/java/derms/Sequencer.java1
-rw-r--r--src/main/java/derms/client/ResponderClient.java8
-rw-r--r--src/main/java/derms/frontend/FE.java9
-rw-r--r--src/main/java/derms/replica2/City.java4
-rw-r--r--src/main/java/derms/replica2/Replica2.java6
-rw-r--r--src/main/java/derms/replica2/Resource.java2
-rw-r--r--src/main/java/derms/replica2/ReturnResource.java2
-rw-r--r--src/main/java/derms/replica2/SwapResource.java2
12 files changed, 187 insertions, 123 deletions
diff --git a/src/main/java/derms/Replica1.java b/src/main/java/derms/Replica1.java
index 1df25a2..ad8b7a0 100644
--- a/src/main/java/derms/Replica1.java
+++ b/src/main/java/derms/Replica1.java
@@ -8,90 +8,114 @@ import java.net.InetSocketAddress;
import java.util.logging.Logger;
public class Replica1 implements Replica {
- private final ReliableUnicastSender<Response> unicastSender;
- ReplicaManager replicaManager;
- private final Logger log;
- private boolean alive = true;
-
- public Replica1( InetSocketAddress frontEndAddress) throws IOException {
- this.unicastSender = new ReliableUnicastSender<Response>(frontEndAddress);
- this.log = Logger.getLogger(getClass().getName());
- }
-
-
@Override
public boolean isAlive() {
- return alive;
+ return false;
}
@Override
public void startProcess() {
- // Simulate the replica process starting.
- log.info("Replica " + 1 + " started.");
+
}
@Override
public void processRequest(Request request) {
- // Process the request and create a response.
- endpoint1 = Endpoint.publish("http://localhost:8080/ws/derms", new DERMSServer("MTL"));
- endpoint2 = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
- endpoint3 = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
-
- String responseMessage;
- switch (request.getFunction()) {
- case "addResource":
- responderClient = new ResponderClient(request.getClientID());
- responseMessage = responderClient.addResource(request.getResourceID(), request.getResourceType(), request.getDuration());
- break;
- case "removeResource":
- responderClient = new ResponderClient(request.getClientID());
- responseMessage = responderClient.removeResource(request.getResourceID(), request.getDuration());
- break;
- case "listResourceAvailability":
- responderClient = new ResponderClient(request.getClientID());
- responseMessage = responderClient.listResourceAvailability(request.getResourceType());
- break;
- case "requestResource":
- coordinatorClient = new CoordinatorClient(request.getClientID());
- responseMessage = coordinatorClient.requestResource(request.getClientID(), request.getResourceID(), request.getDuration());
- break;
- case "findResource":
- coordinatorClient = new CoordinatorClient(request.getClientID());
- responseMessage = coordinatorClient.findResource(request.getClientID(), request.getResourceType());
- break;
- case "returnResource":
- coordinatorClient = new CoordinatorClient(request.getClientID());
- responseMessage = coordinatorClient.returnResource(request.getClientID(), request.getResourceID());
- break;
- case "swapResource":
- coordinatorClient = new CoordinatorClient(request.getClientID());
- responseMessage = coordinatorClient.swapResource(
- request.getClientID(),
- request.getOldResourceID(),
- request.getOldResourceType(),
- request.getResourceID(),
- request.getResourceType()
- );
- break;
- default:
- responseMessage = "Unrecognized function: " + request.getFunction();
- log.severe("Unrecognized function in request: " + request.getFunction());
- break;
- }
-
- Response response = new Response(request.getSequenceNumber(), responseMessage);
- log.info("Replica " + 1 + " processed request: " + request + ", response: " + response);
- replicaManager.sendResponseToFE(response);
}
@Override
public void restart() {
- // Restart the replica process.
- log.warning("Replica " + 1 + " is restarting...");
- startProcess();
+
}
@Override
- public int getId() { return 1; }
+ public int getId() {
+ return 0;
+ }
+// private final ReliableUnicastSender<Response> unicastSender;
+// ReplicaManager replicaManager;
+// private final Logger log;
+// private boolean alive = true;
+//
+// public Replica1( InetSocketAddress frontEndAddress) throws IOException {
+// this.unicastSender = new ReliableUnicastSender<Response>(frontEndAddress);
+// this.log = Logger.getLogger(getClass().getName());
+// }
+//
+//
+// @Override
+// public boolean isAlive() {
+// return alive;
+// }
+//
+// @Override
+// public void startProcess() {
+// // Simulate the replica process starting.
+// log.info("Replica " + 1 + " started.");
+// }
+//
+// @Override
+// public void processRequest(Request request) {
+// // Process the request and create a response.
+// endpoint1 = Endpoint.publish("http://localhost:8080/ws/derms", new DERMSServer("MTL"));
+// endpoint2 = Endpoint.publish("http://localhost:8081/ws/derms", new DERMSServer("QUE"));
+// endpoint3 = Endpoint.publish("http://localhost:8082/ws/derms", new DERMSServer("SHE"));
+//
+//
+// String responseMessage;
+// switch (request.getFunction()) {
+// case "addResource":
+// responderClient = new ResponderClient(request.getClientID());
+// responseMessage = responderClient.addResource(request.getResourceID(), request.getResourceType(), request.getDuration());
+// break;
+// case "removeResource":
+// responderClient = new ResponderClient(request.getClientID());
+// responseMessage = responderClient.removeResource(request.getResourceID(), request.getDuration());
+// break;
+// case "listResourceAvailability":
+// responderClient = new ResponderClient(request.getClientID());
+// responseMessage = responderClient.listResourceAvailability(request.getResourceType());
+// break;
+// case "requestResource":
+// coordinatorClient = new CoordinatorClient(request.getClientID());
+// responseMessage = coordinatorClient.requestResource(request.getClientID(), request.getResourceID(), request.getDuration());
+// break;
+// case "findResource":
+// coordinatorClient = new CoordinatorClient(request.getClientID());
+// responseMessage = coordinatorClient.findResource(request.getClientID(), request.getResourceType());
+// break;
+// case "returnResource":
+// coordinatorClient = new CoordinatorClient(request.getClientID());
+// responseMessage = coordinatorClient.returnResource(request.getClientID(), request.getResourceID());
+// break;
+// case "swapResource":
+// coordinatorClient = new CoordinatorClient(request.getClientID());
+// responseMessage = coordinatorClient.swapResource(
+// request.getClientID(),
+// request.getOldResourceID(),
+// request.getOldResourceType(),
+// request.getResourceID(),
+// request.getResourceType()
+// );
+// break;
+// default:
+// responseMessage = "Unrecognized function: " + request.getFunction();
+// log.severe("Unrecognized function in request: " + request.getFunction());
+// break;
+// }
+//
+// Response response = new Response(request.getSequenceNumber(), responseMessage);
+// log.info("Replica " + 1 + " processed request: " + request + ", response: " + response);
+// replicaManager.sendResponseToFE(response);
+// }
+//
+// @Override
+// public void restart() {
+// // Restart the replica process.
+// log.warning("Replica " + 1 + " is restarting...");
+// startProcess();
+// }
+//
+// @Override
+// public int getId() { return 1; }
} \ No newline at end of file
diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java
index 2bab635..0a727c1 100644
--- a/src/main/java/derms/ReplicaManager.java
+++ b/src/main/java/derms/ReplicaManager.java
@@ -2,9 +2,6 @@ package derms;
import derms.frontend.FEInterface;
import derms.Replica1;
-import derms.Replica2;
-import derms.Replica3;
-import derms.Replica4;
import derms.Request;
import derms.Response;
@@ -12,63 +9,63 @@ import derms.net.MessagePayload;
import derms.net.tomulticast.TotalOrderMulticastReceiver;
import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastSender;
-
+import derms.replica2.Replica2;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
import java.util.logging.Logger;
public class ReplicaManager {
private final int replicaId;
private Replica replica;
private Response response;
- private final FEInterface frontEnd;
private final Logger log;
- private ReliableUnicastSender <Response>unicastSender;
-
+ private ReliableUnicastSender<Response> unicastSender;
private TotalOrderMulticastReceiver multicastReceiver;
+ private final InetSocketAddress frontEndAddress;
- public ReplicaManager(int replicaId, FEInterface frontEnd) throws IOException {
+ public ReplicaManager(int replicaId) throws IOException {
this.replicaId = replicaId;
- this.frontEnd = frontEnd;
this.log = Logger.getLogger(getClass().getName());
+ this.frontEndAddress = new InetSocketAddress("localhost", 1999);
initReplica();
initMulticastReceiver();
startHeartbeatThread();
}
private void initReplica() throws IOException {
- InetSocketAddress frontEndAddress = new InetSocketAddress("localhost", 1999);
- switch (replicaId)
- case 1:
- replica = new Replica1(frontEndAddress);
- break;
- case 2:
- replica = new Replica2();
- break;
- case 3:
- replica = new Replica3( frontEndAddress);
- break;
- case 4:
- replica = new Replica4(frontEndAddress);
- break;
+ switch (replicaId) {
+ case 1:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ case 2:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ case 3:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ case 4:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ }
replica.startProcess();
}
-
-
private void initMulticastReceiver() throws IOException {
- InetSocketAddress group = new InetSocketAddress("230.0.0.0", 4446); // Example multicast group and port for receiving requests
+ InetSocketAddress group = Config.group;
InetAddress localAddress = InetAddress.getLocalHost(); // Your local address
multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress);
-
new Thread(() -> {
while (true) {
try {
MessagePayload receivedPayload = multicastReceiver.receive();
Request request = (Request) receivedPayload;
+ log.info("Received request: " + request);
replica.processRequest(request);
} catch (InterruptedException e) {
log.severe("Failed to receive request: " + e.getMessage());
@@ -80,12 +77,11 @@ public class ReplicaManager {
}).start();
}
-
private void startHeartbeatThread() {
new Thread(() -> {
while (true) {
if (!replica.isAlive()) {
- frontEnd.informRmIsDown(replica.getId());
+ informFrontEndRmIsDown(replica.getId());
replica.restart();
}
try {
@@ -104,12 +100,45 @@ public class ReplicaManager {
log.severe("Failed to send response to FE: " + e.getMessage());
}
}
+
public void handleByzantineFailure() {
log.severe("Byzantine failure detected in Replica " + replica.getId());
replica.restart();
- frontEnd.informRmHasBug(replica.getId());
+ informFrontEndRmHasBug(replica.getId());
}
+ private void informFrontEndRmIsDown(int replicaId) {
+ try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
+ ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
+ out.writeObject("RM_DOWN:" + replicaId);
+ } catch (IOException e) {
+ log.severe("Failed to inform FE that RM is down: " + e.getMessage());
+ }
+ }
+ private void informFrontEndRmHasBug(int replicaId) {
+ try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
+ ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
+ out.writeObject("RM_BUG:" + replicaId);
+ } catch (IOException e) {
+ log.severe("Failed to inform FE that RM has a bug: " + e.getMessage());
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 1) {
+ System.err.println("Usage: java ReplicaManager <replicaId>");
+ System.exit(1);
+ }
+ int replicaId = Integer.parseInt(args[0]);
+
+ try {
+ ReplicaManager replicaManager = new ReplicaManager(replicaId);
+ System.out.println("ReplicaManager " + replicaId + " is running.");
+ } catch (IOException e) {
+ System.err.println("Failed to start ReplicaManager: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
} \ No newline at end of file
diff --git a/src/main/java/derms/ReplicaRunner.java b/src/main/java/derms/ReplicaRunner.java
index a17fbf1..2372090 100644
--- a/src/main/java/derms/ReplicaRunner.java
+++ b/src/main/java/derms/ReplicaRunner.java
@@ -13,16 +13,16 @@ public class ReplicaRunner {
int replicaId = Integer.parseInt(args[0]);
System.out.println("Starting ReplicaManager for Replica " + replicaId);
- try {
- FEInterface frontEnd = new FE(); // Assume FE implements FEInterface
- ReplicaManager replicaManager = new ReplicaManager(replicaId, frontEnd);
-
- // Simulate receiving and handling client requests
- // Add logic to listen for client requests and forward them to replicaManager
-
- } catch (IOException e) {
- System.err.println("Failed to start ReplicaManager: " + e.getMessage());
- e.printStackTrace();
- }
+// try {
+// FEInterface frontEnd = new FE(); // Assume FE implements FEInterface
+// ReplicaManager replicaManager = new ReplicaManager(replicaId, frontEnd);
+//
+// // Simulate receiving and handling client requests
+// // Add logic to listen for client requests and forward them to replicaManager
+//
+// } catch (IOException e) {
+// System.err.println("Failed to start ReplicaManager: " + e.getMessage());
+// e.printStackTrace();
+// }
}
} \ No newline at end of file
diff --git a/src/main/java/derms/Response.java b/src/main/java/derms/Response.java
index ba2c05c..f9bcb02 100644
--- a/src/main/java/derms/Response.java
+++ b/src/main/java/derms/Response.java
@@ -31,6 +31,11 @@ public class Response implements MessagePayload {
setDuration(Integer.parseInt(messageParts[9]));
}
+ public Response(int sequenceID, String status) {
+ setSequenceID(sequenceID);
+ setResponse(status);
+ }
+
@Override
public int hash() {
return sequenceID + response.hashCode() + rmNumber + function.hashCode()
diff --git a/src/main/java/derms/Sequencer.java b/src/main/java/derms/Sequencer.java
index a432c2c..530e0c1 100644
--- a/src/main/java/derms/Sequencer.java
+++ b/src/main/java/derms/Sequencer.java
@@ -69,6 +69,7 @@ public class Sequencer implements Runnable {
for (;;) {
try {
Request req = in.receive();
+ System.out.println("Seq: received: " + req);
out.send(req);
} catch (InterruptedException e) {
log.info("Shutting down.");
diff --git a/src/main/java/derms/client/ResponderClient.java b/src/main/java/derms/client/ResponderClient.java
index 78fc327..0144a95 100644
--- a/src/main/java/derms/client/ResponderClient.java
+++ b/src/main/java/derms/client/ResponderClient.java
@@ -10,10 +10,10 @@ public class ResponderClient extends Client {
}
public static void main(String[] args) {
-// if (args.length < 1) {
-// System.err.println("Missing argument 'FE host'");
-// System.exit(1);
-// }
+ if (args.length < 1) {
+ System.err.println("Missing argument 'FE host'");
+ System.exit(1);
+ }
// String FEhost = args[0];
String FEhost = "localhost";
diff --git a/src/main/java/derms/frontend/FE.java b/src/main/java/derms/frontend/FE.java
index ca5374d..58d934b 100644
--- a/src/main/java/derms/frontend/FE.java
+++ b/src/main/java/derms/frontend/FE.java
@@ -15,9 +15,9 @@ import java.util.concurrent.atomic.AtomicInteger;
//import constants.Constants;
public class FE {
- private static final String sequencerIP = "localhost";
+ private static String sequencerIP = "localhost";
private static ReliableUnicastSender<Request> sequencerSock;
- private static final String RM_Multicast_group_address = "230.1.1.10";
+ private static final String RM_Multicast_group_address = Config.group.toString();
private static final int FE_PORT = 1999;
private static final int RM_Multicast_Port = 1234;
public static String FE_Address = "http://localhost:8067/"+DERMSInterface.class.getSimpleName();
@@ -28,6 +28,7 @@ public class FE {
public static void main(String[] args) {
try {
+ sequencerIP = args[0];
System.out.println("Connecting to sequencer ("
+ sequencerIP + ":" + Config.sequencerInPort + ")...");
sequencerSock = new ReliableUnicastSender<Request>(
@@ -54,7 +55,9 @@ public class FE {
@Override
public int sendRequestToSequencer(Request myRequest) {
- return sendUnicastToSequencer(myRequest);
+ int r = sendUnicastToSequencer(myRequest);
+ System.out.println("request: " + myRequest + " returned " + r);
+ return r;
}
@Override
diff --git a/src/main/java/derms/replica2/City.java b/src/main/java/derms/replica2/City.java
index 74535ae..a5a2f4c 100644
--- a/src/main/java/derms/replica2/City.java
+++ b/src/main/java/derms/replica2/City.java
@@ -2,7 +2,7 @@ package derms.replica2;
import java.io.Serializable;
-class City implements Serializable {
+public class City implements Serializable {
static final int codeLen = 3;
private String code;
@@ -13,7 +13,7 @@ class City implements Serializable {
this.code = code;
}
- City() {
+ public City() {
this("XXX");
}
diff --git a/src/main/java/derms/replica2/Replica2.java b/src/main/java/derms/replica2/Replica2.java
index 25609cc..666ee1b 100644
--- a/src/main/java/derms/replica2/Replica2.java
+++ b/src/main/java/derms/replica2/Replica2.java
@@ -1,6 +1,7 @@
package derms.replica2;
import derms.Replica;
+import derms.ReplicaManager;
import derms.Request;
import derms.Response;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
@@ -23,14 +24,15 @@ public class Replica2 implements Replica {
private final ResponderServer responderServer;
private final CoordinatorServer coordinatorServer;
private boolean alive;
+ private final ReplicaManager replicaManager;
- public Replica2(City city) throws IOException {
+ public Replica2(City city, ReplicaManager replicaManager) throws IOException {
this.city = city;
this.localAddr = InetAddress.getLocalHost();
this.resources = new Resources();
this.servers = new Servers();
this.log = DermsLogger.getLogger(getClass());
-
+ this.replicaManager = replicaManager;
try {
this.responderServer = new ResponderServer(city, resources, servers);
} catch (IOException e) {
diff --git a/src/main/java/derms/replica2/Resource.java b/src/main/java/derms/replica2/Resource.java
index 31d40bc..404cfb0 100644
--- a/src/main/java/derms/replica2/Resource.java
+++ b/src/main/java/derms/replica2/Resource.java
@@ -20,7 +20,7 @@ class Resource implements Serializable {
}
Resource(ResourceID id, ResourceType type, int duration) {
- this(id, type, duration, false, new CoordinatorID(), -1);
+ this(id, type, duration, false, new CoordinatorID("XXX", (short) 1), -1);
}
Resource() {
diff --git a/src/main/java/derms/replica2/ReturnResource.java b/src/main/java/derms/replica2/ReturnResource.java
index 6c42b8a..6adac50 100644
--- a/src/main/java/derms/replica2/ReturnResource.java
+++ b/src/main/java/derms/replica2/ReturnResource.java
@@ -169,7 +169,7 @@ class ReturnResource {
request.resourceID+" is not borrowed by "+request.coordinatorID);
}
resource.isBorrowed = false;
- resource.borrower = new CoordinatorID();
+ resource.borrower = request.coordinatorID;
resource.borrowDuration = -1;
return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id);
}
diff --git a/src/main/java/derms/replica2/SwapResource.java b/src/main/java/derms/replica2/SwapResource.java
index cc65f29..fbd2cf9 100644
--- a/src/main/java/derms/replica2/SwapResource.java
+++ b/src/main/java/derms/replica2/SwapResource.java
@@ -219,7 +219,7 @@ class SwapResource {
private void returnOldResource(Resource r) {
r.isBorrowed = false;
- r.borrower = new CoordinatorID();
+ r.borrower = request.cid;
r.borrowDuration = -1;
}
}