summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/ReplicaManager.java
blob: 252953c8bddb61e2a11ba47f9830c662d65c8fa9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package derms;
import derms.frontend.FEInterface;


import derms.Request;
import derms.Response;
import derms.net.MessagePayload;
import derms.net.tomulticast.TotalOrderMulticastReceiver;
import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastSender;
import derms.replica1.Replica1;
import derms.replica2.Replica2;
import derms.util.*;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.logging.Logger;

/**
 * Manages a single replica: creates it, feeds it requests received from the
 * total-order multicast group, relays its responses to the front end, and
 * periodically checks that it is still alive.
 *
 * <p>Constructing a {@code ReplicaManager} starts two background threads: one
 * that receives multicast requests and one that polls the replica's liveness.
 */
public class ReplicaManager {
    public static final String usage = "Usage: java ReplicaManager <replicaId> <city> <frontEndIP> <byzantine(0 or 1)> <crash(0 or 1)>";

    /** Number of command-line arguments that {@link #main(String[])} reads. */
    private static final int REQUIRED_ARG_COUNT = 5;

    /** Lowest and highest replica IDs handled by {@link #initReplica(int, int)}. */
    private static final int MIN_REPLICA_ID = 1;
    private static final int MAX_REPLICA_ID = 4;

    /** Pause between two liveness checks of the replica, in milliseconds. */
    private static final long HEARTBEAT_INTERVAL_MS = 5000;

    private final int replicaId;
    private final String city;
    private Replica replica;
    private final Logger log;
    private InetSocketAddress frontEndAddress;
    private ReliableUnicastSender<Response> unicastSender;
    // NOTE(review): raw type kept as in the original declaration; parameterizing it
    // requires checking the receiver's declared signature, which is not visible here.
    private TotalOrderMulticastReceiver multicastReceiver;

    /**
     * Creates the manager, opens the unicast channel to the front end, starts the
     * replica, and launches the receiver and heartbeat threads.
     *
     * @param replicaId  ID of the replica to manage (1 to 4)
     * @param city       city name handed to the replica implementations that take one
     * @param frontEndIP address of the front end
     * @param byzantine  flag forwarded to {@code Replica.startProcess}; 0 is logged as FALSE
     * @param crash      flag forwarded to {@code Replica.startProcess}; 0 is logged as FALSE
     * @throws IOException              if a network component cannot be initialised
     * @throws IllegalArgumentException if {@code replicaId} is not a supported ID
     */
    public ReplicaManager(int replicaId, String city, InetAddress frontEndIP, int byzantine, int crash) throws IOException {
        // Reject unsupported IDs before any socket is opened; previously such an ID
        // surfaced later as an array-index or null-pointer failure.
        if (replicaId < MIN_REPLICA_ID || replicaId > MAX_REPLICA_ID) {
            throw new IllegalArgumentException("Unsupported replica ID: " + replicaId
                    + " (expected " + MIN_REPLICA_ID + "-" + MAX_REPLICA_ID + ")");
        }
        this.replicaId = replicaId;
        this.city = city;
        this.log = Logger.getLogger(getClass().getName());
        initUnicastSender(frontEndIP);
        initReplica(byzantine, crash);
        initMulticastReceiver();
        startHeartbeatThread();
    }

    /**
     * Resolves the front-end response port for this replica from {@code Config}
     * and opens the reliable unicast sender towards it.
     */
    private void initUnicastSender(InetAddress frontEndIP) throws IOException {
        int frontEndPort = Config.frontendResponsePorts[replicaId - 1];
        frontEndAddress = new InetSocketAddress(frontEndIP, frontEndPort);
        unicastSender = new ReliableUnicastSender<>(frontEndAddress);
    }

    /**
     * Instantiates the replica implementation that matches {@link #replicaId},
     * logs the test configuration, and starts the replica.
     */
    private void initReplica(int byzantine, int crash) throws IOException {
        switch (replicaId) {
            case 1:
                replica = new Replica1(this);
                break;
            case 2:
                replica = new derms.replica2.Replica2(city, this);
                break;
            case 3:
                replica = new derms.replica3.Replica3(city, this);
                break;
            case 4:
                // NOTE(review): ID 4 reuses the Replica2 implementation - confirm this is intended.
                replica = new derms.replica2.Replica2(city, this);
                break;
            default:
                // Without this branch the field stayed null and startProcess() below threw an NPE.
                throw new IllegalArgumentException("Unsupported replica ID: " + replicaId);
        }

        // [TEST] Logging
        TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: " + (byzantine == 0 ? "FALSE" : "TRUE") + "}");
        TestLogger.log("REPLICA " + replicaId + ": {CRASH: " + (crash == 0 ? "FALSE" : "TRUE") + "}");

