diff options
Diffstat (limited to 'src/main/java/derms/ReplicaManager.java')
| -rw-r--r-- | src/main/java/derms/ReplicaManager.java | 89 |
1 files changed, 59 insertions, 30 deletions
diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 2bab635..0a727c1 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -2,9 +2,6 @@ package derms; import derms.frontend.FEInterface; import derms.Replica1; -import derms.Replica2; -import derms.Replica3; -import derms.Replica4; import derms.Request; import derms.Response; @@ -12,63 +9,63 @@ import derms.net.MessagePayload; import derms.net.tomulticast.TotalOrderMulticastReceiver; import derms.net.runicast.ReliableUnicastSender; import derms.net.tomulticast.TotalOrderMulticastSender; - +import derms.replica2.Replica2; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.Socket; +import java.io.ObjectOutputStream; +import java.io.ObjectInputStream; import java.util.logging.Logger; public class ReplicaManager { private final int replicaId; private Replica replica; private Response response; - private final FEInterface frontEnd; private final Logger log; - private ReliableUnicastSender <Response>unicastSender; - + private ReliableUnicastSender<Response> unicastSender; private TotalOrderMulticastReceiver multicastReceiver; + private final InetSocketAddress frontEndAddress; - public ReplicaManager(int replicaId, FEInterface frontEnd) throws IOException { + public ReplicaManager(int replicaId) throws IOException { this.replicaId = replicaId; - this.frontEnd = frontEnd; this.log = Logger.getLogger(getClass().getName()); + this.frontEndAddress = new InetSocketAddress("localhost", 1999); initReplica(); initMulticastReceiver(); startHeartbeatThread(); } private void initReplica() throws IOException { - InetSocketAddress frontEndAddress = new InetSocketAddress("localhost", 1999); - switch (replicaId) - case 1: - replica = new Replica1(frontEndAddress); - break; - case 2: - replica = new Replica2(); - break; - case 3: - replica = new Replica3( frontEndAddress); - break; - case 4: - replica = new Replica4(frontEndAddress); - break; + switch (replicaId) { + case 1: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + case 2: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + case 3: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + case 4: + replica = new derms.replica2.Replica2(new derms.replica2.City(), this); + break; + } replica.startProcess(); } - - private void initMulticastReceiver() throws IOException { - InetSocketAddress group = new InetSocketAddress("230.0.0.0", 4446); // Example multicast group and port for receiving requests + InetSocketAddress group = Config.group; InetAddress localAddress = InetAddress.getLocalHost(); // Your local address multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress); - new Thread(() -> { while (true) { try { MessagePayload receivedPayload = multicastReceiver.receive(); Request request = (Request) receivedPayload; + log.info("Received request: " + request); replica.processRequest(request); } catch (InterruptedException e) { log.severe("Failed to receive request: " + e.getMessage()); @@ -80,12 +77,11 @@ public class ReplicaManager { }).start(); } - private void startHeartbeatThread() { new Thread(() -> { while (true) { if (!replica.isAlive()) { - frontEnd.informRmIsDown(replica.getId()); + informFrontEndRmIsDown(replica.getId()); replica.restart(); } try { @@ -104,12 +100,45 @@ public class ReplicaManager { log.severe("Failed to send response to FE: " + e.getMessage()); } } + public void handleByzantineFailure() { log.severe("Byzantine failure detected in Replica " + replica.getId()); replica.restart(); - frontEnd.informRmHasBug(replica.getId()); + informFrontEndRmHasBug(replica.getId()); } + private void informFrontEndRmIsDown(int replicaId) { + try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort()); + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) { + out.writeObject("RM_DOWN:" + replicaId); + } catch (IOException e) { + log.severe("Failed to inform FE that RM is down: " + e.getMessage()); + } + } + private void informFrontEndRmHasBug(int replicaId) { + try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort()); + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) { + out.writeObject("RM_BUG:" + replicaId); + } catch (IOException e) { + log.severe("Failed to inform FE that RM has a bug: " + e.getMessage()); + } + } + + public static void main(String[] args) { + if (args.length != 1) { + System.err.println("Usage: java ReplicaManager <replicaId>"); + System.exit(1); + } + int replicaId = Integer.parseInt(args[0]); + + try { + ReplicaManager replicaManager = new ReplicaManager(replicaId); + System.out.println("ReplicaManager " + replicaId + " is running."); + } catch (IOException e) { + System.err.println("Failed to start ReplicaManager: " + e.getMessage()); + e.printStackTrace(); + } + } }
\ No newline at end of file |