summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/replica2
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-28 17:32:28 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-28 17:32:28 -0500
commitd267dd1dda606f0c56d8afaa7187485e60ebfd86 (patch)
treee1bca5933aa7e5e9793773057fd5616ff65a9eb8 /src/main/java/derms/replica2
parent6654546671eea9f9becd32b3160a134802659cbc (diff)
downloadsoen423-d267dd1dda606f0c56d8afaa7187485e60ebfd86.zip
move replica2 to top level
Diffstat (limited to 'src/main/java/derms/replica2')
-rw-r--r--src/main/java/derms/replica2/AlreadyBorrowedException.java7
-rw-r--r--src/main/java/derms/replica2/AnnounceListener.java81
-rw-r--r--src/main/java/derms/replica2/Announcer.java69
-rw-r--r--src/main/java/derms/replica2/City.java38
-rw-r--r--src/main/java/derms/replica2/CoordinatorID.java39
-rw-r--r--src/main/java/derms/replica2/CoordinatorServer.java170
-rw-r--r--src/main/java/derms/replica2/DermsLogger.java23
-rw-r--r--src/main/java/derms/replica2/FindResource.java160
-rw-r--r--src/main/java/derms/replica2/Hosts.java26
-rw-r--r--src/main/java/derms/replica2/ID.java5
-rw-r--r--src/main/java/derms/replica2/InvalidDurationException.java7
-rw-r--r--src/main/java/derms/replica2/NoSuchResourceException.java7
-rw-r--r--src/main/java/derms/replica2/NotBorrowedException.java7
-rw-r--r--src/main/java/derms/replica2/ObjectPacket.java33
-rw-r--r--src/main/java/derms/replica2/Replica2.java237
-rw-r--r--src/main/java/derms/replica2/RequestResource.java223
-rw-r--r--src/main/java/derms/replica2/Resource.java39
-rw-r--r--src/main/java/derms/replica2/ResourceAvailability.java128
-rw-r--r--src/main/java/derms/replica2/ResourceID.java49
-rw-r--r--src/main/java/derms/replica2/ResourceTransfer.java50
-rw-r--r--src/main/java/derms/replica2/ResourceType.java18
-rw-r--r--src/main/java/derms/replica2/Resources.java74
-rw-r--r--src/main/java/derms/replica2/ResponderID.java16
-rw-r--r--src/main/java/derms/replica2/ResponderServer.java94
-rw-r--r--src/main/java/derms/replica2/ReturnResource.java195
-rw-r--r--src/main/java/derms/replica2/ServerCommunicationError.java7
-rw-r--r--src/main/java/derms/replica2/Servers.java34
-rw-r--r--src/main/java/derms/replica2/SwapResource.java262
28 files changed, 2098 insertions, 0 deletions
diff --git a/src/main/java/derms/replica2/AlreadyBorrowedException.java b/src/main/java/derms/replica2/AlreadyBorrowedException.java
new file mode 100644
index 0000000..e1146e7
--- /dev/null
+++ b/src/main/java/derms/replica2/AlreadyBorrowedException.java
@@ -0,0 +1,7 @@
+package derms.replica2;
+
+class AlreadyBorrowedException extends Exception {
+ AlreadyBorrowedException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica2/AnnounceListener.java b/src/main/java/derms/replica2/AnnounceListener.java
new file mode 100644
index 0000000..dd21b8d
--- /dev/null
+++ b/src/main/java/derms/replica2/AnnounceListener.java
@@ -0,0 +1,81 @@
+package derms.replica2;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.net.*;
+import java.util.logging.Logger;
+
+class AnnounceListener implements Runnable {
+ private static final int bufsize = 1024;
+
+ private InetSocketAddress groupAddr;
+ private InetAddress localAddr;
+ private Servers servers;
+ private Logger log;
+
+ AnnounceListener(InetSocketAddress groupAddr, InetAddress localAddr, Servers servers) throws IOException {
+ this.groupAddr = groupAddr;
+ this.localAddr = localAddr;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ NetworkInterface netInterface = null;
+ try {
+ netInterface = NetworkInterface.getByInetAddress(localAddr);
+ if (netInterface == null) {
+ throw new Exception("netInterface is null");
+ }
+ } catch (Exception e) {
+ log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage());
+ return;
+ }
+
+ MulticastSocket sock = null;
+ try {
+ sock = new MulticastSocket(groupAddr.getPort());
+ sock.joinGroup(groupAddr, netInterface);
+ } catch (Exception e) {
+ log.severe("Failed to open multicast socket: "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening to "+groupAddr.toString()+" from "+localAddr.toString()+" ("+netInterface.getName()+")");
+ byte[] buf = new byte[bufsize];
+ DatagramPacket pkt = new DatagramPacket(buf, buf.length);
+
+ for (;;) {
+ try {
+ sock.receive(pkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from multicast socket: "+e.getMessage());
+ continue;
+ }
+
+ ObjectInputStream objStream;
+ try {
+ objStream = new ObjectInputStream(
+ new ByteArrayInputStream(pkt.getData()));
+ } catch (IOException e) {
+ log.warning("Failed to create input stream: "+e.getMessage());
+ continue;
+ }
+
+ City city;
+ try {
+ city = (City) objStream.readObject();
+ } catch (Exception e) {
+ log.warning("Failed to deserialize data: "+e.getMessage());
+ continue;
+ }
+
+ InetAddress remote = pkt.getAddress();
+ if (servers.put(city, remote) == null) {
+ log.info("Added remote server "+city.toString()+" "+remote.toString());
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica2/Announcer.java b/src/main/java/derms/replica2/Announcer.java
new file mode 100644
index 0000000..508349e
--- /dev/null
+++ b/src/main/java/derms/replica2/Announcer.java
@@ -0,0 +1,69 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.logging.Logger;
+
+class Announcer implements Runnable {
+ static final long intervalMillis = 3000;
+
+ private SocketAddress group;
+ private InetAddress localAddr;
+ private City city;
+ private Logger log;
+
+ Announcer(SocketAddress group, InetAddress localAddr, City city) throws IOException {
+ this.group = group;
+ this.localAddr = localAddr;
+ this.city = city;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ NetworkInterface netInterface = null;
+ try {
+ netInterface = NetworkInterface.getByInetAddress(localAddr);
+ if (netInterface == null) {
+ throw new Exception("netInterface is null");
+ }
+ } catch (Exception e) {
+ log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage());
+ return;
+ }
+
+ MulticastSocket sock = null;
+ try {
+ sock = new MulticastSocket();
+ sock.joinGroup(group, netInterface);
+ } catch (Exception e) {
+ log.severe("Failed to open multicast socket: "+e.getMessage());
+ return;
+ }
+
+ log.info("Announcing from "+localAddr.toString()+" ("+netInterface.getName()+") to "+group.toString());
+
+ DatagramPacket pkt = null;
+ try {
+ pkt = ObjectPacket.create(city, group);
+ } catch (IOException e) {
+ log.severe("Failed to create packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ for (;;) {
+ sock.send(pkt);
+ Thread.sleep(intervalMillis);
+ }
+ } catch (IOException e) {
+ log.severe("Failed to send to multicast socket "+group.toString()+": "+e.getMessage());
+ } catch (InterruptedException e) {
+ log.info("Interrupted.");
+ } finally {
+ log.info("Shutting down...");
+ sock.close();
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica2/City.java b/src/main/java/derms/replica2/City.java
new file mode 100644
index 0000000..74535ae
--- /dev/null
+++ b/src/main/java/derms/replica2/City.java
@@ -0,0 +1,38 @@
+package derms.replica2;
+
+import java.io.Serializable;
+
+class City implements Serializable {
+ static final int codeLen = 3;
+
+ private String code;
+
+ City(String code) throws IllegalArgumentException {
+ if (code.length() != codeLen)
+ throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters");
+ this.code = code;
+ }
+
+ City() {
+ this("XXX");
+ }
+
+ @Override
+ public String toString() {
+ return code;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || this.getClass() != obj.getClass()) {
+ return false;
+ }
+ City other = (City) obj;
+ return this.code.equals(other.code);
+ }
+
+ @Override
+ public int hashCode() {
+ return code.hashCode();
+ }
+}
diff --git a/src/main/java/derms/replica2/CoordinatorID.java b/src/main/java/derms/replica2/CoordinatorID.java
new file mode 100644
index 0000000..523ebcf
--- /dev/null
+++ b/src/main/java/derms/replica2/CoordinatorID.java
@@ -0,0 +1,39 @@
+package derms.replica2;
+
+import java.io.Serializable;
+
+class CoordinatorID implements Serializable {
+ String city;
+ short num;
+
+ CoordinatorID(String city, short num) {
+ this.city = city;
+ this.num = num;
+ }
+
+ static CoordinatorID parse(String str) throws IllegalArgumentException {
+ if (str.length() != City.codeLen+ID.nDigits)
+ throw new IllegalArgumentException("illegal coordinator ID: " + str);
+
+ try {
+ String city = str.substring(0, City.codeLen);
+ short num = Short.parseShort(str.substring(City.codeLen));
+ return new CoordinatorID(city, num);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("illegal coordinator ID '" + str + "': " + e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() != this.getClass())
+ return false;
+ CoordinatorID other = (CoordinatorID) obj;
+ return other.city.equals(this.city) && other.num == this.num;
+ }
+
+ @Override
+ public String toString() {
+ return city+"C"+num;
+ }
+}
diff --git a/src/main/java/derms/replica2/CoordinatorServer.java b/src/main/java/derms/replica2/CoordinatorServer.java
new file mode 100644
index 0000000..0683638
--- /dev/null
+++ b/src/main/java/derms/replica2/CoordinatorServer.java
@@ -0,0 +1,170 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+class CoordinatorServer {
+ static final Duration timeout = Duration.ofSeconds(5);
+
+ private City city;
+ private Resources resources;
+ private Servers servers;
+ private Logger log;
+
+ CoordinatorServer(City city, Resources resources, Servers servers) throws IOException {
+ super();
+ this.city = city;
+ this.resources = resources;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ CoordinatorServer() throws IOException {
+ this(new City(), new Resources(), new Servers());
+ }
+
+ void requestResource(CoordinatorID cid, ResourceID rid, int duration)
+ throws ServerCommunicationError, NoSuchResourceException,
+ AlreadyBorrowedException, InvalidDurationException
+ {
+ log.info("Request for "+rid+" from "+cid);
+
+ InetAddress server = servers.get(new City(rid.city));
+ if (server == null) {
+ throw new ServerCommunicationError("requestResource(): no connection to server "+rid.city.toString());
+ }
+
+ RequestResource.Client client = new RequestResource.Client(cid, rid, duration);
+ RequestResource.Response response;
+ try {
+ response = client.sendRequest(server);
+ } catch (IOException e) {
+ throw new ServerCommunicationError("requestResource(): "+e.getMessage());
+ }
+ switch (response.status) {
+ case SUCCESS:
+ log.info("Request "+rid+" from "+cid+" - success");
+ break;
+ case NO_SUCH_RESOURCE:
+ log.warning(response.message);
+ throw new NoSuchResourceException(response.message);
+ case ALREADY_BORROWED:
+ log.warning(response.message);
+ throw new AlreadyBorrowedException(response.message);
+ case INVALID_DURATION:
+ log.warning(response.message);
+ throw new InvalidDurationException(response.message);
+ default:
+ log.warning("Unsuccessful response from server: "+response.message);
+ throw new ServerCommunicationError("requestResource(): failed to borrow resource: "+response.message);
+ }
+ }
+
+ Resource[] findResource(CoordinatorID cid, ResourceType rname) throws ServerCommunicationError {
+ log.info("Find Resource "+rname+" from "+cid);
+ FindResource.Request request = new FindResource.Request(cid, rname);
+ Collection<Resource> response = ConcurrentHashMap.newKeySet();
+ ExecutorService pool = Executors.newFixedThreadPool(servers.size());
+ try {
+ for (InetAddress server : servers.all()) {
+ pool.execute(new FindResource.Client(request, server, response));
+ }
+ } catch (IOException e) {
+ String msg = "Failed to start FindResource Client: "+e.getMessage();
+ log.severe(msg);
+ throw new ServerCommunicationError("findResource(): "+msg);
+ }
+ log.fine("Started worker threads");
+ pool.shutdown();
+ boolean terminated;
+ try {
+ terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ String msg = "findResource() interrupted: "+e.getMessage();
+ log.warning(msg);
+ throw new ServerCommunicationError("findResource(): "+msg);
+ }
+ if (!terminated) {
+ String msg = "Request timed out: no response after "+timeout.toString();
+ log.warning(msg);
+ throw new ServerCommunicationError("findResource(): "+msg);
+ }
+ Resource[] arr = new Resource[0];
+ arr = response.toArray(arr);
+ log.info("Find resource "+rname+" from "+cid+" - success. Response length: "+arr.length);
+ return arr;
+ }
+
+ void returnResource(CoordinatorID cid, ResourceID rid)
+ throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException
+ {
+ log.info("Return resource "+rid+" from "+cid);
+ InetAddress server = servers.get(new City(rid.city));
+ if (server == null) {
+ String msg = "no connection to server "+rid.city;
+ log.warning(msg);
+ throw new ServerCommunicationError("returnResource(): "+msg);
+ }
+ log.fine("server address: "+server);
+
+ ReturnResource.Client client = new ReturnResource.Client(cid, rid);
+ ReturnResource.Response response;
+ try {
+ response = client.sendRequest(server);
+ } catch (IOException e) {
+ log.warning(e.getMessage());
+ throw new ServerCommunicationError("returnResource(): "+e.getMessage());
+ }
+ switch (response.status) {
+ case SUCCESS:
+ log.info(cid+" return "+rid+" - success");
+ break;
+ case NO_SUCH_RESOURCE:
+ log.warning(response.message);
+ throw new NoSuchResourceException(response.message);
+ case NOT_BORROWED:
+ log.warning(response.message);
+ throw new NotBorrowedException(response.message);
+ default:
+ String msg = "Failed to return resource: "+response.message;
+ log.warning(msg);
+ throw new ServerCommunicationError("returnResource(): "+msg);
+ }
+ }
+
+ void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException {
+ log.info(cid+": swap "+oldRID+", "+newRID);
+
+ InetAddress server = servers.get(new City(oldRID.city));
+ if (server == null) {
+ String msg = "no connection to server "+oldRID.city;
+ log.warning(msg);
+ throw new ServerCommunicationError("swapResource(): "+msg);
+ }
+ log.fine("server address: "+server);
+
+ SwapResource.Client client = new SwapResource.Client(cid, oldRID, newRID);
+ SwapResource.Response response;
+ try {
+ response = client.sendRequest(server);
+ } catch (IOException e) {
+ throw new ServerCommunicationError("swapResource(): "+e.getMessage());
+ }
+ switch (response.status) {
+ case SUCCESS:
+ log.info(cid+": swap "+oldRID+", "+newRID+" - success");
+ break;
+ case NO_SUCH_RESOURCE:
+ throw new NoSuchResourceException(response.message);
+ default:
+ throw new ServerCommunicationError("swapResource(): "+response.message);
+ }
+ }
+}
diff --git a/src/main/java/derms/replica2/DermsLogger.java b/src/main/java/derms/replica2/DermsLogger.java
new file mode 100644
index 0000000..3e031f7
--- /dev/null
+++ b/src/main/java/derms/replica2/DermsLogger.java
@@ -0,0 +1,23 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.util.logging.FileHandler;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
+
+class DermsLogger {
+ static final String logFile = "server.log";
+
+ private static Logger log = null;
+
+ static Logger getLogger(Class clazz) throws IOException {
+ if (log == null) {
+ log = Logger.getLogger(clazz.getName());
+ Handler fileHandler = new FileHandler(logFile);
+ fileHandler.setFormatter(new SimpleFormatter());
+ log.addHandler(fileHandler);
+ }
+ return log;
+ }
+}
diff --git a/src/main/java/derms/replica2/FindResource.java b/src/main/java/derms/replica2/FindResource.java
new file mode 100644
index 0000000..8ba642f
--- /dev/null
+++ b/src/main/java/derms/replica2/FindResource.java
@@ -0,0 +1,160 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+class FindResource {
+ static final int port = 5558;
+
+ static class Client implements Runnable {
+ private InetAddress serverAddr;
+ private Request request;
+ private Collection<Resource> response;
+ private Logger log;
+
+ Client(Request request, InetAddress serverAddr, Collection<Resource> response) throws IOException {
+ this.serverAddr = serverAddr;
+ this.request = request;
+ this.response = response;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("Failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ log.severe("Failed to create request packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ sock.send(requestPkt);
+ } catch (Exception e) {
+ log.severe("Failed to send request: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ Resource[] resources;
+ try {
+ resources = ResourceTransfer.receive(sock);
+ } catch (IOException e) {
+ log.severe(e.getMessage());
+ return;
+ } finally {
+ sock.close();
+ }
+
+ for (Resource r : resources) {
+ response.add(r);
+ }
+ }
+ }
+
+ static class Server implements Runnable {
+ private static final int bufsize = 4096;
+
+ private InetAddress localAddr;
+ private Resources resources;
+ private ExecutorService pool;
+ private Logger log;
+
+ Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ this.pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket: "+e.getMessage());
+ return;
+ }
+
+ log.info("Running on "+localAddr.toString()+":"+port);
+
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.info("Got request");
+
+ Request request;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log));
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ static class Request implements Serializable {
+ private CoordinatorID cid;
+ private ResourceType rname;
+
+ Request(CoordinatorID cid, ResourceType rname) {
+ this.cid = cid;
+ this.rname = rname;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+ private Logger log;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ this.log = log;
+ }
+
+ @Override
+ public void run() {
+ List<Resource> borrowedResources = resources.borrowed(request.cid, request.rname);
+ log.info(""+borrowedResources.size()+" "+request.rname+" resources borrowed by "+request.cid);
+ try {
+ Resource[] arr = new Resource[0];
+ ResourceTransfer.send(borrowedResources.toArray(arr), client);
+ } catch (IOException e) {
+ log.severe("Failed to send response: "+e.getMessage());
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica2/Hosts.java b/src/main/java/derms/replica2/Hosts.java
new file mode 100644
index 0000000..1392b15
--- /dev/null
+++ b/src/main/java/derms/replica2/Hosts.java
@@ -0,0 +1,26 @@
+package derms.replica2;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+class Hosts {
+ private static Map<City, String> hosts = null;
+
+ static String get(City city) throws UnknownHostException {
+ if (hosts == null)
+ init();
+
+ String host = hosts.get(city);
+ if (host == null)
+ throw new UnknownHostException("unknown host: "+city);
+ return host;
+ }
+
+ private static void init() {
+ hosts = new HashMap<City, String>();
+ hosts.put(new City("MTL"), "alpine1");
+ hosts.put(new City("QUE"), "alpine2");
+ hosts.put(new City("SHE"), "alpine3");
+ }
+}
diff --git a/src/main/java/derms/replica2/ID.java b/src/main/java/derms/replica2/ID.java
new file mode 100644
index 0000000..a61888b
--- /dev/null
+++ b/src/main/java/derms/replica2/ID.java
@@ -0,0 +1,5 @@
+package derms.replica2;
+
+class ID {
+ static final int nDigits = 4;
+}
diff --git a/src/main/java/derms/replica2/InvalidDurationException.java b/src/main/java/derms/replica2/InvalidDurationException.java
new file mode 100644
index 0000000..2596cb9
--- /dev/null
+++ b/src/main/java/derms/replica2/InvalidDurationException.java
@@ -0,0 +1,7 @@
+package derms.replica2;
+
+class InvalidDurationException extends Exception {
+ InvalidDurationException (String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica2/NoSuchResourceException.java b/src/main/java/derms/replica2/NoSuchResourceException.java
new file mode 100644
index 0000000..c1eb8c8
--- /dev/null
+++ b/src/main/java/derms/replica2/NoSuchResourceException.java
@@ -0,0 +1,7 @@
+package derms.replica2;
+
+class NoSuchResourceException extends Exception {
+ NoSuchResourceException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica2/NotBorrowedException.java b/src/main/java/derms/replica2/NotBorrowedException.java
new file mode 100644
index 0000000..aff0661
--- /dev/null
+++ b/src/main/java/derms/replica2/NotBorrowedException.java
@@ -0,0 +1,7 @@
+package derms.replica2;
+
+class NotBorrowedException extends Exception {
+ NotBorrowedException (String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica2/ObjectPacket.java b/src/main/java/derms/replica2/ObjectPacket.java
new file mode 100644
index 0000000..27852b0
--- /dev/null
+++ b/src/main/java/derms/replica2/ObjectPacket.java
@@ -0,0 +1,33 @@
+package derms.replica2;
+
+import java.io.*;
+import java.net.DatagramPacket;
+import java.net.SocketAddress;
+
+class ObjectPacket {
+ static DatagramPacket create(Serializable obj, SocketAddress remoteAddr) throws IOException {
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ObjectOutputStream objStream = new ObjectOutputStream(byteStream);
+ objStream.writeObject(obj);
+ objStream.flush();
+ byte[] buf = byteStream.toByteArray();
+ objStream.close();
+ return new DatagramPacket(buf, buf.length, remoteAddr);
+ }
+
+ static <E> E deserialize(DatagramPacket pkt, Class<E> clazz) throws IOException {
+ ObjectInputStream objectStream;
+ try {
+ objectStream = new ObjectInputStream(
+ new ByteArrayInputStream(pkt.getData()));
+ } catch (Exception e) {
+ throw new IOException("failed to create input stream: "+e.getMessage());
+ }
+
+ try {
+ return clazz.cast(objectStream.readObject());
+ } catch (Exception e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica2/Replica2.java b/src/main/java/derms/replica2/Replica2.java
new file mode 100644
index 0000000..25609cc
--- /dev/null
+++ b/src/main/java/derms/replica2/Replica2.java
@@ -0,0 +1,237 @@
+package derms.replica2;
+
+import derms.Replica;
+import derms.Request;
+import derms.Response;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+public class Replica2 implements Replica {
+ static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555);
+
+ final City city;
+ final InetAddress localAddr;
+ final Resources resources;
+ final Servers servers;
+ private final Logger log;
+ private final ResponderServer responderServer;
+ private final CoordinatorServer coordinatorServer;
+ private boolean alive;
+
+ public Replica2(City city) throws IOException {
+ this.city = city;
+ this.localAddr = InetAddress.getLocalHost();
+ this.resources = new Resources();
+ this.servers = new Servers();
+ this.log = DermsLogger.getLogger(getClass());
+
+ try {
+ this.responderServer = new ResponderServer(city, resources, servers);
+ } catch (IOException e) {
+ throw new IOException("Failed to create ResponderServer: "+e.getMessage());
+ }
+ log.info("Created ResponderServer");
+
+ try {
+ this.coordinatorServer = new CoordinatorServer(city, resources, servers);
+ } catch (IOException e) {
+ throw new IOException("Failed to create CoordinatorServer: "+e.getMessage());
+ }
+ log.info("Created CoordinatorServer");
+
+ log.info("Running");
+ log.config("Local address is "+localAddr.toString());
+
+ ExecutorService pool = Executors.newCachedThreadPool();
+
+ try {
+ pool.execute(new ResourceAvailability.Server(localAddr, resources));
+ } catch (IOException e) {
+ String msg = "Failed to start ResourceAvailability Server: "+e.getMessage();
+ log.severe(msg);
+ throw e;
+ }
+ try {
+ pool.execute(new RequestResource.Server(localAddr, resources));
+ } catch (IOException e) {
+ log.severe("Failed to start RequestResource Server: "+e.getMessage());
+ throw e;
+ }
+ try {
+ pool.execute(new FindResource.Server(localAddr, resources));
+ } catch (IOException e) {
+ log.severe("Failed to start FindResource Server: "+e.getMessage());
+ throw e;
+ }
+ try {
+ pool.execute(new ReturnResource.Server(localAddr, resources));
+ } catch (IOException e) {
+ log.severe("Failed to start ReturnResource Server: "+e.getMessage());
+ throw e;
+ }
+ try {
+ pool.execute(new SwapResource.Server(localAddr, resources, servers));
+ } catch (IOException e) {
+ log.severe("Failed to start SwapResource Server: "+e.getMessage());
+ throw e;
+ }
+
+ try {
+ pool.execute(new Announcer(announceGroup, localAddr, city));
+ } catch (IOException e) {
+ log.severe("Failed to start Announcer: "+e.getMessage());
+ throw e;
+ }
+ try {
+ pool.execute(new AnnounceListener(announceGroup, localAddr, servers));
+ } catch (IOException e) {
+ log.severe("Failed to start AnnounceListener: "+e.getMessage());
+ throw e;
+ }
+
+ this.alive = true;
+ }
+
+ @Override
+ public boolean isAlive() { return alive; }
+
+ @Override
+ public void startProcess() {
+ // TODO
+ log.info(getClass().getSimpleName() + " started.");
+ }
+
+ @Override
+ public void processRequest(Request request) {
+ log.info(request.toString());
+
+ String status = "";
+ try {
+ switch (request.getFunction()) {
+ case "addResource":
+ status = addResource(request);
+ break;
+ case "removeResource":
+ status = removeResource(request);
+ break;
+ case "listResourceAvailability":
+ status = listResourceAvailability(request);
+ break;
+ case "requestResource":
+ status = requestResource(request);
+ break;
+ case "findResource":
+ status = findResource(request);
+ break;
+ case "returnResource":
+ status = returnResource(request);
+ break;
+ case "swapResource":
+ status = swapResource(request);
+ break;
+ default:
+ status = "Failure: unknown function '" + request.getFunction() + "'";
+ }
+ } catch (Exception e) {
+ log.warning(e.getMessage());
+ status = "Failure: " + request.getFunction() + ": " + e.getMessage();
+ }
+
+ Response response = new Response(request.getSequenceNumber(), status);
+ log.info("Processed request " + request + "; response: " + response);
+ replicaManager.sendResponseToFE(response);
+ }
+
+ @Override
+ public void restart() {
+ // TODO
+ shutdown();
+ startProcess();
+ }
+
+ @Override
+ public int getId() { return 2; }
+
+ private void shutdown() {
+ // TODO
+ }
+
+ private String addResource(Request request) {
+ Resource resource = new Resource(
+ ResourceID.parse(request.getResourceID()),
+ ResourceType.parse(request.getResourceType()),
+ request.getDuration());
+ responderServer.addResource(resource);
+ return "Successfully added resource " + resource;
+ }
+
+ private String removeResource(Request request) {
+ try {
+ responderServer.removeResource(
+ ResourceID.parse(request.getResourceID()),
+ request.getDuration());
+ return "Successfully removed resource " + request.getResourceID();
+ } catch (NoSuchResourceException e) {
+ String msg = "Error removing " + request.getResourceID() + ": " + e.getMessage();
+ log.warning(msg);
+ return msg;
+ }
+ }
+
+ private String listResourceAvailability(Request request) {
+ // TODO
+ throw new NotImplementedException();
+ }
+
+ private String requestResource(Request request) {
+ try {
+ coordinatorServer.requestResource(
+ CoordinatorID.parse(request.getClientID()),
+ ResourceID.parse(request.getResourceID()),
+ request.getDuration());
+ return "Successfully borrowed " + request.getResourceID();
+ } catch (NoSuchResourceException | AlreadyBorrowedException | InvalidDurationException |ServerCommunicationError e) {
+ String msg = "Failed to borrow resource " + request.getResourceID() + ": " + e.getMessage();
+ log.warning(msg);
+ return msg;
+ }
+ }
+
+ private String findResource(Request request) {
+ // TODO
+ throw new NotImplementedException();
+ }
+
+ private String returnResource(Request request) {
+ try {
+ coordinatorServer.returnResource(
+ CoordinatorID.parse(request.getClientID()),
+ ResourceID.parse(request.getResourceID()));
+ return "Successfully returned resource " + request.getResourceID();
+ } catch (NoSuchResourceException | NotBorrowedException | ServerCommunicationError e) {
+ String msg = "Failed to borrow resource " + request.getResourceID() + ": " + e.getMessage();
+ log.warning(msg);
+ return msg;
+ }
+ }
+
+ private String swapResource(Request request) {
+ try {
+ coordinatorServer.swapResource(
+ CoordinatorID.parse(request.getClientID()),
+ ResourceID.parse(request.getOldResourceID()),
+ ResourceID.parse(request.getResourceID()));
+ return "Successfully swapped " + request.getOldResourceID() + " for " + request.getResourceID();
+ } catch (NoSuchResourceException | ServerCommunicationError e) {
+ String msg = "Failed to swap " + request.getOldResourceID() + " for " + request.getResourceID() + ": " + e.getMessage();
+ log.warning(msg);
+ return msg;
+ }
+ }
+}
diff --git a/src/main/java/derms/replica2/RequestResource.java b/src/main/java/derms/replica2/RequestResource.java
new file mode 100644
index 0000000..6a905db
--- /dev/null
+++ b/src/main/java/derms/replica2/RequestResource.java
@@ -0,0 +1,223 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.*;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+class RequestResource {
+ static final int port = 5557;
+ static final int bufsize = 4096;
+
+ static class Client {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+ private int duration;
+
+ Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) {
+ this.coordinatorID = coordinatorID;
+ this.resourceID = resourceID;
+ this.duration = duration;
+ }
+
+ Response sendRequest(InetAddress serverAddr) throws IOException {
+ Request request = new Request(coordinatorID, resourceID, duration);
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage());
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ sock.close();
+ throw new IOException("Request Resource Client: failed to create request: "+e.getMessage());
+ }
+
+ sock.send(requestPkt);
+
+ byte[] buf = new byte[bufsize];
+ DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
+ try {
+ sock.receive(responsePkt);
+ } catch (Exception e) {
+ sock.close();
+ throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage());
+ }
+
+ try {
+ return ObjectPacket.deserialize(responsePkt, Response.class);
+ } catch (IOException e) {
+ throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ static class Server implements Runnable {
+ private InetAddress localAddr;
+ private Resources resources;
+ private ExecutorService pool;
+ private Logger log;
+
+ Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr.toString()
+ +": "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening on "+localAddr.toString()+":"+port);
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.info("Got request");
+
+ Request request = null;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log));
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ private static class Request implements Serializable {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+ private int duration;
+
+ private Request(CoordinatorID cid, ResourceID rid, int duration) {
+ this.coordinatorID = cid;
+ this.resourceID = rid;
+ this.duration = duration;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+ private Logger log;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ this.log = log;
+ }
+
+ @Override
+ public void run() {
+ Response response = borrow();
+
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("Failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket pkt;
+ try {
+ pkt = ObjectPacket.create(response, client);
+ } catch (IOException e) {
+ log.severe("Failed to create response packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+ try {
+ sock.send(pkt);
+ } catch (Exception e) {
+ log.severe("Failed to send response: "+e.getMessage());
+ }
+ sock.close();
+ }
+
+ private Response borrow() {
+ try {
+ Resource resource = resources.getByID(request.resourceID);
+ synchronized (resource) {
+ if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) {
+ return new Response(Response.Status.ALREADY_BORROWED,
+ request.coordinatorID+" cannot borrow "+request.resourceID
+ +"; already borrowed by "+resource.borrower);
+ } else if (request.duration <= 0) {
+ return new Response(Response.Status.INVALID_DURATION,
+ "duration "+request.duration+" less than 1");
+ } else if (request.duration > resource.duration) {
+ return new Response(Response.Status.INVALID_DURATION,
+ "cannot borrow "+resource.id+" for duration of "+request.duration
+ +"; only "+resource.duration+" remaining");
+ }
+
+ if (resource.borrower.equals(request.coordinatorID)) {
+ // Resource is already borrowed. Add to existing duration.
+ resource.borrowDuration += request.duration;
+ } else {
+ resource.borrowDuration = request.duration;
+ }
+ resource.borrower = request.coordinatorID;
+ resource.isBorrowed = true;
+ resource.duration -= request.duration;
+
+ return new Response(Response.Status.SUCCESS, request.coordinatorID
+ +" successfully borrowed "+request.resourceID);
+ }
+ } catch (NoSuchElementException e) {
+ return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID);
+ }
+ }
+ }
+
+ static class Response implements Serializable {
+ Status status;
+ String message;
+
+ private Response(Status status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ private Response() {
+ this(Status.SUCCESS, "");
+ }
+
+ public enum Status {
+ SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION
+ }
+ }
+}
+
diff --git a/src/main/java/derms/replica2/Resource.java b/src/main/java/derms/replica2/Resource.java
new file mode 100644
index 0000000..31d40bc
--- /dev/null
+++ b/src/main/java/derms/replica2/Resource.java
@@ -0,0 +1,39 @@
+package derms.replica2;
+
+import java.io.Serializable;
+
+class Resource implements Serializable {
+ ResourceID id;
+ ResourceType type;
+ int duration;
+ boolean isBorrowed;
+ CoordinatorID borrower;
+ int borrowDuration;
+
+ Resource(ResourceID id, ResourceType type, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) {
+ this.id = id;
+ this.type = type;
+ this.duration = duration;
+ this.isBorrowed = isBorrowed;
+ this.borrower = borrower;
+ this.borrowDuration = borrowDuration;
+ }
+
+ Resource(ResourceID id, ResourceType type, int duration) {
+ this(id, type, duration, false, new CoordinatorID(), -1);
+ }
+
+ Resource() {
+ this(new ResourceID(), ResourceType.AMBULANCE, 0);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return id+" "+duration;
+ }
+}
diff --git a/src/main/java/derms/replica2/ResourceAvailability.java b/src/main/java/derms/replica2/ResourceAvailability.java
new file mode 100644
index 0000000..ce4c698
--- /dev/null
+++ b/src/main/java/derms/replica2/ResourceAvailability.java
@@ -0,0 +1,128 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+class ResourceAvailability {
+ static final int port = 5556;
+
+ static class Client implements Runnable {
+ private InetAddress serverAddr;
+ private ResourceType request;
+ private Collection<Resource> resources;
+ private Logger log;
+
+ Client(InetAddress serverAddr, ResourceType request, Collection<Resource> response) throws IOException {
+ this.serverAddr = serverAddr;
+ this.request = request;
+ this.resources = response;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("Error binding socket: "+e.getMessage());
+ return;
+ }
+ log.fine("Created socket");
+
+ DatagramPacket reqPkt;
+ try {
+ reqPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ log.severe("Error creating request: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ sock.send(reqPkt);
+ } catch (IOException e) {
+ log.severe("Error sending request: "+e.getMessage());
+ sock.close();
+ return;
+ }
+ log.fine("Sent request");
+
+ Resource[] response;
+ try {
+ response = ResourceTransfer.receive(sock);
+ } catch (IOException e) {
+ log.severe(e.getMessage());
+ return;
+ } finally {
+ sock.close();
+ }
+
+ for (Resource resource : response) {
+ resources.add(resource);
+ }
+ }
+ }
+
+ static class Server implements Runnable {
+ static final int bufsize = 1024;
+
+ private InetAddress localAddr;
+ private Resources resources;
+ private Logger log;
+
+ Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr.toString()+": "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening on "+localAddr.toString()+":"+port);
+
+ DatagramPacket request = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(request);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+
+ ResourceType requestedName = null;
+ try {
+ requestedName = ObjectPacket.deserialize(request, ResourceType.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+ log.info("Got request: "+requestedName);
+
+ Resource[] response = resources.getByName(requestedName);
+ try {
+ ResourceTransfer.send(response, request.getSocketAddress());
+ } catch (IOException e) {
+ log.warning("Error transfering resources: "+e.getMessage());
+ }
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+}
diff --git a/src/main/java/derms/replica2/ResourceID.java b/src/main/java/derms/replica2/ResourceID.java
new file mode 100644
index 0000000..24da3d3
--- /dev/null
+++ b/src/main/java/derms/replica2/ResourceID.java
@@ -0,0 +1,49 @@
+package derms.replica2;
+
+import java.io.Serializable;
+
+class ResourceID implements Serializable {
+ String city;
+ short num;
+
+ ResourceID (String city, short num) {
+ this.city = city;
+ this.num = num;
+ }
+
+ ResourceID() {
+ this("XXX", (short) 1111);
+ }
+
+ static ResourceID parse(String s) throws IllegalArgumentException {
+ if (s.length() != City.codeLen+ID.nDigits) {
+ throw new IllegalArgumentException("invalid resource ID: "+s);
+ }
+ try {
+ String cityCode = s.substring(0, City.codeLen);
+ short num = Short.parseShort(s.substring(City.codeLen));
+ return new ResourceID(cityCode, num);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("invalid resource ID: "+e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != this.getClass()) {
+ return false;
+ }
+ ResourceID other = (ResourceID) obj;
+ return (this.city.equals(other.city)) && (this.num == other.num);
+ }
+
+ @Override
+ public int hashCode() {
+ return city.hashCode() * num;
+ }
+
+ @Override
+ public String toString() {
+ return city+num;
+ }
+}
diff --git a/src/main/java/derms/replica2/ResourceTransfer.java b/src/main/java/derms/replica2/ResourceTransfer.java
new file mode 100644
index 0000000..0ed17c5
--- /dev/null
+++ b/src/main/java/derms/replica2/ResourceTransfer.java
@@ -0,0 +1,50 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+class ResourceTransfer {
+ static final int bufsize = 1024;
+
+ static void send(Resource[] resources, SocketAddress remoteAddr) throws IOException {
+ DatagramSocket sock = new DatagramSocket();
+
+ for (Resource resource : resources) {
+ DatagramPacket pkt = ObjectPacket.create(resource, remoteAddr);
+ sock.send(pkt);
+ }
+
+ DatagramPacket pkt = ObjectPacket.create(new EndOfTransmission(), remoteAddr);
+ sock.send(pkt);
+ sock.close();
+ }
+
+ static Resource[] receive(DatagramSocket sock) throws IOException {
+ List<Resource> resources = new ArrayList<Resource>();
+ byte[] buf = new byte[bufsize];
+ DatagramPacket response = new DatagramPacket(buf, buf.length);
+
+ for (;;) {
+ sock.receive(response);
+
+ Object obj = ObjectPacket.deserialize(response, Object.class);
+ if (obj.getClass() == EndOfTransmission.class) {
+ break;
+ }
+ try {
+ resources.add((Resource) obj);
+ } catch (Exception e) {
+ throw new IOException("expected Resource; got "+obj.getClass().toString());
+ }
+ }
+ Resource[] arr = new Resource[0];
+ return resources.toArray(arr);
+ }
+
+ private static class EndOfTransmission implements Serializable {}
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica2/ResourceType.java b/src/main/java/derms/replica2/ResourceType.java
new file mode 100644
index 0000000..92aba32
--- /dev/null
+++ b/src/main/java/derms/replica2/ResourceType.java
@@ -0,0 +1,18 @@
+package derms.replica2;
+
+import java.io.Serializable;
+
+enum ResourceType implements Serializable {
+ AMBULANCE,
+ FIRETRUCK,
+ PERSONNEL;
+
+ static ResourceType parse(String s) throws IllegalArgumentException {
+ switch (s) {
+ case "AMBULANCE": return ResourceType.AMBULANCE;
+ case "FIRETRUCK": return ResourceType.FIRETRUCK;
+ case "PERSONNEL": return ResourceType.PERSONNEL;
+ }
+ throw new IllegalArgumentException("invalid resource name: "+s);
+ }
+}
diff --git a/src/main/java/derms/replica2/Resources.java b/src/main/java/derms/replica2/Resources.java
new file mode 100644
index 0000000..634e315
--- /dev/null
+++ b/src/main/java/derms/replica2/Resources.java
@@ -0,0 +1,74 @@
+package derms.replica2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+
+class Resources {
+ private Map<ResourceType, Map<ResourceID, Resource>> resources;
+
+ Resources() {
+ this.resources = new ConcurrentHashMap<ResourceType, Map<ResourceID, Resource>>();
+ }
+
+ List<Resource> borrowed(CoordinatorID borrower, ResourceType name) {
+ List<Resource> borrowed = new ArrayList<Resource>();
+ Resource[] namedResources = getByName(name);
+ for (Resource r : namedResources) {
+ if (r.isBorrowed && r.borrower.equals(borrower)) {
+ borrowed.add(r);
+ }
+ }
+ return borrowed;
+ }
+
+ Resource getByID(ResourceID id) throws NoSuchElementException {
+ for (Map<ResourceID, Resource> rids : resources.values()) {
+ Resource resource = rids.get(id);
+ if (resource != null) {
+ return resource;
+ }
+ }
+ throw new NoSuchElementException("No such resource "+id);
+ }
+
+ Resource[] getByName(ResourceType name) {
+ Map<ResourceID, Resource> rids = resources.get(name);
+ if (rids == null) {
+ return new Resource[0];
+ }
+ Resource[] r = new Resource[0];
+ return rids.values().toArray(r);
+ }
+
+ void add(Resource r) {
+ Map<ResourceID, Resource> rids;
+ synchronized (resources) {
+ rids = resources.get(r.type);
+ if (rids == null) {
+ rids = new ConcurrentHashMap<ResourceID, Resource>();
+ resources.put(r.type, rids);
+ }
+ }
+ synchronized (rids) {
+ Resource existing = rids.get(r.id);
+ if (existing != null) {
+ existing.duration += r.duration;
+ } else {
+ rids.put(r.id, r);
+ }
+ }
+ }
+
+ void removeByID(ResourceID id) throws NoSuchElementException {
+ for (Map<ResourceID, Resource> rids : resources.values()) {
+ if (rids.containsKey(id)) {
+ rids.remove(id);
+ return;
+ }
+ }
+ throw new NoSuchElementException("No such resource "+id);
+ }
+}
diff --git a/src/main/java/derms/replica2/ResponderID.java b/src/main/java/derms/replica2/ResponderID.java
new file mode 100644
index 0000000..480b471
--- /dev/null
+++ b/src/main/java/derms/replica2/ResponderID.java
@@ -0,0 +1,16 @@
+package derms.replica2;
+
+class ResponderID {
+ City city;
+ short num;
+
+ ResponderID(City city, short num) {
+ this.city = city;
+ this.num = num;
+ }
+
+ @Override
+ public String toString() {
+ return ""+city+"R"+num;
+ }
+}
diff --git a/src/main/java/derms/replica2/ResponderServer.java b/src/main/java/derms/replica2/ResponderServer.java
new file mode 100644
index 0000000..02ff6b6
--- /dev/null
+++ b/src/main/java/derms/replica2/ResponderServer.java
@@ -0,0 +1,94 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+class ResponderServer {
+ static final Duration timeout = Duration.ofSeconds(5);
+
+ private City city;
+ private Resources resources;
+ private Servers servers;
+ private Logger log;
+
+ ResponderServer(City city, Resources resources, Servers servers) throws IOException {
+ this.city = city;
+ this.resources = resources;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ ResponderServer() throws IOException {
+ this(new City(), new Resources(), new Servers());
+ }
+
+ void addResource(Resource r) {
+ resources.add(r);
+ log.info("Added resource "+r+" - success");
+ }
+
+ void removeResource(ResourceID rid, int duration) throws NoSuchResourceException {
+ log.info("Remove duration "+duration+" from "+rid);
+ try {
+ Resource resource = resources.getByID(rid);
+ synchronized (resource) {
+ if (duration < 0 || duration >= resource.duration) {
+ resources.removeByID(rid);
+ log.info("Removed resource "+rid+" - success");
+ } else {
+ resource.duration -= duration;
+ log.info("Removed duration from resource "+rid+". New duration: "+resource.duration+" - success");
+ }
+ }
+ } catch (NoSuchElementException e) {
+ // Wanted to remove duration from resource.
+ if (duration >= 0) {
+ String msg = "Cannot remove duration from "+rid+": resource does not exist.";
+ log.warning(msg);
+ throw new NoSuchResourceException(msg);
+ }
+
+ // Duration is negative---wanted to remove resource completely.
+ // Success because it is already removed.
+ log.info("Not removing "+rid+": resource does not exist.");
+ }
+ }
+
+ Resource[] listResourceAvailability(ResourceType rname) throws ServerCommunicationError {
+ log.info("Request for available "+rname);
+ Collection<Resource> availableResources = ConcurrentHashMap.newKeySet();
+ ExecutorService pool = Executors.newFixedThreadPool(servers.size());
+ try {
+ for (InetAddress serverAddr : servers.all()) {
+ pool.execute(new ResourceAvailability.Client(serverAddr, rname, availableResources));
+ }
+ } catch (IOException e) {
+ String msg = "Failed to start ResourceAvailability Client: "+e.getMessage();
+ log.severe(msg);
+ throw new ServerCommunicationError("ResourceAvailability: "+msg);
+ }
+
+ log.fine("Started worker threads");
+ pool.shutdown();
+ boolean terminated;
+ try {
+ terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new ServerCommunicationError("listResourceAvailability(): listResourceAvailability() interrupted: "+e.getMessage());
+ }
+ if (!terminated) {
+ throw new ServerCommunicationError("ResourceAvailability: request timed out: no response after "+timeout.toString());
+ }
+ log.info("Response length "+availableResources.size());
+ Resource[] arr = new Resource[0];
+ return availableResources.toArray(arr);
+ }
+}
diff --git a/src/main/java/derms/replica2/ReturnResource.java b/src/main/java/derms/replica2/ReturnResource.java
new file mode 100644
index 0000000..6c42b8a
--- /dev/null
+++ b/src/main/java/derms/replica2/ReturnResource.java
@@ -0,0 +1,195 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.*;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+class ReturnResource {
+ public static final int port = 5559;
+ public static final int bufsize = 4096;
+
+ static class Client {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+
+ Client(CoordinatorID cid, ResourceID rid) {
+ this.coordinatorID = cid;
+ this.resourceID = rid;
+ }
+
+ Response sendRequest(InetAddress serverAddr) throws IOException {
+ Request request = new Request(coordinatorID, resourceID);
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ throw new IOException("Return Resource Client: failed to open socket: "+e.getMessage());
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ sock.close();
+ throw new IOException("Return Resource Client: failed to create request: "+e.getMessage());
+ }
+
+ sock.send(requestPkt);
+
+ byte[] buf = new byte[bufsize];
+ DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
+ try {
+ sock.receive(responsePkt);
+ } catch (Exception e) {
+ sock.close();
+ throw new IOException("Return Resource Client: error receiving from server: "+e.getMessage());
+ }
+
+ try {
+ return ObjectPacket.deserialize(responsePkt, Response.class);
+ } catch (IOException e) {
+ throw new IOException("Return Resource Client: failed to deserialize response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ static class Server implements Runnable {
+ private InetAddress localAddr;
+ private Resources resources;
+ private ExecutorService pool;
+ private Logger log;
+
+ Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr.toString()
+ +": "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening on "+localAddr.toString()+":"+port);
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.fine("Got request");
+
+ Request request = null;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources));
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ private static class Request implements Serializable {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+
+ private Request(CoordinatorID cid, ResourceID rid) {
+ this.coordinatorID = cid;
+ this.resourceID = rid;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources) {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ }
+
+ @Override
+ public void run() {
+ Response response = returnResource();
+
+ DatagramSocket sock;
+ try {
+ sock= new DatagramSocket();
+ } catch (Exception e) {
+ System.err.println("Request Resource Server: failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket pkt;
+ try {
+ pkt = ObjectPacket.create(response, client);
+ } catch (IOException e) {
+ System.err.println("Request Resource Server: failed to create response packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+ try {
+ sock.send(pkt);
+ } catch (Exception e) {
+ System.err.println("Request Resource Server: failed to send response: "+e.getMessage());
+ }
+ sock.close();
+ }
+
+ private Response returnResource() {
+ try {
+ Resource resource = resources.getByID(request.resourceID);
+ synchronized (resource) {
+ if (!resource.isBorrowed || !resource.borrower.equals(request.coordinatorID)) {
+ return new Response(Response.Status.NOT_BORROWED,
+ request.resourceID+" is not borrowed by "+request.coordinatorID);
+ }
+ resource.isBorrowed = false;
+ resource.borrower = new CoordinatorID();
+ resource.borrowDuration = -1;
+ return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id);
+ }
+ } catch (NoSuchElementException e) {
+ return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID);
+ }
+ }
+ }
+
+ static class Response implements Serializable {
+ Status status;
+ String message;
+
+ private Response(Status status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ enum Status {
+ SUCCESS, NO_SUCH_RESOURCE, NOT_BORROWED
+ }
+ }
+}
diff --git a/src/main/java/derms/replica2/ServerCommunicationError.java b/src/main/java/derms/replica2/ServerCommunicationError.java
new file mode 100644
index 0000000..2bf4d33
--- /dev/null
+++ b/src/main/java/derms/replica2/ServerCommunicationError.java
@@ -0,0 +1,7 @@
+package derms.replica2;
+
+class ServerCommunicationError extends Exception {
+ ServerCommunicationError(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica2/Servers.java b/src/main/java/derms/replica2/Servers.java
new file mode 100644
index 0000000..498b7ce
--- /dev/null
+++ b/src/main/java/derms/replica2/Servers.java
@@ -0,0 +1,34 @@
+package derms.replica2;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+class Servers {
+ private Map<City, InetAddress> servers = new ConcurrentHashMap<City, InetAddress>();
+
+ /** Returns the address of the server located in the specified city, or null if there is no server in the city. */
+ InetAddress get(City city) {
+ return servers.get(city);
+ }
+
+ /**
+ * Associates the specified server address with the specified city.
+ * If there was already a server associated with the city, the old value is replaced.
+ * @param city the city where the server is located
+ * @param addr the address of the server
+ * @return the previous server address, or null if there was no server associated with this city.
+ */
+ InetAddress put(City city, InetAddress addr) {
+ return servers.put(city, addr);
+ }
+
+ Collection<InetAddress> all() {
+ return servers.values();
+ }
+
+ int size() {
+ return servers.size();
+ }
+}
diff --git a/src/main/java/derms/replica2/SwapResource.java b/src/main/java/derms/replica2/SwapResource.java
new file mode 100644
index 0000000..cc65f29
--- /dev/null
+++ b/src/main/java/derms/replica2/SwapResource.java
@@ -0,0 +1,262 @@
+package derms.replica2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.*;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Logger;
+
+class SwapResource {
+ static final int port = 5560;
+ static final int bufSize = 4096;
+
+ static class Client {
+ private CoordinatorID cid;
+ private ResourceID oldRID;
+ private ResourceID newRID;
+
+ Client(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) {
+ this.cid = cid;
+ this.oldRID = oldRID;
+ this.newRID = newRID;
+ }
+
+ Response sendRequest(InetAddress serverAddr) throws IOException {
+ Request request = new Request(cid, oldRID, newRID);
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ throw new IOException("Swap Resource Client: failed to open socket: "+e.getMessage());
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ sock.close();
+ throw new IOException("Swap Resource Client: failed to create request: "+e.getMessage());
+ }
+
+ sock.send(requestPkt);
+
+ byte[] buf = new byte[bufSize];
+ DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
+ try {
+ sock.receive(responsePkt);
+ } catch (Exception e) {
+ sock.close();
+ throw new IOException("Swap Resource Client: error receiving from server: "+e.getMessage());
+ }
+
+ try {
+ return ObjectPacket.deserialize(responsePkt, Response.class);
+ } catch (IOException e) {
+ throw new IOException("Swap Resource Client: failed to deserialize response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ static class Server implements Runnable {
+ private InetAddress localAddr;
+ private Resources resources;
+ private Servers servers;
+ private ExecutorService pool;
+ private Logger log;
+
+ Server(InetAddress localAddr, Resources resources, Servers servers) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ this.servers = servers;
+ pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr+": "+e.getMessage());
+ return;
+ }
+ log.info("Listening on "+localAddr+":"+port);
+
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufSize], bufSize);
+
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.fine("Got request");
+
+ Request request = null;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ SocketAddress client = requestPkt.getSocketAddress();
+ try {
+ RequestHandler handler = new RequestHandler(request, client, resources, servers);
+ pool.execute(handler);
+ } catch (IOException e) {
+ log.warning("Failed to create request handler: "+e.getMessage());
+ continue;
+ }
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ private static class Request implements Serializable {
+ private CoordinatorID cid;
+ private ResourceID oldRID;
+ private ResourceID newRID;
+
+ private Request(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) {
+ this.cid = cid;
+ this.oldRID = oldRID;
+ this.newRID = newRID;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+ private Servers servers;
+ private Logger log;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources, Servers servers) throws IOException {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ Response response = swapResources();
+
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket pkt;
+ try {
+ pkt = ObjectPacket.create(response, client);
+ } catch (IOException e) {
+ log.severe("failed to create response: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ sock.send(pkt);
+ } catch (Exception e) {
+ log.severe("failed to send response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+
+ private Response swapResources() {
+ try {
+ Resource resource = resources.getByID(request.oldRID);
+ synchronized (resource) {
+ if (!resource.borrower.equals(request.cid)) {
+ return new Response(Response.Status.NOT_BORROWED, "resource "+request.oldRID+" not borrowed by "+request.cid);
+ }
+ try {
+ acquireNewResource(resource.borrowDuration);
+ returnOldResource(resource);
+ return new Response(Response.Status.SUCCESS, request.cid+" success fully swapped "+request.oldRID+" for "+request.newRID);
+ } catch (UnknownHostException e) {
+ return new Response(Response.Status.UNKNOWN_HOST, e.getMessage());
+ } catch (IOException e) {
+ return new Response(Response.Status.FAILURE, e.getMessage());
+ } catch (CannotBorrow e) {
+ return new Response(Response.Status.CANNOT_BORROW, e.getMessage());
+ }
+ }
+ } catch (NoSuchElementException e) {
+ return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.oldRID);
+ }
+ }
+
+ private void acquireNewResource(int borrowDuration) throws UnknownHostException, IOException, CannotBorrow {
+ RequestResource.Client requestClient = new RequestResource.Client(request.cid, request.newRID, borrowDuration);
+ City city = new City(request.newRID.city);
+ InetAddress requestResourceServer = servers.get(city);
+ if (requestResourceServer == null) {
+ throw new UnknownHostException(city.toString());
+ }
+ RequestResource.Response response = requestClient.sendRequest(requestResourceServer);
+ // TODO: make exception handling more granular---pass through status from Request.
+ if (response.status != RequestResource.Response.Status.SUCCESS) {
+ throw new CannotBorrow(request.cid, request.newRID, response.message);
+ }
+ }
+
+ private void returnOldResource(Resource r) {
+ r.isBorrowed = false;
+ r.borrower = new CoordinatorID();
+ r.borrowDuration = -1;
+ }
+ }
+
+ static class Response implements Serializable {
+ Status status;
+ String message;
+
+ private Response(Status status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ enum Status {
+ SUCCESS,
+ FAILURE,
+ NO_SUCH_RESOURCE,
+ NOT_BORROWED,
+ CANNOT_BORROW,
+ UNKNOWN_HOST
+ }
+ }
+
+ private static class CannotBorrow extends Exception {
+ CoordinatorID attemptedBorrower;
+ ResourceID rid;
+ String message;
+
+ private CannotBorrow(CoordinatorID attemptedBorrower, ResourceID rid, String message) {
+ this.attemptedBorrower = attemptedBorrower;
+ this.rid = rid;
+ this.message = message;
+ }
+
+ @Override
+ public String getMessage() {
+ return attemptedBorrower+" failed to borrow "+rid+": "+message;
+ }
+ }
+} \ No newline at end of file