summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/ReplicaManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/ReplicaManager.java')
-rw-r--r--src/main/java/derms/ReplicaManager.java89
1 files changed, 59 insertions, 30 deletions
diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java
index 2bab635..0a727c1 100644
--- a/src/main/java/derms/ReplicaManager.java
+++ b/src/main/java/derms/ReplicaManager.java
@@ -2,9 +2,6 @@ package derms;
import derms.frontend.FEInterface;
import derms.Replica1;
-import derms.Replica2;
-import derms.Replica3;
-import derms.Replica4;
import derms.Request;
import derms.Response;
@@ -12,63 +9,63 @@ import derms.net.MessagePayload;
import derms.net.tomulticast.TotalOrderMulticastReceiver;
import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastSender;
-
+import derms.replica2.Replica2;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
import java.util.logging.Logger;
public class ReplicaManager {
private final int replicaId;
private Replica replica;
private Response response;
- private final FEInterface frontEnd;
private final Logger log;
- private ReliableUnicastSender <Response>unicastSender;
-
+ private ReliableUnicastSender<Response> unicastSender;
private TotalOrderMulticastReceiver multicastReceiver;
+ private final InetSocketAddress frontEndAddress;
- public ReplicaManager(int replicaId, FEInterface frontEnd) throws IOException {
+ public ReplicaManager(int replicaId) throws IOException {
this.replicaId = replicaId;
- this.frontEnd = frontEnd;
this.log = Logger.getLogger(getClass().getName());
+ this.frontEndAddress = new InetSocketAddress("localhost", 1999);
initReplica();
initMulticastReceiver();
startHeartbeatThread();
}
private void initReplica() throws IOException {
- InetSocketAddress frontEndAddress = new InetSocketAddress("localhost", 1999);
- switch (replicaId)
- case 1:
- replica = new Replica1(frontEndAddress);
- break;
- case 2:
- replica = new Replica2();
- break;
- case 3:
- replica = new Replica3( frontEndAddress);
- break;
- case 4:
- replica = new Replica4(frontEndAddress);
- break;
+ switch (replicaId) {
+ case 1:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ case 2:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ case 3:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ case 4:
+ replica = new derms.replica2.Replica2(new derms.replica2.City(), this);
+ break;
+ }
replica.startProcess();
}
-
-
private void initMulticastReceiver() throws IOException {
- InetSocketAddress group = new InetSocketAddress("230.0.0.0", 4446); // Example multicast group and port for receiving requests
+ InetSocketAddress group = Config.group;
InetAddress localAddress = InetAddress.getLocalHost(); // Your local address
multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress);
-
new Thread(() -> {
while (true) {
try {
MessagePayload receivedPayload = multicastReceiver.receive();
Request request = (Request) receivedPayload;
+ log.info("Received request: " + request);
replica.processRequest(request);
} catch (InterruptedException e) {
log.severe("Failed to receive request: " + e.getMessage());
@@ -80,12 +77,11 @@ public class ReplicaManager {
}).start();
}
-
private void startHeartbeatThread() {
new Thread(() -> {
while (true) {
if (!replica.isAlive()) {
- frontEnd.informRmIsDown(replica.getId());
+ informFrontEndRmIsDown(replica.getId());
replica.restart();
}
try {
@@ -104,12 +100,45 @@ public class ReplicaManager {
log.severe("Failed to send response to FE: " + e.getMessage());
}
}
+
public void handleByzantineFailure() {
log.severe("Byzantine failure detected in Replica " + replica.getId());
replica.restart();
- frontEnd.informRmHasBug(replica.getId());
+ informFrontEndRmHasBug(replica.getId());
}
+ private void informFrontEndRmIsDown(int replicaId) {
+ try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
+ ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
+ out.writeObject("RM_DOWN:" + replicaId);
+ } catch (IOException e) {
+ log.severe("Failed to inform FE that RM is down: " + e.getMessage());
+ }
+ }
+ private void informFrontEndRmHasBug(int replicaId) {
+ try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
+ ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
+ out.writeObject("RM_BUG:" + replicaId);
+ } catch (IOException e) {
+ log.severe("Failed to inform FE that RM has a bug: " + e.getMessage());
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length != 1) {
+ System.err.println("Usage: java ReplicaManager <replicaId>");
+ System.exit(1);
+ }
+ int replicaId = Integer.parseInt(args[0]);
+
+ try {
+ ReplicaManager replicaManager = new ReplicaManager(replicaId);
+ System.out.println("ReplicaManager " + replicaId + " is running.");
+ } catch (IOException e) {
+ System.err.println("Failed to start ReplicaManager: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
} \ No newline at end of file