blob: 9e3a6cc3971757a79b67a8c4ac9dad2012f66cbf (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
package derms;
import derms.Replica1;
import derms.Request;
import derms.Response;
import derms.frontend.FEInterface;
import derms.net.MessagePayload;
import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastReceiver;
import derms.net.tomulticast.TotalOrderMulticastSender;
import derms.replica2.Replica2;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Owns one {@link Replica}: starts it, feeds it the requests received from the
 * total-order multicast group, relays its responses to the front end, and
 * periodically checks that it is still alive.
 *
 * <p>Constructing an instance starts two background threads (request receiver
 * and heartbeat). Both are non-daemon threads, so they keep the JVM running
 * after {@link #main(String[])} returns; each one exits when it is interrupted.
 */
public class ReplicaManager {
    /** Smallest replica ID accepted by the constructor. */
    private static final int MIN_REPLICA_ID = 1;
    /** Largest replica ID accepted by the constructor. */
    private static final int MAX_REPLICA_ID = 4;
    /** Host of the front end; used for both responses and failure notifications. */
    private static final String FRONT_END_HOST = "localhost";
    /** Port of the front end; used for both responses and failure notifications. */
    private static final int FRONT_END_PORT = 1999;
    /** Pause between two consecutive liveness checks of the replica. */
    private static final long HEARTBEAT_INTERVAL_MS = 5000;

    private final int replicaId;
    private final Replica replica;
    private final Logger log;
    private final ReliableUnicastSender<Response> unicastSender;
    private TotalOrderMulticastReceiver<Request> multicastReceiver;
    private final InetSocketAddress frontEndAddress;

    /**
     * Creates the manager, starts its replica and launches the receiver and
     * heartbeat threads.
     *
     * @param replicaId identifier of the replica to manage, from 1 to 4 inclusive
     * @throws IllegalArgumentException if {@code replicaId} is outside 1..4
     * @throws IOException if the replica or the network endpoints cannot be set up
     */
    public ReplicaManager(int replicaId) throws IOException {
        // Reject bad IDs before any network resource is created so nothing leaks.
        if (replicaId < MIN_REPLICA_ID || replicaId > MAX_REPLICA_ID) {
            throw new IllegalArgumentException(
                    "Unsupported replica id " + replicaId
                            + " (expected " + MIN_REPLICA_ID + ".." + MAX_REPLICA_ID + ")");
        }
        this.replicaId = replicaId;
        this.log = Logger.getLogger(getClass().getName());
        this.frontEndAddress = new InetSocketAddress(FRONT_END_HOST, FRONT_END_PORT);
        // The sender exists before the replica does, so the replica can respond
        // as soon as it is started.
        this.unicastSender = new ReliableUnicastSender<>(frontEndAddress);
        this.replica = createReplica();
        replica.startProcess();
        initMulticastReceiver();
        startHeartbeatThread();
    }

    /**
     * Builds the replica implementation managed by this instance.
     *
     * <p>NOTE(review): every supported ID currently gets a {@code Replica2};
     * presumably the other implementations are meant to be selected by
     * {@code replicaId} here once they are wired in -- confirm.
     */
    private Replica createReplica() throws IOException {
        return new derms.replica2.Replica2(new derms.replica2.City(), this);
    }

    /**
     * Joins the multicast group {@code Config.group} and starts a thread that
     * hands every received request to the replica.
     *
     * @throws IOException if the local address, the network interface or the
     *     receiver cannot be obtained
     */
    private void initMulticastReceiver() throws IOException {
        InetSocketAddress group = Config.group;
        InetAddress localAddress = InetAddress.getLocalHost();
        // getByInetAddress returns null when no interface owns localAddress;
        // whether the receiver accepts a null interface is not visible here -- TODO confirm.
        NetworkInterface netInterface = NetworkInterface.getByInetAddress(localAddress);
        multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress, netInterface);

        Thread receiverThread = new Thread(() -> {
            // Leave the loop once interrupted; otherwise the next blocking call
            // would presumably fail immediately again and the loop would spin.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    MessagePayload receivedPayload = multicastReceiver.receive();
                    Request request = (Request) receivedPayload;
                    log.info("Received request: " + request);
                    replica.processRequest(request);
                } catch (InterruptedException e) {
                    log.log(Level.SEVERE, "Failed to receive request: " + e.getMessage(), e);
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // A single bad request must not stop the receiver.
                    log.log(Level.SEVERE, "Error processing request: " + e.getMessage(), e);
                }
            }
        }, "replica-manager-" + replicaId + "-receiver");
        receiverThread.start();
    }

    /**
     * Starts a thread that checks the replica every
     * {@link #HEARTBEAT_INTERVAL_MS} milliseconds; a replica that is not alive
     * is reported to the front end and then restarted.
     */
    private void startHeartbeatThread() {
        Thread heartbeatThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                if (!replica.isAlive()) {
                    informFrontEndRmIsDown(replica.getId());
                    replica.restart();
                }
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL_MS);
                } catch (InterruptedException e) {
                    log.log(Level.SEVERE, "Heartbeat thread interrupted: " + e.getMessage(), e);
                    // Restore the flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }, "replica-manager-" + replicaId + "-heartbeat");
        heartbeatThread.start();
    }

    /**
     * Sends a response to the front end over the reliable unicast channel.
     * A send failure is logged and not propagated to the caller.
     *
     * @param response the response to deliver
     */
    public void sendResponseToFE(Response response) {
        try {
            unicastSender.send(response);
        } catch (IOException e) {
            log.log(Level.SEVERE, "Failed to send response to FE: " + e.getMessage(), e);
        }
    }

    /**
     * Reacts to a reported Byzantine failure: logs it, restarts the replica and
     * then notifies the front end.
     */
    public void handleByzantineFailure() {
        log.severe("Byzantine failure detected in Replica " + replica.getId());
        replica.restart();
        informFrontEndRmHasBug(replica.getId());
    }

    /** Tells the front end that the replica with the given ID is down. */
    private void informFrontEndRmIsDown(int replicaId) {
        notifyFrontEnd("RM_DOWN:" + replicaId, "Failed to inform FE that RM is down: ");
    }

    /** Tells the front end that the replica with the given ID produced a faulty result. */
    private void informFrontEndRmHasBug(int replicaId) {
        notifyFrontEnd("RM_BUG:" + replicaId, "Failed to inform FE that RM has a bug: ");
    }

    /**
     * Opens a TCP connection to the front end, writes {@code message} as one
     * serialized object and closes the connection.
     *
     * @param message the notification text to send
     * @param failurePrefix log-message prefix used when the notification cannot be sent
     */
    private void notifyFrontEnd(String message, String failurePrefix) {
        try (Socket socket = new Socket(frontEndAddress.getAddress(), frontEndAddress.getPort());
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            out.writeObject(message);
        } catch (IOException e) {
            log.log(Level.SEVERE, failurePrefix + e.getMessage(), e);
        }
    }

    /**
     * Command-line entry point: {@code java ReplicaManager <replicaId>}.
     * Exits with status 1 on a usage error or when the manager cannot be started.
     *
     * @param args exactly one argument, the numeric replica ID
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage: java ReplicaManager <replicaId>");
            System.exit(1);
        }

        int replicaId;
        try {
            replicaId = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            // A non-numeric ID is a usage error, not an internal failure.
            System.err.println("Usage: java ReplicaManager <replicaId>");
            System.exit(1);
            return;
        }

        try {
            new ReplicaManager(replicaId);
            System.out.println("ReplicaManager " + replicaId + " is running.");
        } catch (IOException | IllegalArgumentException e) {
            System.err.println("Failed to start ReplicaManager: " + e.getMessage());
            e.printStackTrace();
            // Report the failed start through the exit status as well.
            System.exit(1);
        }
    }
}
|