1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
package derms;
import derms.Request;
import derms.Response;
import derms.frontend.FEInterface;
import derms.net.MessagePayload;
import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastReceiver;
import derms.net.tomulticast.TotalOrderMulticastSender;
import derms.replica1.Replica1;
import derms.replica2.Replica2;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ReplicaManager {
public static final String usage = "Usage: java ReplicaManager <replicaId> <city> <frontEndIP>";
private final int replicaId;
private final String city;
private Replica replica;
private Response response;
private final Logger log;
private InetSocketAddress frontEndAddress;
private ReliableUnicastSender<Response> unicastSender;
private TotalOrderMulticastReceiver multicastReceiver;
public ReplicaManager(int replicaId, String city, InetAddress frontEndIP) throws IOException {
this.replicaId = replicaId;
this.city = city;
this.log = Logger.getLogger(getClass().getName());
initUnicastSender(frontEndIP);
initReplica();
initMulticastReceiver();
startHeartbeatThread();
}
private void initUnicastSender(InetAddress frontEndIP) throws IOException {
int frontEndPort = Config.frontendResponsePorts[replicaId - 1];
frontEndAddress = new InetSocketAddress(frontEndIP, frontEndPort);
unicastSender = new ReliableUnicastSender<>(frontEndAddress);
}
private void initReplica() throws IOException {
switch (replicaId) {
case 1:
replica = new Replica1(this);
break;
case 2:
replica = new derms.replica2.Replica2(city, this);
break;
case 3:
replica = new derms.replica3.Replica3(city, this);
break;
case 4:
replica = new derms.replica2.Replica2(city, this);
break;
}
replica.startProcess();
}
private void initMulticastReceiver() throws IOException {
InetSocketAddress group = Config.group;
InetAddress localAddress = InetAddress.getLocalHost(); // Your local address
NetworkInterface netInterface = NetworkInterface.getByInetAddress(localAddress);
multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress, netInterface);
new Thread(() -> {
while (true) {
try {
MessagePayload receivedPayload = multicastReceiver.receive();
Request request = (Request) receivedPayload;
log.info("Received request: " + request);
replica.processRequest(request);
} catch (InterruptedException e) {
log.severe("Failed to receive request: " + e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
log.severe("Error processing request: " + e.getMessage());
}
}
}).start();
}
private void startHeartbeatThread() {
new Thread(() -> {
while (true) {
if (!replica.isAlive()) {
informFrontEndRmIsDown(replica.getId());
replica.restart();
}
try {
Thread.sleep(5000); // Example 5 seconds.
} catch (InterruptedException e) {
log.severe("Heartbeat thread interrupted: " + e.getMessage());
}
}
}).start();
}
public void sendResponseToFE(Response response) {
try {
unicastSender.send(response);
} catch (IOException e) {
log.severe("Failed to send response to FE: " + e.getMessage());
}
}
public void handleByzantineFailure() {
log.severe("Byzantine failure detected in Replica " + replica.getId());
replica.restart();
informFrontEndRmHasBug(replica.getId());
}
private void informFrontEndRmIsDown(int replicaId) {
try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
out.writeObject("RM_DOWN:" + replicaId);
} catch (IOException e) {
log.severe("Failed to inform FE that RM is down: " + e.getMessage());
}
}
private void informFrontEndRmHasBug(int replicaId) {
try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
out.writeObject("RM_BUG:" + replicaId);
} catch (IOException e) {
log.severe("Failed to inform FE that RM has a bug: " + e.getMessage());
}
}
public static void main(String[] args) {
if (args.length < 3) {
System.err.println(usage);
System.exit(1);
}
try {
int replicaId = Integer.parseInt(args[0]);
String city = args[1];
InetAddress frontEndIP = InetAddress.getByName(args[2]);
ReplicaManager replicaManager = new ReplicaManager(replicaId, city, frontEndIP);
System.out.println("ReplicaManager " + replicaId + " is running.");
} catch (IOException e) {
System.err.println("Failed to start ReplicaManager: " + e.getMessage());
e.printStackTrace();
}
}
public int getReplicaId() { return replicaId; }
}
|