summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/ReplicaManager.java
blob: 9725bcb6dbe2600281c62ede02625fbe23389cb5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package derms;
import derms.frontend.FEInterface;

import derms.Replica1;
import derms.Replica2;
import derms.Replica3;
import derms.Replica4;

import derms.Request;
import derms.Response;
import derms.net.MessagePayload;
import derms.net.tomulticast.TotalOrderMulticastReceiver;
import derms.net.runicast.ReliableUnicastSender;
import derms.net.tomulticast.TotalOrderMulticastSender;


import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.logging.Logger;

public class ReplicaManager {
    private final int replicaId;
    private Replica replica;
    private Response response;
    private final FEInterface frontEnd;
    private final Logger log;
    private ReliableUnicastSender <Response>unicastSender;

    private TotalOrderMulticastReceiver multicastReceiver;

    public ReplicaManager(int replicaId, FEInterface frontEnd) throws IOException {
        this.replicaId = replicaId;
        this.frontEnd = frontEnd;
        this.log = Logger.getLogger(getClass().getName());
        initReplica();
        initMulticastReceiver();
        startHeartbeatThread();
    }

    private void initReplica() throws IOException {
        InetSocketAddress frontEndAddress = new InetSocketAddress("localhost", 1999);
        switch (replicaId)
        case 1:
        replica = new Replica1(frontEndAddress);
        break;
        case 2:
        replica = new Replica2( frontEndAddress);
        break;
        case 3:
        replica = new Replica3( frontEndAddress);
        break;
        case 4:
        replica = new Replica4(frontEndAddress);
        break;
        replica.startProcess();
    }



    private void initMulticastReceiver() throws IOException {
        InetSocketAddress group = new InetSocketAddress("230.0.0.0", 4446); // Example multicast group and port for receiving requests
        InetAddress localAddress = InetAddress.getLocalHost(); // Your local address
        multicastReceiver = new TotalOrderMulticastReceiver<Request>(group, localAddress);


        new Thread(() -> {
            while (true) {
                try {
                    MessagePayload receivedPayload = multicastReceiver.receive();
                    Request request = (Request) receivedPayload;
                    replica.processRequest(request);
                } catch (InterruptedException e) {
                    log.severe("Failed to receive request: " + e.getMessage());
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    log.severe("Error processing request: " + e.getMessage());
                }
            }
        }).start();
    }


    private void startHeartbeatThread() {
        new Thread(() -> {
            while (true) {
                if (!replica.isAlive()) {
                    frontEnd.informRmIsDown(replica.getId());
                    replica.restart();
                }
                try {
                    Thread.sleep(5000); // Example 5 seconds.
                } catch (InterruptedException e) {
                    log.severe("Heartbeat thread interrupted: " + e.getMessage());
                }
            }
        }).start();
    }

    public void sendResponseToFE(Response response) {
        try {
            unicastSender.send(response);
        } catch (IOException e) {
            log.severe("Failed to send response to FE: " + e.getMessage());
        }
    }
    public void handleByzantineFailure() {
        log.severe("Byzantine failure detected in Replica " + replica.getId());
        replica.restart();
        frontEnd.informRmHasBug(replica.getId());
    }



}