summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSam Anthony <sam@samanthony.xyz>2024-11-23 13:51:12 -0500
committerSam Anthony <sam@samanthony.xyz>2024-11-23 13:51:12 -0500
commit3c62b863509131e78c18ed13c6b83e4fc508848f (patch)
treecf246e91da283edf704af936b9cef7eb8e09a6f8
parente3b72053e8b04f2df013da0d7d49fe33927461a9 (diff)
downloadsoen423-3c62b863509131e78c18ed13c6b83e4fc508848f.zip
import replica code from assignment
-rw-r--r--TODO1
-rw-r--r--src/main/java/derms/replica/replica1/AlreadyBorrowedException.java7
-rw-r--r--src/main/java/derms/replica/replica1/AnnounceListener.java85
-rw-r--r--src/main/java/derms/replica/replica1/Announcer.java73
-rw-r--r--src/main/java/derms/replica/replica1/CLI.java91
-rw-r--r--src/main/java/derms/replica/replica1/City.java38
-rw-r--r--src/main/java/derms/replica/replica1/Client.java30
-rw-r--r--src/main/java/derms/replica/replica1/Coordinator.java22
-rw-r--r--src/main/java/derms/replica/replica1/CoordinatorClient.java40
-rw-r--r--src/main/java/derms/replica/replica1/CoordinatorClientCLI.java212
-rw-r--r--src/main/java/derms/replica/replica1/CoordinatorID.java34
-rw-r--r--src/main/java/derms/replica/replica1/CoordinatorServer.java176
-rw-r--r--src/main/java/derms/replica/replica1/DermsLogger.java23
-rw-r--r--src/main/java/derms/replica/replica1/FindResource.java164
-rw-r--r--src/main/java/derms/replica/replica1/Hosts.java27
-rw-r--r--src/main/java/derms/replica/replica1/ID.java5
-rw-r--r--src/main/java/derms/replica/replica1/InvalidDurationException.java7
-rw-r--r--src/main/java/derms/replica/replica1/NoSuchResourceException.java7
-rw-r--r--src/main/java/derms/replica/replica1/NotBorrowedException.java7
-rw-r--r--src/main/java/derms/replica/replica1/ObjectPacket.java38
-rw-r--r--src/main/java/derms/replica/replica1/RequestResource.java227
-rw-r--r--src/main/java/derms/replica/replica1/Resource.java39
-rw-r--r--src/main/java/derms/replica/replica1/ResourceAvailability.java128
-rw-r--r--src/main/java/derms/replica/replica1/ResourceID.java49
-rw-r--r--src/main/java/derms/replica/replica1/ResourceName.java18
-rw-r--r--src/main/java/derms/replica/replica1/ResourceTransfer.java50
-rw-r--r--src/main/java/derms/replica/replica1/Resources.java74
-rw-r--r--src/main/java/derms/replica/replica1/Responder.java16
-rw-r--r--src/main/java/derms/replica/replica1/ResponderClient.java34
-rw-r--r--src/main/java/derms/replica/replica1/ResponderClientCLI.java151
-rw-r--r--src/main/java/derms/replica/replica1/ResponderID.java16
-rw-r--r--src/main/java/derms/replica/replica1/ResponderServer.java99
-rw-r--r--src/main/java/derms/replica/replica1/ReturnResource.java199
-rw-r--r--src/main/java/derms/replica/replica1/ServerCommunicationError.java7
-rw-r--r--src/main/java/derms/replica/replica1/Servers.java34
-rw-r--r--src/main/java/derms/replica/replica1/StationServer.java146
-rw-r--r--src/main/java/derms/replica/replica1/SwapResource.java267
37 files changed, 2641 insertions, 0 deletions
diff --git a/TODO b/TODO
index 42b1327..ebe578f 100644
--- a/TODO
+++ b/TODO
@@ -21,3 +21,4 @@ sequencer?
test reliable multicast
test total-order multicast
test reliable unicast
+adapt replica code from assignment
diff --git a/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java b/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java
new file mode 100644
index 0000000..634f154
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java
@@ -0,0 +1,7 @@
+package derms.replica.replica1;
+
+public class AlreadyBorrowedException extends Exception {
+ public AlreadyBorrowedException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/AnnounceListener.java b/src/main/java/derms/replica/replica1/AnnounceListener.java
new file mode 100644
index 0000000..bd58884
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/AnnounceListener.java
@@ -0,0 +1,85 @@
+package derms.replica.replica1;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.util.logging.Logger;
+
+public class AnnounceListener implements Runnable {
+ private static final int bufsize = 1024;
+
+ private InetSocketAddress groupAddr;
+ private InetAddress localAddr;
+ private Servers servers;
+ private Logger log;
+
+ public AnnounceListener(InetSocketAddress groupAddr, InetAddress localAddr, Servers servers) throws IOException {
+ this.groupAddr = groupAddr;
+ this.localAddr = localAddr;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ NetworkInterface netInterface = null;
+ try {
+ netInterface = NetworkInterface.getByInetAddress(localAddr);
+ if (netInterface == null) {
+ throw new Exception("netInterface is null");
+ }
+ } catch (Exception e) {
+ log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage());
+ return;
+ }
+
+ MulticastSocket sock = null;
+ try {
+ sock = new MulticastSocket(groupAddr.getPort());
+ sock.joinGroup(groupAddr, netInterface);
+ } catch (Exception e) {
+ log.severe("Failed to open multicast socket: "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening to "+groupAddr.toString()+" from "+localAddr.toString()+" ("+netInterface.getName()+")");
+ byte[] buf = new byte[bufsize];
+ DatagramPacket pkt = new DatagramPacket(buf, buf.length);
+
+ for (;;) {
+ try {
+ sock.receive(pkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from multicast socket: "+e.getMessage());
+ continue;
+ }
+
+ ObjectInputStream objStream;
+ try {
+ objStream = new ObjectInputStream(
+ new ByteArrayInputStream(pkt.getData()));
+ } catch (IOException e) {
+ log.warning("Failed to create input stream: "+e.getMessage());
+ continue;
+ }
+
+ City city;
+ try {
+ city = (City) objStream.readObject();
+ } catch (Exception e) {
+ log.warning("Failed to deserialize data: "+e.getMessage());
+ continue;
+ }
+
+ InetAddress remote = pkt.getAddress();
+ if (servers.put(city, remote) == null) {
+ log.info("Added remote server "+city.toString()+" "+remote.toString());
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/Announcer.java b/src/main/java/derms/replica/replica1/Announcer.java
new file mode 100644
index 0000000..e9dedc4
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Announcer.java
@@ -0,0 +1,73 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketAddress;
+import java.util.logging.Logger;
+
+public class Announcer implements Runnable {
+ public static final long intervalMillis = 3000;
+
+ private SocketAddress group;
+ private InetAddress localAddr;
+ private City city;
+ private Logger log;
+
+ public Announcer(SocketAddress group, InetAddress localAddr, City city) throws IOException {
+ this.group = group;
+ this.localAddr = localAddr;
+ this.city = city;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ NetworkInterface netInterface = null;
+ try {
+ netInterface = NetworkInterface.getByInetAddress(localAddr);
+ if (netInterface == null) {
+ throw new Exception("netInterface is null");
+ }
+ } catch (Exception e) {
+ log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage());
+ return;
+ }
+
+ MulticastSocket sock = null;
+ try {
+ sock = new MulticastSocket();
+ sock.joinGroup(group, netInterface);
+ } catch (Exception e) {
+ log.severe("Failed to open multicast socket: "+e.getMessage());
+ return;
+ }
+
+ log.info("Announcing from "+localAddr.toString()+" ("+netInterface.getName()+") to "+group.toString());
+
+ DatagramPacket pkt = null;
+ try {
+ pkt = ObjectPacket.create(city, group);
+ } catch (IOException e) {
+ log.severe("Failed to create packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ for (;;) {
+ sock.send(pkt);
+ Thread.sleep(intervalMillis);
+ }
+ } catch (IOException e) {
+ log.severe("Failed to send to multicast socket "+group.toString()+": "+e.getMessage());
+ } catch (InterruptedException e) {
+ log.info("Interrupted.");
+ } finally {
+ log.info("Shutting down...");
+ sock.close();
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/CLI.java b/src/main/java/derms/replica/replica1/CLI.java
new file mode 100644
index 0000000..88e166a
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/CLI.java
@@ -0,0 +1,91 @@
+package derms.replica.replica1;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+public abstract class CLI implements Runnable {
+ protected Map<String, Command> commands = new HashMap<String, Command>();
+ protected List<Description> cmdDescriptions = new ArrayList<Description>();
+ protected List<Description> argDescriptions = new ArrayList<Description>();
+
+ protected CLI() {
+ commands.put("quit", new Quit());
+ cmdDescriptions.add(new Description("quit", "Exit the program"));
+
+ commands.put("help", new Help());
+ cmdDescriptions.add(new Description("help", "List commands"));
+ }
+
+ @Override
+ public void run() {
+ Scanner scanner = new Scanner(System.in);
+ System.out.println("Type 'help' for a list of commands.");
+ for (;;) {
+ System.out.print("Command: ");
+ String input = scanner.nextLine();
+ String[] fields = input.split(" ");
+ if (fields.length < 1 || fields[0] == "") {
+ continue;
+ }
+ Command cmd = commands.get(fields[0]);
+ if (cmd == null) {
+ System.out.println("Invalid command '"+fields[0]+"'");
+ System.out.println("Type 'help' for a list of commands.");
+ continue;
+ }
+ String[] args = null;
+ if (fields.length < 2) {
+ args = new String[0];
+ } else {
+ args = Arrays.copyOfRange(fields, 1, fields.length);
+ }
+ cmd.exec(args);
+ }
+ }
+
+ protected interface Command {
+ public void exec(String[] args);
+ }
+
+ protected class Quit implements Command {
+ @Override
+ public void exec(String[] args) {
+ System.out.println("Shutting down...");
+ System.exit(1);
+ }
+ }
+
+ protected class Help implements Command {
+ @Override
+ public void exec(String[] args) {
+ System.out.println("\nCommands:");
+ for (Description d : cmdDescriptions) {
+ System.out.println(d);
+ }
+ System.out.println("\nArguments:");
+ for (Description d : argDescriptions) {
+ System.out.println(d);
+ }
+ System.out.println();
+ }
+ }
+
+ protected class Description {
+ String object; /// The thing being described
+ String description;
+
+ protected Description(String object, String description) {
+ this.object = object;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return object+"\n\t"+description;
+ }
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/City.java b/src/main/java/derms/replica/replica1/City.java
new file mode 100644
index 0000000..548f1fa
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/City.java
@@ -0,0 +1,38 @@
+package derms.replica.replica1;
+
+import java.io.Serializable;
+
+public class City implements Serializable {
+ public static final int codeLen = 3;
+
+ private String code;
+
+ public City(String code) throws IllegalArgumentException {
+ if (code.length() != codeLen)
+ throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters");
+ this.code = code;
+ }
+
+ public City() {
+ this("XXX");
+ }
+
+ @Override
+ public String toString() {
+ return code;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || this.getClass() != obj.getClass()) {
+ return false;
+ }
+ City other = (City) obj;
+ return this.code.equals(other.code);
+ }
+
+ @Override
+ public int hashCode() {
+ return code.hashCode();
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/Client.java b/src/main/java/derms/replica/replica1/Client.java
new file mode 100644
index 0000000..7ebbb68
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Client.java
@@ -0,0 +1,30 @@
+package derms.replica.replica1;
+
+import javax.xml.namespace.QName;
+import javax.xml.ws.Service;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+
+public abstract class Client<I, C> {
+ public static final String namespace = "derms.samanthony.xyz";
+ public static final int port = 8080;
+
+ private final Class<I> endpointInterface;
+ private final QName qname;
+
+ protected Client(Class<I> endpointInterface, Class<C> endpointClass) {
+ this.endpointInterface = endpointInterface;
+ this.qname = new QName("http://"+namespace+"/", endpointClass.getSimpleName()+"Service");
+ }
+
+ protected I connect(String host) throws MalformedURLException {
+ URL url = new URL("http://"+host+":"+port+"/"+endpointInterface.getSimpleName()+"?wsdl");
+ return Service.create(url, qname).getPort(endpointInterface);
+ }
+
+ protected I connect(City city) throws UnknownHostException, MalformedURLException {
+ String host = Hosts.get(city);
+ return connect(host);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/Coordinator.java b/src/main/java/derms/replica/replica1/Coordinator.java
new file mode 100644
index 0000000..b963d58
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Coordinator.java
@@ -0,0 +1,22 @@
+package derms.replica.replica1;
+
+import javax.jws.WebService;
+import javax.jws.soap.SOAPBinding;
+
+@WebService
+@SOAPBinding(style = SOAPBinding.Style.RPC)
+public interface Coordinator {
+ void requestResource(CoordinatorID cid, ResourceID rid, int duration)
+ throws ServerCommunicationError, NoSuchResourceException,
+ AlreadyBorrowedException, InvalidDurationException;
+
+ Resource[] findResource(CoordinatorID cid, ResourceName rname)
+ throws ServerCommunicationError;
+
+ void returnResource(CoordinatorID cid, ResourceID rid)
+ throws ServerCommunicationError, NoSuchResourceException,
+ NotBorrowedException;
+
+ void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID)
+ throws ServerCommunicationError, NoSuchResourceException;
+}
diff --git a/src/main/java/derms/replica/replica1/CoordinatorClient.java b/src/main/java/derms/replica/replica1/CoordinatorClient.java
new file mode 100644
index 0000000..6016c31
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/CoordinatorClient.java
@@ -0,0 +1,40 @@
+package derms.replica.replica1;
+
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+public class CoordinatorClient extends Client<Coordinator, CoordinatorServer> {
+ public CoordinatorID id;
+ public Coordinator server;
+
+ public CoordinatorClient(CoordinatorID id) throws UnknownHostException, MalformedURLException {
+ super(Coordinator.class, CoordinatorServer.class);
+ this.id = id;
+ this.server = connect(new City(id.city));
+ }
+
+ public CoordinatorClient(City city, short idNum) throws UnknownHostException, MalformedURLException {
+ this(new CoordinatorID(city.toString(), idNum));
+ }
+
+ public void requestResource(ResourceID resourceID, int duration)
+ throws ServerCommunicationError, NoSuchResourceException,
+ AlreadyBorrowedException, InvalidDurationException
+ {
+ server.requestResource(id, resourceID, duration);
+ }
+
+ public void returnResource(ResourceID resourceID)
+ throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException
+ {
+ server.returnResource(id, resourceID);
+ }
+
+ public Resource[] findResource(ResourceName name) throws ServerCommunicationError {
+ return server.findResource(id, name);
+ }
+
+ public void swapResource(ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException {
+ server.swapResource(id, oldRID, newRID);
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/CoordinatorClientCLI.java b/src/main/java/derms/replica/replica1/CoordinatorClientCLI.java
new file mode 100644
index 0000000..c1ac48a
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/CoordinatorClientCLI.java
@@ -0,0 +1,212 @@
+package derms.replica.replica1;
+
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+public class CoordinatorClientCLI extends CLI {
+ public static final String usage = "Usage: java CoordinatorClient <city> <id number>";
+
+ private final CoordinatorClient client;
+
+ private CoordinatorClientCLI(City city, short idNum) throws UnknownHostException, MalformedURLException {
+ client = new CoordinatorClient(city, idNum);
+ System.out.println("ID: "+client.id);
+
+ commands.put("request", new Request());
+ cmdDescriptions.add(new Description(
+ "request <resource ID> <duration>",
+ "Borrow a resource and reduce its duration."));
+
+ commands.put("find", new Find());
+ cmdDescriptions.add(new Description(
+ "find <resource name>",
+ "List borrowed resources."));
+
+ commands.put("return", new Return());
+ cmdDescriptions.add(new Description(
+ "return <resourceID>",
+ "Return a currently borrowed resource."));
+
+ commands.put("swap", new Swap());
+ cmdDescriptions.add(new Description(
+ "swap <old resourceID> <new resourceID>",
+ "Return the old resource and borrow the new one."));
+
+ argDescriptions.add(new Description(
+ "<resource ID>",
+ "3-letter city code followed by 4-digit number."));
+ argDescriptions.add(new Description(
+ "<resource name>",
+ "E.g., AMBULANCE."));
+ argDescriptions.add(new Description(
+ "<duration>",
+ "A number representing a time period."));
+ }
+
+ public static void main(String[] cmdlineArgs) {
+ Args args = null;
+ try {
+ args = new Args(cmdlineArgs);
+ } catch (IllegalArgumentException e) {
+ System.err.println(e.getMessage());
+ System.err.println(usage);
+ System.exit(1);
+ }
+
+ try {
+ (new CoordinatorClientCLI(args.city, args.idNum)).run();
+ } catch (UnknownHostException | MalformedURLException e) {
+ System.err.println(e.getMessage());
+ System.exit(1);
+ }
+ }
+
+ private class Request implements Command {
+ public void exec(String[] args) {
+ if (args.length < 2) {
+ System.out.println("invalid arguments for 'request'");
+ } else {
+ requestResource(args[0], args[1]);
+ }
+ }
+ }
+
+ private void requestResource(String resourceIDStr, String durationStr) {
+ ResourceID resourceID;
+ try {
+ resourceID = ResourceID.parse(resourceIDStr);
+ } catch (IllegalArgumentException e) {
+ System.out.println(e.getMessage());
+ return;
+ }
+
+ int duration;
+ try {
+ duration = Integer.parseInt(durationStr);
+ if (duration < 0) {
+ throw new IllegalArgumentException("invalid duration: "+durationStr);
+ }
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ return;
+ }
+
+ try {
+ client.requestResource(resourceID, duration);
+ System.out.println("Successfully borrowed resource.");
+ } catch (ServerCommunicationError | NoSuchResourceException | AlreadyBorrowedException | InvalidDurationException e) {
+ System.out.println("Failed to borrow resource: "+e.getMessage());
+ }
+ }
+
+ private class Return implements Command {
+ public void exec(String[] args) {
+ if (args.length < 1) {
+ System.out.println("invalid arguments for 'return'");
+ } else {
+ returnResource(args[0]);
+ }
+ }
+ }
+
+ private void returnResource(String resourceIDStr) {
+ ResourceID resourceID;
+ try {
+ resourceID = ResourceID.parse(resourceIDStr);
+ } catch (IllegalArgumentException e) {
+ System.out.println("Invalid resource ID: "+e.getMessage());
+ return;
+ }
+
+ try {
+ client.returnResource(resourceID);
+ System.out.println("Successfully returned resource "+resourceIDStr);
+ } catch (ServerCommunicationError | NoSuchResourceException | NotBorrowedException e) {
+ System.out.println("Failed to return resource: "+e.getMessage());
+ }
+ }
+
+ private class Find implements Command {
+ public void exec(String[] args) {
+ if (args.length < 1) {
+ System.out.println("invalid arguments for 'find'");
+ } else {
+ findResource(args[0]);
+ }
+ }
+ }
+
+ private void findResource(String resourceNameStr) {
+ ResourceName resourceName;
+ try {
+ resourceName = ResourceName.parse(resourceNameStr);
+ } catch (Exception e) {
+ System.out.println("Invalid resource name: "+resourceNameStr);
+ return;
+ }
+
+ Resource[] resources;
+ try {
+ resources = client.findResource(resourceName);
+ } catch (ServerCommunicationError e) {
+ System.out.println("Failed to find resource "+resourceName+": "+e.getMessage());
+ return;
+ }
+
+ System.out.println("Borrowed "+resourceNameStr+" resources:");
+ for (Resource r : resources) {
+ String rid = r.id.toString();
+ System.out.println("\t"+rid+" "+r.borrowDuration);
+ }
+ }
+
+ private class Swap implements Command {
+ public void exec(String[] args) {
+ if (args.length < 2) {
+ System.out.println("invalid arguments for 'swap'");
+ } else {
+ swapResource(args[0], args[1]);
+ }
+ }
+ }
+
+ private void swapResource(String oldRIDStr, String newRIDStr) {
+ ResourceID oldRID;
+ ResourceID newRID;
+ try {
+ oldRID = ResourceID.parse(oldRIDStr);
+ newRID = ResourceID.parse(newRIDStr);
+ } catch (IllegalArgumentException e) {
+ System.out.println("Invalid resource id");
+ return;
+ }
+
+ try {
+ client.swapResource(oldRID, newRID);
+ System.out.println("Successfully swapped "+oldRID+" for "+newRID);
+ } catch (ServerCommunicationError | NoSuchResourceException e) {
+ System.out.println("Failed to swap resources: "+e.getMessage());
+ }
+ }
+
+ private static class Args {
+ private final City city;
+ private final short idNum;
+
+ private Args(String[] args) throws IllegalArgumentException {
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Missing argument 'city'");
+ }
+ city = new City(args[0]);
+
+ if (args.length < 2) {
+ throw new IllegalArgumentException("Missing argument 'id number'");
+ }
+ try {
+ idNum = Short.parseShort(args[1]);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Bad value of 'id number'");
+ }
+ }
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/CoordinatorID.java b/src/main/java/derms/replica/replica1/CoordinatorID.java
new file mode 100644
index 0000000..c09124c
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/CoordinatorID.java
@@ -0,0 +1,34 @@
+package derms.replica.replica1;
+
+import java.io.Serializable;
+
+public class CoordinatorID implements Serializable {
+ public String city;
+ public short num;
+
+ public CoordinatorID(String city, short num) {
+ this.city = city;
+ this.num = num;
+ }
+
+ public CoordinatorID(String city, int num) {
+ this(city, (short) num);
+ }
+
+ public CoordinatorID() {
+ this("XXX", 0);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj.getClass() != this.getClass())
+ return false;
+ CoordinatorID other = (CoordinatorID) obj;
+ return other.city.equals(this.city) && other.num == this.num;
+ }
+
+ @Override
+ public String toString() {
+ return city+"C"+num;
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/CoordinatorServer.java b/src/main/java/derms/replica/replica1/CoordinatorServer.java
new file mode 100644
index 0000000..ce1cc9e
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/CoordinatorServer.java
@@ -0,0 +1,176 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import javax.jws.WebService;
+
+@WebService(endpointInterface = "xyz.samanthony.derms.Coordinator")
+public class CoordinatorServer implements Coordinator {
+ public static final Duration timeout = Duration.ofSeconds(5);
+
+ private City city;
+ private Resources resources;
+ private Servers servers;
+ private Logger log;
+
+ public CoordinatorServer(City city, Resources resources, Servers servers) throws IOException {
+ super();
+ this.city = city;
+ this.resources = resources;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ public CoordinatorServer() throws IOException {
+ this(new City(), new Resources(), new Servers());
+ }
+
+ @Override
+ public void requestResource(CoordinatorID cid, ResourceID rid, int duration)
+ throws ServerCommunicationError, NoSuchResourceException,
+ AlreadyBorrowedException, InvalidDurationException
+ {
+ log.info("Request for "+rid+" from "+cid);
+
+ InetAddress server = servers.get(new City(rid.city));
+ if (server == null) {
+ throw new ServerCommunicationError("requestResource(): no connection to server "+rid.city.toString());
+ }
+
+ RequestResource.Client client = new RequestResource.Client(cid, rid, duration);
+ RequestResource.Response response;
+ try {
+ response = client.sendRequest(server);
+ } catch (IOException e) {
+ throw new ServerCommunicationError("requestResource(): "+e.getMessage());
+ }
+ switch (response.status) {
+ case SUCCESS:
+ log.info("Request "+rid+" from "+cid+" - success");
+ break;
+ case NO_SUCH_RESOURCE:
+ log.warning(response.message);
+ throw new NoSuchResourceException(response.message);
+ case ALREADY_BORROWED:
+ log.warning(response.message);
+ throw new AlreadyBorrowedException(response.message);
+ case INVALID_DURATION:
+ log.warning(response.message);
+ throw new InvalidDurationException(response.message);
+ default:
+ log.warning("Unsuccessful response from server: "+response.message);
+ throw new ServerCommunicationError("requestResource(): failed to borrow resource: "+response.message);
+ }
+ }
+
+ @Override
+ public Resource[] findResource(CoordinatorID cid, ResourceName rname) throws ServerCommunicationError {
+ log.info("Find Resource "+rname+" from "+cid);
+ FindResource.Request request = new FindResource.Request(cid, rname);
+ Collection<Resource> response = ConcurrentHashMap.newKeySet();
+ ExecutorService pool = Executors.newFixedThreadPool(servers.size());
+ try {
+ for (InetAddress server : servers.all()) {
+ pool.execute(new FindResource.Client(request, server, response));
+ }
+ } catch (IOException e) {
+ String msg = "Failed to start FindResource Client: "+e.getMessage();
+ log.severe(msg);
+ throw new ServerCommunicationError("findResource(): "+msg);
+ }
+ log.fine("Started worker threads");
+ pool.shutdown();
+ boolean terminated;
+ try {
+ terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ String msg = "findResource() interrupted: "+e.getMessage();
+ log.warning(msg);
+ throw new ServerCommunicationError("findResource(): "+msg);
+ }
+ if (!terminated) {
+ String msg = "Request timed out: no response after "+timeout.toString();
+ log.warning(msg);
+ throw new ServerCommunicationError("findResource(): "+msg);
+ }
+ Resource[] arr = new Resource[0];
+ arr = response.toArray(arr);
+ log.info("Find resource "+rname+" from "+cid+" - success. Response length: "+arr.length);
+ return arr;
+ }
+
+ @Override
+ public void returnResource(CoordinatorID cid, ResourceID rid)
+ throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException
+ {
+ log.info("Return resource "+rid+" from "+cid);
+ InetAddress server = servers.get(new City(rid.city));
+ if (server == null) {
+ String msg = "no connection to server "+rid.city;
+ log.warning(msg);
+ throw new ServerCommunicationError("returnResource(): "+msg);
+ }
+ log.fine("server address: "+server);
+
+ ReturnResource.Client client = new ReturnResource.Client(cid, rid);
+ ReturnResource.Response response;
+ try {
+ response = client.sendRequest(server);
+ } catch (IOException e) {
+ log.warning(e.getMessage());
+ throw new ServerCommunicationError("returnResource(): "+e.getMessage());
+ }
+ switch (response.status) {
+ case SUCCESS:
+ log.info(cid+" return "+rid+" - success");
+ break;
+ case NO_SUCH_RESOURCE:
+ log.warning(response.message);
+ throw new NoSuchResourceException(response.message);
+ case NOT_BORROWED:
+ log.warning(response.message);
+ throw new NotBorrowedException(response.message);
+ default:
+ String msg = "Failed to return resource: "+response.message;
+ log.warning(msg);
+ throw new ServerCommunicationError("returnResource(): "+msg);
+ }
+ }
+
+ @Override
+ public void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException {
+ log.info(cid+": swap "+oldRID+", "+newRID);
+
+ InetAddress server = servers.get(new City(oldRID.city));
+ if (server == null) {
+ String msg = "no connection to server "+oldRID.city;
+ log.warning(msg);
+ throw new ServerCommunicationError("swapResource(): "+msg);
+ }
+ log.fine("server address: "+server);
+
+ SwapResource.Client client = new SwapResource.Client(cid, oldRID, newRID);
+ SwapResource.Response response;
+ try {
+ response = client.sendRequest(server);
+ } catch (IOException e) {
+ throw new ServerCommunicationError("swapResource(): "+e.getMessage());
+ }
+ switch (response.status) {
+ case SUCCESS:
+ log.info(cid+": swap "+oldRID+", "+newRID+" - success");
+ break;
+ case NO_SUCH_RESOURCE:
+ throw new NoSuchResourceException(response.message);
+ default:
+ throw new ServerCommunicationError("swapResource(): "+response.message);
+ }
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/DermsLogger.java b/src/main/java/derms/replica/replica1/DermsLogger.java
new file mode 100644
index 0000000..9ff8249
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/DermsLogger.java
@@ -0,0 +1,23 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.util.logging.FileHandler;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
+import java.util.logging.SimpleFormatter;
+
+public class DermsLogger {
+ public static final String logFile = "server.log";
+
+ private static Logger log = null;
+
+ public static Logger getLogger(Class clazz) throws IOException {
+ if (log == null) {
+ log = Logger.getLogger(clazz.getName());
+ Handler fileHandler = new FileHandler(logFile);
+ fileHandler.setFormatter(new SimpleFormatter());
+ log.addHandler(fileHandler);
+ }
+ return log;
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/FindResource.java b/src/main/java/derms/replica/replica1/FindResource.java
new file mode 100644
index 0000000..7104c4c
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/FindResource.java
@@ -0,0 +1,164 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.List;
+import java.util.logging.Logger;
+
+public class FindResource {
+ public static final int port = 5558;
+
+ public static class Client implements Runnable {
+ private InetAddress serverAddr;
+ private Request request;
+ private Collection<Resource> response;
+ private Logger log;
+
+ public Client(Request request, InetAddress serverAddr, Collection<Resource> response) throws IOException {
+ this.serverAddr = serverAddr;
+ this.request = request;
+ this.response = response;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("Failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ log.severe("Failed to create request packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ sock.send(requestPkt);
+ } catch (Exception e) {
+ log.severe("Failed to send request: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ Resource[] resources;
+ try {
+ resources = ResourceTransfer.receive(sock);
+ } catch (IOException e) {
+ log.severe(e.getMessage());
+ return;
+ } finally {
+ sock.close();
+ }
+
+ for (Resource r : resources) {
+ response.add(r);
+ }
+ }
+ }
+
+ public static class Server implements Runnable {
+ private static final int bufsize = 4096;
+
+ private InetAddress localAddr;
+ private Resources resources;
+ private ExecutorService pool;
+ private Logger log;
+
+ public Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ this.pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket: "+e.getMessage());
+ return;
+ }
+
+ log.info("Running on "+localAddr.toString()+":"+port);
+
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.info("Got request");
+
+ Request request;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log));
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ public static class Request implements Serializable {
+ private CoordinatorID cid;
+ private ResourceName rname;
+
+ public Request(CoordinatorID cid, ResourceName rname) {
+ this.cid = cid;
+ this.rname = rname;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+ private Logger log;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ this.log = log;
+ }
+
+ @Override
+ public void run() {
+ List<Resource> borrowedResources = resources.borrowed(request.cid, request.rname);
+ log.info(""+borrowedResources.size()+" "+request.rname+" resources borrowed by "+request.cid);
+ try {
+ Resource[] arr = new Resource[0];
+ ResourceTransfer.send(borrowedResources.toArray(arr), client);
+ } catch (IOException e) {
+ log.severe("Failed to send response: "+e.getMessage());
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/Hosts.java b/src/main/java/derms/replica/replica1/Hosts.java
new file mode 100644
index 0000000..fed377e
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Hosts.java
@@ -0,0 +1,27 @@
+package derms.replica.replica1;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class Hosts {
+ private static Map<City, String> hosts = null;
+
+ public static String get(City city) throws UnknownHostException {
+ if (hosts == null)
+ init();
+
+ String host = hosts.get(city);
+ if (host == null)
+ throw new UnknownHostException("unknown host: "+city);
+ return host;
+ }
+
+ private static void init() {
+ hosts = new HashMap<City, String>();
+ hosts.put(new City("MTL"), "alpine1");
+ hosts.put(new City("QUE"), "alpine2");
+ hosts.put(new City("SHE"), "alpine3");
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ID.java b/src/main/java/derms/replica/replica1/ID.java
new file mode 100644
index 0000000..8d9f9aa
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ID.java
@@ -0,0 +1,5 @@
+package derms.replica.replica1;
+
+public class ID {
+ public static final int nDigits = 4;
+}
diff --git a/src/main/java/derms/replica/replica1/InvalidDurationException.java b/src/main/java/derms/replica/replica1/InvalidDurationException.java
new file mode 100644
index 0000000..ba3c75a
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/InvalidDurationException.java
@@ -0,0 +1,7 @@
+package derms.replica.replica1;
+
+public class InvalidDurationException extends Exception {
+ public InvalidDurationException (String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/NoSuchResourceException.java b/src/main/java/derms/replica/replica1/NoSuchResourceException.java
new file mode 100644
index 0000000..3979570
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/NoSuchResourceException.java
@@ -0,0 +1,7 @@
+package derms.replica.replica1;
+
+public class NoSuchResourceException extends Exception {
+ public NoSuchResourceException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/NotBorrowedException.java b/src/main/java/derms/replica/replica1/NotBorrowedException.java
new file mode 100644
index 0000000..5c0257a
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/NotBorrowedException.java
@@ -0,0 +1,7 @@
+package derms.replica.replica1;
+
+public class NotBorrowedException extends Exception {
+ public NotBorrowedException (String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ObjectPacket.java b/src/main/java/derms/replica/replica1/ObjectPacket.java
new file mode 100644
index 0000000..28010c0
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ObjectPacket.java
@@ -0,0 +1,38 @@
+package derms.replica.replica1;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.SocketAddress;
+
+public class ObjectPacket {
+ public static DatagramPacket create(Serializable obj, SocketAddress remoteAddr) throws IOException {
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ObjectOutputStream objStream = new ObjectOutputStream(byteStream);
+ objStream.writeObject(obj);
+ objStream.flush();
+ byte[] buf = byteStream.toByteArray();
+ objStream.close();
+ return new DatagramPacket(buf, buf.length, remoteAddr);
+ }
+
+ public static <E> E deserialize(DatagramPacket pkt, Class<E> clazz) throws IOException {
+ ObjectInputStream objectStream;
+ try {
+ objectStream = new ObjectInputStream(
+ new ByteArrayInputStream(pkt.getData()));
+ } catch (Exception e) {
+ throw new IOException("failed to create input stream: "+e.getMessage());
+ }
+
+ try {
+ return clazz.cast(objectStream.readObject());
+ } catch (Exception e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/RequestResource.java b/src/main/java/derms/replica/replica1/RequestResource.java
new file mode 100644
index 0000000..f7fa4ab
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/RequestResource.java
@@ -0,0 +1,227 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
+import java.util.NoSuchElementException;
+
+public class RequestResource {
+ public static final int port = 5557;
+ public static final int bufsize = 4096;
+
+ public static class Client {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+ private int duration;
+
+ public Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) {
+ this.coordinatorID = coordinatorID;
+ this.resourceID = resourceID;
+ this.duration = duration;
+ }
+
+ public Response sendRequest(InetAddress serverAddr) throws IOException {
+ Request request = new Request(coordinatorID, resourceID, duration);
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage());
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ sock.close();
+ throw new IOException("Request Resource Client: failed to create request: "+e.getMessage());
+ }
+
+ sock.send(requestPkt);
+
+ byte[] buf = new byte[bufsize];
+ DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
+ try {
+ sock.receive(responsePkt);
+ } catch (Exception e) {
+ sock.close();
+ throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage());
+ }
+
+ try {
+ return ObjectPacket.deserialize(responsePkt, Response.class);
+ } catch (IOException e) {
+ throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ public static class Server implements Runnable {
+ private InetAddress localAddr;
+ private Resources resources;
+ private ExecutorService pool;
+ private Logger log;
+
+ public Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr.toString()
+ +": "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening on "+localAddr.toString()+":"+port);
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.info("Got request");
+
+ Request request = null;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log));
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ private static class Request implements Serializable {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+ private int duration;
+
+ private Request(CoordinatorID cid, ResourceID rid, int duration) {
+ this.coordinatorID = cid;
+ this.resourceID = rid;
+ this.duration = duration;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+ private Logger log;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ this.log = log;
+ }
+
+ @Override
+ public void run() {
+ Response response = borrow();
+
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("Failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket pkt;
+ try {
+ pkt = ObjectPacket.create(response, client);
+ } catch (IOException e) {
+ log.severe("Failed to create response packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+ try {
+ sock.send(pkt);
+ } catch (Exception e) {
+ log.severe("Failed to send response: "+e.getMessage());
+ }
+ sock.close();
+ }
+
+ private Response borrow() {
+ try {
+ Resource resource = resources.getByID(request.resourceID);
+ synchronized (resource) {
+ if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) {
+ return new Response(Response.Status.ALREADY_BORROWED,
+ request.coordinatorID+" cannot borrow "+request.resourceID
+ +"; already borrowed by "+resource.borrower);
+ } else if (request.duration <= 0) {
+ return new Response(Response.Status.INVALID_DURATION,
+ "duration "+request.duration+" less than 1");
+ } else if (request.duration > resource.duration) {
+ return new Response(Response.Status.INVALID_DURATION,
+ "cannot borrow "+resource.id+" for duration of "+request.duration
+ +"; only "+resource.duration+" remaining");
+ }
+
+ if (resource.borrower.equals(request.coordinatorID)) {
+ // Resource is already borrowed. Add to existing duration.
+ resource.borrowDuration += request.duration;
+ } else {
+ resource.borrowDuration = request.duration;
+ }
+ resource.borrower = request.coordinatorID;
+ resource.isBorrowed = true;
+ resource.duration -= request.duration;
+
+ return new Response(Response.Status.SUCCESS, request.coordinatorID
+ +" successfully borrowed "+request.resourceID);
+ }
+ } catch (NoSuchElementException e) {
+ return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID);
+ }
+ }
+ }
+
+ public static class Response implements Serializable {
+ public Status status;
+ public String message;
+
+ private Response(Status status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ private Response() {
+ this(Status.SUCCESS, "");
+ }
+
+ public enum Status {
+ SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION
+ }
+ }
+}
+
diff --git a/src/main/java/derms/replica/replica1/Resource.java b/src/main/java/derms/replica/replica1/Resource.java
new file mode 100644
index 0000000..4abe41b
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Resource.java
@@ -0,0 +1,39 @@
+package derms.replica.replica1;
+
+import java.io.Serializable;
+
+public class Resource implements Serializable {
+ public ResourceID id;
+ public ResourceName name;
+ public int duration;
+ public boolean isBorrowed;
+ public CoordinatorID borrower;
+ public int borrowDuration;
+
+ public Resource(ResourceID id, ResourceName name, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) {
+ this.id = id;
+ this.name = name;
+ this.duration = duration;
+ this.isBorrowed = isBorrowed;
+ this.borrower = borrower;
+ this.borrowDuration = borrowDuration;
+ }
+
+ public Resource(ResourceID id, ResourceName name, int duration) {
+ this(id, name, duration, false, new CoordinatorID(), -1);
+ }
+
+ public Resource() {
+ this(new ResourceID(), ResourceName.AMBULANCE, 0);
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return id+" "+duration;
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ResourceAvailability.java b/src/main/java/derms/replica/replica1/ResourceAvailability.java
new file mode 100644
index 0000000..a6083ab
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResourceAvailability.java
@@ -0,0 +1,128 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.logging.Logger;
+
+public class ResourceAvailability {
+ public static final int port = 5556;
+
+ public static class Client implements Runnable {
+ private InetAddress serverAddr;
+ private ResourceName request;
+ private Collection<Resource> resources;
+ private Logger log;
+
+ public Client(InetAddress serverAddr, ResourceName request, Collection<Resource> response) throws IOException {
+ this.serverAddr = serverAddr;
+ this.request = request;
+ this.resources = response;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("Error binding socket: "+e.getMessage());
+ return;
+ }
+ log.fine("Created socket");
+
+ DatagramPacket reqPkt;
+ try {
+ reqPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ log.severe("Error creating request: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ sock.send(reqPkt);
+ } catch (IOException e) {
+ log.severe("Error sending request: "+e.getMessage());
+ sock.close();
+ return;
+ }
+ log.fine("Sent request");
+
+ Resource[] response;
+ try {
+ response = ResourceTransfer.receive(sock);
+ } catch (IOException e) {
+ log.severe(e.getMessage());
+ return;
+ } finally {
+ sock.close();
+ }
+
+ for (Resource resource : response) {
+ resources.add(resource);
+ }
+ }
+ }
+
+ public static class Server implements Runnable {
+ public static final int bufsize = 1024;
+
+ private InetAddress localAddr;
+ private Resources resources;
+ private Logger log;
+
+ public Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr.toString()+": "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening on "+localAddr.toString()+":"+port);
+
+ DatagramPacket request = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(request);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+
+ ResourceName requestedName = null;
+ try {
+ requestedName = ObjectPacket.deserialize(request, ResourceName.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+ log.info("Got request: "+requestedName);
+
+ Resource[] response = resources.getByName(requestedName);
+ try {
+ ResourceTransfer.send(response, request.getSocketAddress());
+ } catch (IOException e) {
+ log.warning("Error transfering resources: "+e.getMessage());
+ }
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ResourceID.java b/src/main/java/derms/replica/replica1/ResourceID.java
new file mode 100644
index 0000000..008b766
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResourceID.java
@@ -0,0 +1,49 @@
+package derms.replica.replica1;
+
+import java.io.Serializable;
+
+public class ResourceID implements Serializable {
+ public String city;
+ public short num;
+
+ public ResourceID (String city, short num) {
+ this.city = city;
+ this.num = num;
+ }
+
+ public ResourceID() {
+ this("XXX", (short) 1111);
+ }
+
+ public static ResourceID parse(String s) throws IllegalArgumentException {
+ if (s.length() != City.codeLen+ID.nDigits) {
+ throw new IllegalArgumentException("invalid resource ID: "+s);
+ }
+ try {
+ String cityCode = s.substring(0, City.codeLen);
+ short num = Short.parseShort(s.substring(City.codeLen));
+ return new ResourceID(cityCode, num);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("invalid resource ID: "+e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != this.getClass()) {
+ return false;
+ }
+ ResourceID other = (ResourceID) obj;
+ return (this.city.equals(other.city)) && (this.num == other.num);
+ }
+
+ @Override
+ public int hashCode() {
+ return city.hashCode() * num;
+ }
+
+ @Override
+ public String toString() {
+ return city+num;
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ResourceName.java b/src/main/java/derms/replica/replica1/ResourceName.java
new file mode 100644
index 0000000..de4bc16
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResourceName.java
@@ -0,0 +1,18 @@
+package derms.replica.replica1;
+
+import java.io.Serializable;
+
+public enum ResourceName implements Serializable {
+ AMBULANCE,
+ FIRETRUCK,
+ PERSONNEL;
+
+ public static ResourceName parse(String s) {
+ switch (s) {
+ case "AMBULANCE": return ResourceName.AMBULANCE;
+ case "FIRETRUCK": return ResourceName.FIRETRUCK;
+ case "PERSONNEL": return ResourceName.PERSONNEL;
+ }
+ throw new IllegalArgumentException("invalid resource name: "+s);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ResourceTransfer.java b/src/main/java/derms/replica/replica1/ResourceTransfer.java
new file mode 100644
index 0000000..fbc695c
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResourceTransfer.java
@@ -0,0 +1,50 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ResourceTransfer {
+ public static final int bufsize = 1024;
+
+ public static void send(Resource[] resources, SocketAddress remoteAddr) throws IOException {
+ DatagramSocket sock = new DatagramSocket();
+
+ for (Resource resource : resources) {
+ DatagramPacket pkt = ObjectPacket.create(resource, remoteAddr);
+ sock.send(pkt);
+ }
+
+ DatagramPacket pkt = ObjectPacket.create(new EndOfTransmission(), remoteAddr);
+ sock.send(pkt);
+ sock.close();
+ }
+
+ public static Resource[] receive(DatagramSocket sock) throws IOException {
+ List<Resource> resources = new ArrayList<Resource>();
+ byte[] buf = new byte[bufsize];
+ DatagramPacket response = new DatagramPacket(buf, buf.length);
+
+ for (;;) {
+ sock.receive(response);
+
+ Object obj = ObjectPacket.deserialize(response, Object.class);
+ if (obj.getClass() == EndOfTransmission.class) {
+ break;
+ }
+ try {
+ resources.add((Resource) obj);
+ } catch (Exception e) {
+ throw new IOException("expected Resource; got "+obj.getClass().toString());
+ }
+ }
+ Resource[] arr = new Resource[0];
+ return resources.toArray(arr);
+ }
+
+ private static class EndOfTransmission implements Serializable {}
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/Resources.java b/src/main/java/derms/replica/replica1/Resources.java
new file mode 100644
index 0000000..0eb8bf2
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Resources.java
@@ -0,0 +1,74 @@
+package derms.replica.replica1;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class Resources {
+ private Map<ResourceName, Map<ResourceID, Resource>> resources;
+
+ public Resources() {
+ this.resources = new ConcurrentHashMap<ResourceName, Map<ResourceID, Resource>>();
+ }
+
+ public List<Resource> borrowed(CoordinatorID borrower, ResourceName name) {
+ List<Resource> borrowed = new ArrayList<Resource>();
+ Resource[] namedResources = getByName(name);
+ for (Resource r : namedResources) {
+ if (r.isBorrowed && r.borrower.equals(borrower)) {
+ borrowed.add(r);
+ }
+ }
+ return borrowed;
+ }
+
+ public Resource getByID(ResourceID id) throws NoSuchElementException {
+ for (Map<ResourceID, Resource> rids : resources.values()) {
+ Resource resource = rids.get(id);
+ if (resource != null) {
+ return resource;
+ }
+ }
+ throw new NoSuchElementException("No such resource "+id);
+ }
+
+ public Resource[] getByName(ResourceName name) {
+ Map<ResourceID, Resource> rids = resources.get(name);
+ if (rids == null) {
+ return new Resource[0];
+ }
+ Resource[] r = new Resource[0];
+ return rids.values().toArray(r);
+ }
+
+ public void add(Resource r) {
+ Map<ResourceID, Resource> rids;
+ synchronized (resources) {
+ rids = resources.get(r.name);
+ if (rids == null) {
+ rids = new ConcurrentHashMap<ResourceID, Resource>();
+ resources.put(r.name, rids);
+ }
+ }
+ synchronized (rids) {
+ Resource existing = rids.get(r.id);
+ if (existing != null) {
+ existing.duration += r.duration;
+ } else {
+ rids.put(r.id, r);
+ }
+ }
+ }
+
+ public void removeByID(ResourceID id) throws NoSuchElementException {
+ for (Map<ResourceID, Resource> rids : resources.values()) {
+ if (rids.containsKey(id)) {
+ rids.remove(id);
+ return;
+ }
+ }
+ throw new NoSuchElementException("No such resource "+id);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/Responder.java b/src/main/java/derms/replica/replica1/Responder.java
new file mode 100644
index 0000000..5fb7359
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Responder.java
@@ -0,0 +1,16 @@
+package derms.replica.replica1;
+
+import javax.jws.WebService;
+import javax.jws.soap.SOAPBinding;
+
+@WebService
+@SOAPBinding(style = SOAPBinding.Style.RPC)
+public interface Responder {
+ void addResource(Resource r);
+
+ void removeResource(ResourceID rid, int duration)
+ throws NoSuchResourceException;
+
+ Resource[] listResourceAvailability(ResourceName rname)
+ throws ServerCommunicationError;
+}
diff --git a/src/main/java/derms/replica/replica1/ResponderClient.java b/src/main/java/derms/replica/replica1/ResponderClient.java
new file mode 100644
index 0000000..b21b67e
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResponderClient.java
@@ -0,0 +1,34 @@
+package derms.replica.replica1;
+
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ResponderClient extends Client<Responder, ResponderServer> {
+ public ResponderID id;
+
+ public ResponderClient(ResponderID id) {
+ super(Responder.class, ResponderServer.class);
+ this.id = id;
+ }
+
+ public ResponderClient(City city, short idNum) {
+ this(new ResponderID(city, idNum));
+ }
+
+ public void add(ResourceID rid, ResourceName name, int duration) throws UnknownHostException, MalformedURLException {
+ Responder server = connect(new City(rid.city));
+ server.addResource(new Resource(rid, name, duration));
+ }
+
+ public void remove(ResourceID rid, int duration) throws UnknownHostException, MalformedURLException, NoSuchResourceException {
+ Responder server = connect(new City(rid.city));
+ server.removeResource(rid, duration);
+ }
+
+ public Resource[] listResources(ResourceName name) throws UnknownHostException, MalformedURLException, ServerCommunicationError {
+ Responder server = connect(id.city);
+ return server.listResourceAvailability(name);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ResponderClientCLI.java b/src/main/java/derms/replica/replica1/ResponderClientCLI.java
new file mode 100644
index 0000000..5bfbb11
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResponderClientCLI.java
@@ -0,0 +1,151 @@
+package derms.replica.replica1;
+
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+public class ResponderClientCLI extends CLI {
+ public static final String usage = "Usage: java ResponderClientCLI <city> <id number>";
+
+ private final ResponderClient client;
+
+ private ResponderClientCLI(City city, short idNum) {
+ client = new ResponderClient(city, idNum);
+ System.out.println("ID: "+client.id);
+
+ commands.put("add", new Add());
+ cmdDescriptions.add(new Description(
+ "add <resource ID> <resource name> <duration>",
+ "Add a resource to the server"));
+
+ commands.put("remove", new Remove());
+ cmdDescriptions.add(new Description(
+ "remove <resource ID> <duration>",
+ "Decrease the duration of a resource. If duration is negative, the tresource is removed entirely."));
+
+ commands.put("list", new List());
+ cmdDescriptions.add(new Description(
+ "list <resource name>",
+ "List available resources."));
+
+ argDescriptions.add(new Description(
+ "<resource ID>",
+ "3-letter city code followed by 4-digit number."));
+ argDescriptions.add(new Description(
+ "<resource name>",
+ "E.g., AMBULANCE."));
+ argDescriptions.add(new Description(
+ "<duration>",
+ "A number representing a time period."));
+ }
+
+ public static void main(String[] cmdlineArgs) {
+ Args args = null;
+ try {
+ args = new Args(cmdlineArgs);
+ } catch (IllegalArgumentException e) {
+ System.err.println(e);
+ System.err.println(usage);
+ System.exit(1);
+ }
+
+ (new ResponderClientCLI(args.city, args.idNum)).run();
+ }
+
+ private class Add implements Command {
+ public void exec(String[] args) {
+ if (args.length < 3) {
+ System.out.println("invalid arguments for 'add'");
+ } else {
+ add(args[0], args[1], args[2]);
+ }
+ }
+ }
+
+ private void add(String resourceIDStr, String resourceNameStr, String durationStr) {
+ ResourceID rid = ResourceID.parse(resourceIDStr);
+ ResourceName name = ResourceName.parse(resourceNameStr);
+ int duration = Integer.parseInt(durationStr);
+ if (duration < 0) {
+ throw new NumberFormatException("duration less than 0");
+ }
+ try {
+ client.add(rid, name, duration);
+ System.out.println("Successfully added resource to server.");
+ } catch (Exception e) {
+ System.out.println("Failed to add resource: "+e.getMessage());
+ }
+ }
+
+ private class Remove implements Command {
+ public void exec(String[] args) {
+ if (args.length < 2) {
+ System.out.println("invalid arguments for 'remove'");
+ } else {
+ remove(args[0], args[1]);
+ }
+ }
+ }
+
+ private void remove(String resourceIDStr, String durationStr) {
+ try {
+ ResourceID resourceID = ResourceID.parse(resourceIDStr);
+ int duration = Integer.parseInt(durationStr);
+ client.remove(resourceID, duration);
+ System.out.println("Successfully removed resource from server.");
+ } catch (NumberFormatException e) {
+ System.out.println("invalid duration: "+durationStr);
+ } catch (IllegalArgumentException e) {
+ System.out.println(e.getMessage());
+ } catch (Exception e) {
+ System.out.println("Failed to remove resource: "+e.getMessage());
+ }
+ }
+
+ private class List implements Command {
+ public void exec(String[] args) {
+ if (args.length < 1) {
+ System.out.println("invalid arguments for 'list'");
+ } else {
+ list(args[0]);
+ }
+ }
+ }
+
+ private void list(String resourceNameStr) {
+ try {
+ ResourceName name = ResourceName.parse(resourceNameStr);
+ Resource[] resources = client.listResources(name);
+ System.out.println("Available resources:");
+ for (Resource resource : resources) {
+ System.out.println(resource.toString());
+ }
+ } catch (IllegalArgumentException e) {
+ System.out.println("invalid resource name: " + resourceNameStr);
+ } catch (UnknownHostException | MalformedURLException e) {
+ System.err.println(e.getMessage());
+ } catch (ServerCommunicationError e) {
+ System.err.println("Failed to retrieve resources from server: "+e.getMessage());
+ }
+ }
+
+ private static class Args {
+ private final City city;
+ private final short idNum;
+
+ private Args(String[] args) throws IllegalArgumentException {
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Missing argument 'city'");
+ }
+ city = new City(args[0]);
+
+ if (args.length < 2) {
+ throw new IllegalArgumentException("Missing argument 'id number'");
+ }
+ try {
+ idNum = Short.parseShort(args[1]);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Bad value of 'id number'");
+ }
+ }
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ResponderID.java b/src/main/java/derms/replica/replica1/ResponderID.java
new file mode 100644
index 0000000..0ccda8a
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResponderID.java
@@ -0,0 +1,16 @@
+package derms.replica.replica1;
+
+public class ResponderID {
+ public City city;
+ public short num;
+
+ public ResponderID(City city, short num) {
+ this.city = city;
+ this.num = num;
+ }
+
+ @Override
+ public String toString() {
+ return ""+city+"R"+num;
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ResponderServer.java b/src/main/java/derms/replica/replica1/ResponderServer.java
new file mode 100644
index 0000000..d2406fb
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ResponderServer.java
@@ -0,0 +1,99 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import java.util.NoSuchElementException;
+import javax.jws.WebService;
+
+@WebService(endpointInterface = "xyz.samanthony.derms.Responder")
+public class ResponderServer implements Responder {
+ public static final Duration timeout = Duration.ofSeconds(5);
+
+ private City city;
+ private Resources resources;
+ private Servers servers;
+ private Logger log;
+
+ public ResponderServer(City city, Resources resources, Servers servers) throws IOException {
+ this.city = city;
+ this.resources = resources;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ public ResponderServer() throws IOException {
+ this(new City(), new Resources(), new Servers());
+ }
+
+ @Override
+ public void addResource(Resource r) {
+ resources.add(r);
+ log.info("Added resource "+r+" - success");
+ }
+
+ @Override
+ public void removeResource(ResourceID rid, int duration) throws NoSuchResourceException {
+ log.info("Remove duration "+duration+" from "+rid);
+ try {
+ Resource resource = resources.getByID(rid);
+ synchronized (resource) {
+ if (duration < 0 || duration >= resource.duration) {
+ resources.removeByID(rid);
+ log.info("Removed resource "+rid+" - success");
+ } else {
+ resource.duration -= duration;
+ log.info("Removed duration from resource "+rid+". New duration: "+resource.duration+" - success");
+ }
+ }
+ } catch (NoSuchElementException e) {
+ // Wanted to remove duration from resource.
+ if (duration >= 0) {
+ String msg = "Cannot remove duration from "+rid+": resource does not exist.";
+ log.warning(msg);
+ throw new NoSuchResourceException(msg);
+ }
+
+ // Duration is negative---wanted to remove resource completely.
+ // Success because it is already removed.
+ log.info("Not removing "+rid+": resource does not exist.");
+ }
+ }
+
+ @Override
+ public Resource[] listResourceAvailability(ResourceName rname) throws ServerCommunicationError {
+ log.info("Request for available "+rname);
+ Collection<Resource> availableResources = ConcurrentHashMap.newKeySet();
+ ExecutorService pool = Executors.newFixedThreadPool(servers.size());
+ try {
+ for (InetAddress serverAddr : servers.all()) {
+ pool.execute(new ResourceAvailability.Client(serverAddr, rname, availableResources));
+ }
+ } catch (IOException e) {
+ String msg = "Failed to start ResourceAvailability Client: "+e.getMessage();
+ log.severe(msg);
+ throw new ServerCommunicationError("ResourceAvailability: "+msg);
+ }
+
+ log.fine("Started worker threads");
+ pool.shutdown();
+ boolean terminated;
+ try {
+ terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new ServerCommunicationError("listResourceAvailability(): listResourceAvailability() interrupted: "+e.getMessage());
+ }
+ if (!terminated) {
+ throw new ServerCommunicationError("ResourceAvailability: request timed out: no response after "+timeout.toString());
+ }
+ log.info("Response length "+availableResources.size());
+ Resource[] arr = new Resource[0];
+ return availableResources.toArray(arr);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ReturnResource.java b/src/main/java/derms/replica/replica1/ReturnResource.java
new file mode 100644
index 0000000..a157631
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ReturnResource.java
@@ -0,0 +1,199 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
+import java.util.NoSuchElementException;
+
+public class ReturnResource {
+ public static final int port = 5559;
+ public static final int bufsize = 4096;
+
+ public static class Client {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+
+ public Client(CoordinatorID cid, ResourceID rid) {
+ this.coordinatorID = cid;
+ this.resourceID = rid;
+ }
+
+ public Response sendRequest(InetAddress serverAddr) throws IOException {
+ Request request = new Request(coordinatorID, resourceID);
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ throw new IOException("Return Resource Client: failed to open socket: "+e.getMessage());
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ sock.close();
+ throw new IOException("Return Resource Client: failed to create request: "+e.getMessage());
+ }
+
+ sock.send(requestPkt);
+
+ byte[] buf = new byte[bufsize];
+ DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
+ try {
+ sock.receive(responsePkt);
+ } catch (Exception e) {
+ sock.close();
+ throw new IOException("Return Resource Client: error receiving from server: "+e.getMessage());
+ }
+
+ try {
+ return ObjectPacket.deserialize(responsePkt, Response.class);
+ } catch (IOException e) {
+ throw new IOException("Return Resource Client: failed to deserialize response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ public static class Server implements Runnable {
+ private InetAddress localAddr;
+ private Resources resources;
+ private ExecutorService pool;
+ private Logger log;
+
+ public Server(InetAddress localAddr, Resources resources) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr.toString()
+ +": "+e.getMessage());
+ return;
+ }
+
+ log.info("Listening on "+localAddr.toString()+":"+port);
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.fine("Got request");
+
+ Request request = null;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources));
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ private static class Request implements Serializable {
+ private CoordinatorID coordinatorID;
+ private ResourceID resourceID;
+
+ private Request(CoordinatorID cid, ResourceID rid) {
+ this.coordinatorID = cid;
+ this.resourceID = rid;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources) {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ }
+
+ @Override
+ public void run() {
+ Response response = returnResource();
+
+ DatagramSocket sock;
+ try {
+ sock= new DatagramSocket();
+ } catch (Exception e) {
+ System.err.println("Request Resource Server: failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket pkt;
+ try {
+ pkt = ObjectPacket.create(response, client);
+ } catch (IOException e) {
+ System.err.println("Request Resource Server: failed to create response packet: "+e.getMessage());
+ sock.close();
+ return;
+ }
+ try {
+ sock.send(pkt);
+ } catch (Exception e) {
+ System.err.println("Request Resource Server: failed to send response: "+e.getMessage());
+ }
+ sock.close();
+ }
+
+ private Response returnResource() {
+ try {
+ Resource resource = resources.getByID(request.resourceID);
+ synchronized (resource) {
+ if (!resource.isBorrowed || !resource.borrower.equals(request.coordinatorID)) {
+ return new Response(Response.Status.NOT_BORROWED,
+ request.resourceID+" is not borrowed by "+request.coordinatorID);
+ }
+ resource.isBorrowed = false;
+ resource.borrower = new CoordinatorID();
+ resource.borrowDuration = -1;
+ return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id);
+ }
+ } catch (NoSuchElementException e) {
+ return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID);
+ }
+ }
+ }
+
+ public static class Response implements Serializable {
+ public Status status;
+ public String message;
+
+ private Response(Status status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ public enum Status {
+ SUCCESS, NO_SUCH_RESOURCE, NOT_BORROWED
+ }
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/ServerCommunicationError.java b/src/main/java/derms/replica/replica1/ServerCommunicationError.java
new file mode 100644
index 0000000..0f0586b
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/ServerCommunicationError.java
@@ -0,0 +1,7 @@
+package derms.replica.replica1;
+
+public class ServerCommunicationError extends Exception {
+ public ServerCommunicationError(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/Servers.java b/src/main/java/derms/replica/replica1/Servers.java
new file mode 100644
index 0000000..dc084d7
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/Servers.java
@@ -0,0 +1,34 @@
+package derms.replica.replica1;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+
+public class Servers {
+ private Map<City, InetAddress> servers = new ConcurrentHashMap<City, InetAddress>();
+
+ /** Returns the address of the server located in the specified city, or null if there is no server in the city. */
+ public InetAddress get(City city) {
+ return servers.get(city);
+ }
+
+ /**
+ * Associates the specified server address with the specified city.
+ * If there was already a server associated with the city, the old value is replaced.
+ * @param city the city where the server is located
+ * @param addr the address of the server
+ * @return the previous server address, or null if there was no server associated with this city.
+ */
+ public InetAddress put(City city, InetAddress addr) {
+ return servers.put(city, addr);
+ }
+
+ public Collection<InetAddress> all() {
+ return servers.values();
+ }
+
+ public int size() {
+ return servers.size();
+ }
+}
diff --git a/src/main/java/derms/replica/replica1/StationServer.java b/src/main/java/derms/replica/replica1/StationServer.java
new file mode 100644
index 0000000..3187ecb
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/StationServer.java
@@ -0,0 +1,146 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
+import javax.xml.ws.Endpoint;
+
+public class StationServer implements Runnable {
+ public static final String usage = "Usage: java StationServer <city> <local address>";
+ public static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555);
+ public static final int publishPort = 8080;
+
+ public City city;
+ public InetAddress localAddr;
+ public Resources resources;
+ public Servers servers;
+ private Logger log;
+ private ResponderServer responderServer;
+ private CoordinatorServer coordinatorServer;
+
+ public StationServer(City city, InetAddress localAddr) throws IOException {
+ this.city = city;
+ this.localAddr = localAddr;
+ this.resources = new Resources();
+ this.servers = new Servers();
+ this.log = DermsLogger.getLogger(getClass());
+
+ try {
+ this.responderServer = new ResponderServer(city, resources, servers);
+ } catch (IOException e) {
+ throw new IOException("Failed to create ResponderServer: "+e.getMessage());
+ }
+ log.info("Created ResponderServer");
+
+ try {
+ this.coordinatorServer = new CoordinatorServer(city, resources, servers);
+ } catch (IOException e) {
+ throw new IOException("Failed to create CoordinatorServer: "+e.getMessage());
+ }
+ log.info("Created CoordinatorServer");
+ }
+
+ public static void main(String cmdlineArgs[]) {
+ Args args = null;
+ try {
+ args = new Args(cmdlineArgs);
+ } catch (IllegalArgumentException e) {
+ System.err.println(e.getMessage());
+ System.out.println(usage);
+ System.exit(1);
+ }
+
+ try {
+ (new StationServer(args.city, args.localAddr)).run();
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ System.exit(1);
+ }
+ }
+
+ @Override
+ public void run() {
+ log.info("Running");
+ log.config("Local address is "+localAddr.toString());
+
+ ExecutorService pool = Executors.newCachedThreadPool();
+
+ try {
+ pool.execute(new ResourceAvailability.Server(localAddr, resources));
+ } catch (IOException e) {
+ String msg = "Failed to start ResourceAvailability Server: "+e.getMessage();
+ log.severe(msg);
+ return;
+ }
+ try {
+ pool.execute(new RequestResource.Server(localAddr, resources));
+ } catch (IOException e) {
+ log.severe("Failed to start RequestResource Server: "+e.getMessage());
+ return;
+ }
+ try {
+ pool.execute(new FindResource.Server(localAddr, resources));
+ } catch (IOException e) {
+ log.severe("Failed to start FindResource Server: "+e.getMessage());
+ return;
+ }
+ try {
+ pool.execute(new ReturnResource.Server(localAddr, resources));
+ } catch (IOException e) {
+ log.severe("Failed to start ReturnResource Server: "+e.getMessage());
+ return;
+ }
+ try {
+ pool.execute(new SwapResource.Server(localAddr, resources, servers));
+ } catch (IOException e) {
+ log.severe("Failed to start SwapResource Server: "+e.getMessage());
+ return;
+ }
+
+ String url = "http://"+localAddr.getHostAddress()+":"+publishPort+"/"+Responder.class.getSimpleName();
+ log.info("Publishing Responder to '"+url+"'...");
+ Endpoint.publish(url, responderServer);
+
+ url = "http://"+localAddr.getHostAddress()+":"+publishPort+"/"+Coordinator.class.getSimpleName();
+ log.info("Publishing Coordinator to '"+url+"'...");
+ Endpoint.publish(url, coordinatorServer);
+
+ try {
+ pool.execute(new Announcer(announceGroup, localAddr, city));
+ } catch (IOException e) {
+ log.severe("Failed to start Announcer: "+e.getMessage());
+ return;
+ }
+ try {
+ pool.execute(new AnnounceListener(announceGroup, localAddr, servers));
+ } catch (IOException e) {
+ log.severe("Failed to start AnnounceListener: "+e.getMessage());
+ return;
+ }
+ }
+
+ private static class Args {
+ private City city;
+ private InetAddress localAddr;
+
+ private Args(String[] args) throws IllegalArgumentException {
+ if (args.length < 1) {
+ throw new IllegalArgumentException("Missing argument 'city'");
+ }
+ city = new City(args[0]);
+
+ if (args.length < 2) {
+ throw new IllegalArgumentException("Missing argument 'local host'");
+ }
+ try {
+ localAddr = InetAddress.getByName(args[1]);
+ } catch (UnknownHostException | SecurityException e) {
+ throw new IllegalArgumentException("Bad value of 'local host': "+e.getMessage());
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/SwapResource.java b/src/main/java/derms/replica/replica1/SwapResource.java
new file mode 100644
index 0000000..34d5b66
--- /dev/null
+++ b/src/main/java/derms/replica/replica1/SwapResource.java
@@ -0,0 +1,267 @@
+package derms.replica.replica1;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
+import java.util.NoSuchElementException;
+
+public class SwapResource {
+ public static final int port = 5560;
+ public static final int bufSize = 4096;
+
+ public static class Client {
+ private CoordinatorID cid;
+ private ResourceID oldRID;
+ private ResourceID newRID;
+
+ public Client(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) {
+ this.cid = cid;
+ this.oldRID = oldRID;
+ this.newRID = newRID;
+ }
+
+ public Response sendRequest(InetAddress serverAddr) throws IOException {
+ Request request = new Request(cid, oldRID, newRID);
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ throw new IOException("Swap Resource Client: failed to open socket: "+e.getMessage());
+ }
+
+ DatagramPacket requestPkt;
+ try {
+ requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
+ } catch (IOException e) {
+ sock.close();
+ throw new IOException("Swap Resource Client: failed to create request: "+e.getMessage());
+ }
+
+ sock.send(requestPkt);
+
+ byte[] buf = new byte[bufSize];
+ DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
+ try {
+ sock.receive(responsePkt);
+ } catch (Exception e) {
+ sock.close();
+ throw new IOException("Swap Resource Client: error receiving from server: "+e.getMessage());
+ }
+
+ try {
+ return ObjectPacket.deserialize(responsePkt, Response.class);
+ } catch (IOException e) {
+ throw new IOException("Swap Resource Client: failed to deserialize response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ public static class Server implements Runnable {
+ private InetAddress localAddr;
+ private Resources resources;
+ private Servers servers;
+ private ExecutorService pool;
+ private Logger log;
+
+ public Server(InetAddress localAddr, Resources resources, Servers servers) throws IOException {
+ this.localAddr = localAddr;
+ this.resources = resources;
+ this.servers = servers;
+ pool = Executors.newWorkStealingPool();
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ DatagramSocket sock = null;
+ try {
+ sock = new DatagramSocket(port, localAddr);
+ } catch (Exception e) {
+ log.severe("Failed to bind socket to "+localAddr+": "+e.getMessage());
+ return;
+ }
+ log.info("Listening on "+localAddr+":"+port);
+
+ DatagramPacket requestPkt = new DatagramPacket(new byte[bufSize], bufSize);
+
+ try {
+ for (;;) {
+ try {
+ sock.receive(requestPkt);
+ } catch (Exception e) {
+ log.warning("Error receiving from socket: "+e.getMessage());
+ continue;
+ }
+ log.fine("Got request");
+
+ Request request = null;
+ try {
+ request = ObjectPacket.deserialize(requestPkt, Request.class);
+ } catch (IOException e) {
+ log.warning("Failed to deserialize request: "+e.getMessage());
+ continue;
+ }
+
+ SocketAddress client = requestPkt.getSocketAddress();
+ try {
+ RequestHandler handler = new RequestHandler(request, client, resources, servers);
+ pool.execute(handler);
+ } catch (IOException e) {
+ log.warning("Failed to create request handler: "+e.getMessage());
+ continue;
+ }
+ }
+ } finally {
+ sock.close();
+ }
+ }
+ }
+
+ private static class Request implements Serializable {
+ private CoordinatorID cid;
+ private ResourceID oldRID;
+ private ResourceID newRID;
+
+ private Request(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) {
+ this.cid = cid;
+ this.oldRID = oldRID;
+ this.newRID = newRID;
+ }
+ }
+
+ private static class RequestHandler implements Runnable {
+ private Request request;
+ private SocketAddress client;
+ private Resources resources;
+ private Servers servers;
+ private Logger log;
+
+ private RequestHandler(Request request, SocketAddress client, Resources resources, Servers servers) throws IOException {
+ this.request = request;
+ this.client = client;
+ this.resources = resources;
+ this.servers = servers;
+ this.log = DermsLogger.getLogger(this.getClass());
+ }
+
+ @Override
+ public void run() {
+ Response response = swapResources();
+
+ DatagramSocket sock;
+ try {
+ sock = new DatagramSocket();
+ } catch (Exception e) {
+ log.severe("failed to open socket: "+e.getMessage());
+ return;
+ }
+
+ DatagramPacket pkt;
+ try {
+ pkt = ObjectPacket.create(response, client);
+ } catch (IOException e) {
+ log.severe("failed to create response: "+e.getMessage());
+ sock.close();
+ return;
+ }
+
+ try {
+ sock.send(pkt);
+ } catch (Exception e) {
+ log.severe("failed to send response: "+e.getMessage());
+ } finally {
+ sock.close();
+ }
+ }
+
+ private Response swapResources() {
+ try {
+ Resource resource = resources.getByID(request.oldRID);
+ synchronized (resource) {
+ if (!resource.borrower.equals(request.cid)) {
+ return new Response(Response.Status.NOT_BORROWED, "resource "+request.oldRID+" not borrowed by "+request.cid);
+ }
+ try {
+ acquireNewResource(resource.borrowDuration);
+ returnOldResource(resource);
+ return new Response(Response.Status.SUCCESS, request.cid+" success fully swapped "+request.oldRID+" for "+request.newRID);
+ } catch (UnknownHostException e) {
+ return new Response(Response.Status.UNKNOWN_HOST, e.getMessage());
+ } catch (IOException e) {
+ return new Response(Response.Status.FAILURE, e.getMessage());
+ } catch (CannotBorrow e) {
+ return new Response(Response.Status.CANNOT_BORROW, e.getMessage());
+ }
+ }
+ } catch (NoSuchElementException e) {
+ return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.oldRID);
+ }
+ }
+
+ private void acquireNewResource(int borrowDuration) throws UnknownHostException, IOException, CannotBorrow {
+ RequestResource.Client requestClient = new RequestResource.Client(request.cid, request.newRID, borrowDuration);
+ City city = new City(request.newRID.city);
+ InetAddress requestResourceServer = servers.get(city);
+ if (requestResourceServer == null) {
+ throw new UnknownHostException(city.toString());
+ }
+ RequestResource.Response response = requestClient.sendRequest(requestResourceServer);
+ // TODO: make exception handling more granular---pass through status from Request.
+ if (response.status != RequestResource.Response.Status.SUCCESS) {
+ throw new CannotBorrow(request.cid, request.newRID, response.message);
+ }
+ }
+
+ private void returnOldResource(Resource r) {
+ r.isBorrowed = false;
+ r.borrower = new CoordinatorID();
+ r.borrowDuration = -1;
+ }
+ }
+
+ public static class Response implements Serializable {
+ public Status status;
+ public String message;
+
+ private Response(Status status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+
+ public enum Status {
+ SUCCESS,
+ FAILURE,
+ NO_SUCH_RESOURCE,
+ NOT_BORROWED,
+ CANNOT_BORROW,
+ UNKNOWN_HOST
+ }
+ }
+
+ private static class CannotBorrow extends Exception {
+ CoordinatorID attemptedBorrower;
+ ResourceID rid;
+ String message;
+
+ private CannotBorrow(CoordinatorID attemptedBorrower, ResourceID rid, String message) {
+ this.attemptedBorrower = attemptedBorrower;
+ this.rid = rid;
+ this.message = message;
+ }
+
+ @Override
+ public String getMessage() {
+ return attemptedBorrower+" failed to borrow "+rid+": "+message;
+ }
+ }
+} \ No newline at end of file