blob: 2bab635314adf3d7334f45ebf6b993f81ffed27a (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
package derms;
import derms.frontend.FEInterface;
import derms.Replica1;
import derms.Replica2;
import derms.Replica3;
import derms.Replica4;
import derms.Request;
import derms.Response;
import derms.net.MessagePayload;
import derms.net.tomulticast.TotalOrderMulticastReceiver;
import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastSender;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.logging.Logger;
public class ReplicaManager {
private final int replicaId;
private Replica replica;
private Response response;
private final FEInterface frontEnd;
private final Logger log;
private ReliableUnicastSender <Response>unicastSender;
private TotalOrderMulticastReceiver multicastReceiver;
public ReplicaManager(int replicaId, FEInterface frontEnd) throws IOException {
this.replicaId = replicaId;
this.frontEnd = frontEnd;
this.log = Logger.getLogger(getClass().getName());
initReplica();
initMulticastReceiver();
startHeartbeatThread();
}
private void initReplica() throws IOException {
InetSocketAddress frontEndAddress = new InetSocketAddress("localhost", 1999);
switch (replicaId)
case 1:
replica = new Replica1(frontEndAddress);
break;
case 2:
replica = new Replica2();
break;
case 3:
replica = new Replica3( frontEndAddress);
break;
case 4:
replica = new Replica4(frontEndAddress);
break;
replica.startProcess();
}
private void initMulticastReceiver() throws IOException {
InetSocketAddress group = new InetSocketAddress("230.0.0.0", 4446); // Example multicast group and port for receiving requests
InetAddress localAddress = InetAddress.getLocalHost(); // Your local address
multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress);
new Thread(() -> {
while (true) {
try {
MessagePayload receivedPayload = multicastReceiver.receive();
Request request = (Request) receivedPayload;
replica.processRequest(request);
} catch (InterruptedException e) {
log.severe("Failed to receive request: " + e.getMessage());
Thread.currentThread().interrupt();
} catch (Exception e) {
log.severe("Error processing request: " + e.getMessage());
}
}
}).start();
}
private void startHeartbeatThread() {
new Thread(() -> {
while (true) {
if (!replica.isAlive()) {
frontEnd.informRmIsDown(replica.getId());
replica.restart();
}
try {
Thread.sleep(5000); // Example 5 seconds.
} catch (InterruptedException e) {
log.severe("Heartbeat thread interrupted: " + e.getMessage());
}
}
}).start();
}
public void sendResponseToFE(Response response) {
try {
unicastSender.send(response);
} catch (IOException e) {
log.severe("Failed to send response to FE: " + e.getMessage());
}
}
public void handleByzantineFailure() {
log.severe("Byzantine failure detected in Replica " + replica.getId());
replica.restart();
frontEnd.informRmHasBug(replica.getId());
}
}
|