        replica.startProcess(byzantine, crash);
    }

    /**
     * Joins the multicast group from {@code Config} and starts a thread that hands
     * every received request to the replica. A request whose message type equals
     * {@code "1" + replicaId} is routed to {@link #handleByzantineFailure()} instead.
     */
    private void initMulticastReceiver() throws IOException {
        InetSocketAddress group = Config.group;
        InetAddress localAddress = InetAddress.getLocalHost();
        NetworkInterface netInterface = NetworkInterface.getByInetAddress(localAddress);
        multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress, netInterface);

        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    MessagePayload receivedPayload = multicastReceiver.receive();
                    Request request = (Request) receivedPayload;
                    log.info("Received request: " + request);
                    if (Objects.equals(request.getMessageType(), "1" + replicaId)) {
                        handleByzantineFailure();
                    } else {
                        replica.processRequest(request);
                    }
                } catch (InterruptedException e) {
                    log.severe("Failed to receive request: " + e.getMessage());
                    // Restore the flag and leave the loop; staying in it would make the
                    // thread spin, failing and logging on every iteration.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // A single bad request must not stop the receiver.
                    log.severe("Error processing request: " + e.getMessage());
                }
            }
        }, "rm-" + replicaId + "-receiver").start();
    }

    /**
     * Starts a thread that checks every {@link #HEARTBEAT_INTERVAL_MS} milliseconds
     * whether the replica is alive; if not, it informs the front end and restarts
     * the replica.
     */
    private void startHeartbeatThread() {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                if (!replica.isAlive()) {
                    // [TEST] Logging
                    TestLogger.log("REPLICA " + replicaId + ": {CRASH: DETECTED}");

                    informFrontEndRmIsDown(replica.getId());
                    replica.restart();
                }
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL_MS);
                } catch (InterruptedException e) {
                    log.severe("Heartbeat thread interrupted: " + e.getMessage());
                    // Honour the interruption instead of swallowing it and looping on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "rm-" + replicaId + "-heartbeat").start();
    }

    /**
     * Sends a response to the front end over the reliable unicast channel.
     * A send failure is logged and not propagated.
     *
     * @param response the response to deliver
     */
    public void sendResponseToFE(Response response) {
        try {
            unicastSender.send(response);
        } catch (IOException e) {
            log.severe("Failed to send response to FE: " + e.getMessage());
        }
    }

    /**
     * Reacts to a reported byzantine failure of the managed replica: logs it,
     * restarts the replica, and then informs the front end.
     */
    public void handleByzantineFailure() {
        log.severe("Byzantine failure detected in Replica " + replica.getId());

        // [TEST] Logging
        TestLogger.log("REPLICA " + replicaId + ": {BYZANTINE: DETECTED}");

        replica.restart();
        informFrontEndRmHasBug(replica.getId());
    }

    /** Tells the front end that the replica with the given ID is down. */
    private void informFrontEndRmIsDown(int replicaId) {
        try {
            sendNotificationToFrontEnd("RM_DOWN:" + replicaId);
        } catch (IOException e) {
            log.severe("Failed to inform FE that RM is down: " + e.getMessage());
            TestLogger.log("[FAILED TO INFORM FE (RM IS DOWN)]");
        }
    }

    /** Tells the front end that the replica with the given ID produced a faulty result. */
    private void informFrontEndRmHasBug(int replicaId) {
        try {
            sendNotificationToFrontEnd("RM_BUG:" + replicaId);
        } catch (IOException e) {
            log.severe("Failed to inform FE that RM has a bug: " + e.getMessage());
            TestLogger.log("[FAILED TO INFORM FE (RM HAS A BUG)]");
        }
    }

    /**
     * Opens a TCP connection to {@link #frontEndAddress}, writes {@code message}
     * as a serialized object, and closes the connection.
     *
     * @throws IOException if the connection cannot be opened or the write fails
     */
    private void sendNotificationToFrontEnd(String message) throws IOException {
        try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            out.writeObject(message);
        }
    }

    /**
     * Command-line entry point; see {@link #usage} for the expected arguments.
     * Exits with status 1 when arguments are missing or invalid.
     */
    public static void main(String[] args) {
        // All five arguments are read below, so all five must be present.
        if (args.length < REQUIRED_ARG_COUNT) {
            System.err.println(usage);
            System.exit(1);
        }

        try {
            int replicaId = Integer.parseInt(args[0]);
            String city = args[1];
            InetAddress frontEndIP = InetAddress.getByName(args[2]);
            int byzantine = Integer.parseInt(args[3]);
            int crash = Integer.parseInt(args[4]);
            new ReplicaManager(replicaId, city, frontEndIP, byzantine, crash);
            System.out.println("ReplicaManager " + replicaId + " is running.");
        } catch (IllegalArgumentException e) {
            // Covers NumberFormatException from parseInt and an unsupported replica ID.
            System.err.println("Invalid argument: " + e.getMessage());
            System.err.println(usage);
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Failed to start ReplicaManager: " + e.getMessage());
            TestLogger.log("[FAILED TO START RM]");
            e.printStackTrace();
        }
    }

    /** Returns the ID of the replica managed by this instance. */
    public int getReplicaId() { return replicaId; }
}