From 6a710c3943f2350f4575f8cb1898129ef3c7dfdd Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Thu, 28 Nov 2024 10:42:31 -0500 Subject: rename assignment code replica package --- .../replica/replica1/AlreadyBorrowedException.java | 7 - .../derms/replica/replica1/AnnounceListener.java | 81 ------- .../java/derms/replica/replica1/Announcer.java | 69 ------ src/main/java/derms/replica/replica1/City.java | 38 --- .../java/derms/replica/replica1/CoordinatorID.java | 34 --- .../derms/replica/replica1/CoordinatorServer.java | 170 ------------- .../java/derms/replica/replica1/DermsLogger.java | 23 -- .../java/derms/replica/replica1/FindResource.java | 160 ------------- src/main/java/derms/replica/replica1/Hosts.java | 26 -- src/main/java/derms/replica/replica1/ID.java | 5 - .../replica/replica1/InvalidDurationException.java | 7 - .../replica/replica1/NoSuchResourceException.java | 7 - .../replica/replica1/NotBorrowedException.java | 7 - .../java/derms/replica/replica1/ObjectPacket.java | 33 --- .../derms/replica/replica1/RequestResource.java | 223 ------------------ src/main/java/derms/replica/replica1/Resource.java | 39 --- .../replica/replica1/ResourceAvailability.java | 128 ---------- .../java/derms/replica/replica1/ResourceID.java | 49 ---- .../java/derms/replica/replica1/ResourceName.java | 18 -- .../derms/replica/replica1/ResourceTransfer.java | 50 ---- .../java/derms/replica/replica1/Resources.java | 74 ------ .../java/derms/replica/replica1/ResponderID.java | 16 -- .../derms/replica/replica1/ResponderServer.java | 94 -------- .../derms/replica/replica1/ReturnResource.java | 195 --------------- .../replica/replica1/ServerCommunicationError.java | 7 - src/main/java/derms/replica/replica1/Servers.java | 34 --- .../java/derms/replica/replica1/StationServer.java | 136 ----------- .../java/derms/replica/replica1/SwapResource.java | 262 --------------------- .../replica/replica2/AlreadyBorrowedException.java | 7 + .../derms/replica/replica2/AnnounceListener.java | 81 +++++++ .../java/derms/replica/replica2/Announcer.java | 69 ++++++ src/main/java/derms/replica/replica2/City.java | 38 +++ .../java/derms/replica/replica2/CoordinatorID.java | 34 +++ .../derms/replica/replica2/CoordinatorServer.java | 170 +++++++++++++ .../java/derms/replica/replica2/DermsLogger.java | 23 ++ .../java/derms/replica/replica2/FindResource.java | 160 +++++++++++++ src/main/java/derms/replica/replica2/Hosts.java | 26 ++ src/main/java/derms/replica/replica2/ID.java | 5 + .../replica/replica2/InvalidDurationException.java | 7 + .../replica/replica2/NoSuchResourceException.java | 7 + .../replica/replica2/NotBorrowedException.java | 7 + .../java/derms/replica/replica2/ObjectPacket.java | 33 +++ .../derms/replica/replica2/RequestResource.java | 223 ++++++++++++++++++ src/main/java/derms/replica/replica2/Resource.java | 39 +++ .../replica/replica2/ResourceAvailability.java | 128 ++++++++++ .../java/derms/replica/replica2/ResourceID.java | 49 ++++ .../java/derms/replica/replica2/ResourceName.java | 18 ++ .../derms/replica/replica2/ResourceTransfer.java | 50 ++++ .../java/derms/replica/replica2/Resources.java | 74 ++++++ .../java/derms/replica/replica2/ResponderID.java | 16 ++ .../derms/replica/replica2/ResponderServer.java | 94 ++++++++ .../derms/replica/replica2/ReturnResource.java | 195 +++++++++++++++ .../replica/replica2/ServerCommunicationError.java | 7 + src/main/java/derms/replica/replica2/Servers.java | 34 +++ .../java/derms/replica/replica2/StationServer.java | 136 +++++++++++ .../java/derms/replica/replica2/SwapResource.java | 262 +++++++++++++++++++++ 56 files changed, 1992 insertions(+), 1992 deletions(-) delete mode 100644 src/main/java/derms/replica/replica1/AlreadyBorrowedException.java delete mode 100644 src/main/java/derms/replica/replica1/AnnounceListener.java delete mode 100644 src/main/java/derms/replica/replica1/Announcer.java delete mode 100644 src/main/java/derms/replica/replica1/City.java delete mode 100644 src/main/java/derms/replica/replica1/CoordinatorID.java delete mode 100644 src/main/java/derms/replica/replica1/CoordinatorServer.java delete mode 100644 src/main/java/derms/replica/replica1/DermsLogger.java delete mode 100644 src/main/java/derms/replica/replica1/FindResource.java delete mode 100644 src/main/java/derms/replica/replica1/Hosts.java delete mode 100644 src/main/java/derms/replica/replica1/ID.java delete mode 100644 src/main/java/derms/replica/replica1/InvalidDurationException.java delete mode 100644 src/main/java/derms/replica/replica1/NoSuchResourceException.java delete mode 100644 src/main/java/derms/replica/replica1/NotBorrowedException.java delete mode 100644 src/main/java/derms/replica/replica1/ObjectPacket.java delete mode 100644 src/main/java/derms/replica/replica1/RequestResource.java delete mode 100644 src/main/java/derms/replica/replica1/Resource.java delete mode 100644 src/main/java/derms/replica/replica1/ResourceAvailability.java delete mode 100644 src/main/java/derms/replica/replica1/ResourceID.java delete mode 100644 src/main/java/derms/replica/replica1/ResourceName.java delete mode 100644 src/main/java/derms/replica/replica1/ResourceTransfer.java delete mode 100644 src/main/java/derms/replica/replica1/Resources.java delete mode 100644 src/main/java/derms/replica/replica1/ResponderID.java delete mode 100644 src/main/java/derms/replica/replica1/ResponderServer.java delete mode 100644 src/main/java/derms/replica/replica1/ReturnResource.java delete mode 100644 src/main/java/derms/replica/replica1/ServerCommunicationError.java delete mode 100644 src/main/java/derms/replica/replica1/Servers.java delete mode 100644 src/main/java/derms/replica/replica1/StationServer.java delete mode 100644 src/main/java/derms/replica/replica1/SwapResource.java create mode 100644 src/main/java/derms/replica/replica2/AlreadyBorrowedException.java create mode 100644 src/main/java/derms/replica/replica2/AnnounceListener.java create mode 100644 src/main/java/derms/replica/replica2/Announcer.java create mode 100644 src/main/java/derms/replica/replica2/City.java create mode 100644 src/main/java/derms/replica/replica2/CoordinatorID.java create mode 100644 src/main/java/derms/replica/replica2/CoordinatorServer.java create mode 100644 src/main/java/derms/replica/replica2/DermsLogger.java create mode 100644 src/main/java/derms/replica/replica2/FindResource.java create mode 100644 src/main/java/derms/replica/replica2/Hosts.java create mode 100644 src/main/java/derms/replica/replica2/ID.java create mode 100644 src/main/java/derms/replica/replica2/InvalidDurationException.java create mode 100644 src/main/java/derms/replica/replica2/NoSuchResourceException.java create mode 100644 src/main/java/derms/replica/replica2/NotBorrowedException.java create mode 100644 src/main/java/derms/replica/replica2/ObjectPacket.java create mode 100644 src/main/java/derms/replica/replica2/RequestResource.java create mode 100644 src/main/java/derms/replica/replica2/Resource.java create mode 100644 src/main/java/derms/replica/replica2/ResourceAvailability.java create mode 100644 src/main/java/derms/replica/replica2/ResourceID.java create mode 100644 src/main/java/derms/replica/replica2/ResourceName.java create mode 100644 src/main/java/derms/replica/replica2/ResourceTransfer.java create mode 100644 src/main/java/derms/replica/replica2/Resources.java create mode 100644 src/main/java/derms/replica/replica2/ResponderID.java create mode 100644 src/main/java/derms/replica/replica2/ResponderServer.java create mode 100644 src/main/java/derms/replica/replica2/ReturnResource.java create mode 100644 src/main/java/derms/replica/replica2/ServerCommunicationError.java create mode 100644 src/main/java/derms/replica/replica2/Servers.java create mode 100644 src/main/java/derms/replica/replica2/StationServer.java create mode 100644 src/main/java/derms/replica/replica2/SwapResource.java diff --git a/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java b/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java deleted file mode 100644 index 634f154..0000000 --- a/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java +++ /dev/null @@ -1,7 +0,0 @@ -package derms.replica.replica1; - -public class AlreadyBorrowedException extends Exception { - public AlreadyBorrowedException(String message) { - super(message); - } -} diff --git a/src/main/java/derms/replica/replica1/AnnounceListener.java b/src/main/java/derms/replica/replica1/AnnounceListener.java deleted file mode 100644 index 9b5980d..0000000 --- a/src/main/java/derms/replica/replica1/AnnounceListener.java +++ /dev/null @@ -1,81 +0,0 @@ -package derms.replica.replica1; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.net.*; -import java.util.logging.Logger; - -public class AnnounceListener implements Runnable { - private static final int bufsize = 1024; - - private InetSocketAddress groupAddr; - private InetAddress localAddr; - private Servers servers; - private Logger log; - - public AnnounceListener(InetSocketAddress groupAddr, InetAddress localAddr, Servers servers) throws IOException { - this.groupAddr = groupAddr; - this.localAddr = localAddr; - this.servers = servers; - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - NetworkInterface netInterface = null; - try { - netInterface = NetworkInterface.getByInetAddress(localAddr); - if (netInterface == null) { - throw new Exception("netInterface is null"); - } - } catch (Exception e) { - log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); - return; - } - - MulticastSocket sock = null; - try { - sock = new MulticastSocket(groupAddr.getPort()); - sock.joinGroup(groupAddr, netInterface); - } catch (Exception e) { - log.severe("Failed to open multicast socket: "+e.getMessage()); - return; - } - - log.info("Listening to "+groupAddr.toString()+" from "+localAddr.toString()+" ("+netInterface.getName()+")"); - byte[] buf = new byte[bufsize]; - DatagramPacket pkt = new DatagramPacket(buf, buf.length); - - for (;;) { - try { - sock.receive(pkt); - } catch (Exception e) { - log.warning("Error receiving from multicast socket: "+e.getMessage()); - continue; - } - - ObjectInputStream objStream; - try { - objStream = new ObjectInputStream( - new ByteArrayInputStream(pkt.getData())); - } catch (IOException e) { - log.warning("Failed to create input stream: "+e.getMessage()); - continue; - } - - City city; - try { - city = (City) objStream.readObject(); - } catch (Exception e) { - log.warning("Failed to deserialize data: "+e.getMessage()); - continue; - } - - InetAddress remote = pkt.getAddress(); - if (servers.put(city, remote) == null) { - log.info("Added remote server "+city.toString()+" "+remote.toString()); - } - } - } -} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/Announcer.java b/src/main/java/derms/replica/replica1/Announcer.java deleted file mode 100644 index 93bbe24..0000000 --- a/src/main/java/derms/replica/replica1/Announcer.java +++ /dev/null @@ -1,69 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.net.*; -import java.util.logging.Logger; - -public class Announcer implements Runnable { - public static final long intervalMillis = 3000; - - private SocketAddress group; - private InetAddress localAddr; - private City city; - private Logger log; - - public Announcer(SocketAddress group, InetAddress localAddr, City city) throws IOException { - this.group = group; - this.localAddr = localAddr; - this.city = city; - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - NetworkInterface netInterface = null; - try { - netInterface = NetworkInterface.getByInetAddress(localAddr); - if (netInterface == null) { - throw new Exception("netInterface is null"); - } - } catch (Exception e) { - log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); - return; - } - - MulticastSocket sock = null; - try { - sock = new MulticastSocket(); - sock.joinGroup(group, netInterface); - } catch (Exception e) { - log.severe("Failed to open multicast socket: "+e.getMessage()); - return; - } - - log.info("Announcing from "+localAddr.toString()+" ("+netInterface.getName()+") to "+group.toString()); - - DatagramPacket pkt = null; - try { - pkt = ObjectPacket.create(city, group); - } catch (IOException e) { - log.severe("Failed to create packet: "+e.getMessage()); - sock.close(); - return; - } - - try { - for (;;) { - sock.send(pkt); - Thread.sleep(intervalMillis); - } - } catch (IOException e) { - log.severe("Failed to send to multicast socket "+group.toString()+": "+e.getMessage()); - } catch (InterruptedException e) { - log.info("Interrupted."); - } finally { - log.info("Shutting down..."); - sock.close(); - } - } -} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/City.java b/src/main/java/derms/replica/replica1/City.java deleted file mode 100644 index 548f1fa..0000000 --- a/src/main/java/derms/replica/replica1/City.java +++ /dev/null @@ -1,38 +0,0 @@ -package derms.replica.replica1; - -import java.io.Serializable; - -public class City implements Serializable { - public static final int codeLen = 3; - - private String code; - - public City(String code) throws IllegalArgumentException { - if (code.length() != codeLen) - throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters"); - this.code = code; - } - - public City() { - this("XXX"); - } - - @Override - public String toString() { - return code; - } - - @Override - public boolean equals(Object obj) { - if (obj == null || this.getClass() != obj.getClass()) { - return false; - } - City other = (City) obj; - return this.code.equals(other.code); - } - - @Override - public int hashCode() { - return code.hashCode(); - } -} diff --git a/src/main/java/derms/replica/replica1/CoordinatorID.java b/src/main/java/derms/replica/replica1/CoordinatorID.java deleted file mode 100644 index c09124c..0000000 --- a/src/main/java/derms/replica/replica1/CoordinatorID.java +++ /dev/null @@ -1,34 +0,0 @@ -package derms.replica.replica1; - -import java.io.Serializable; - -public class CoordinatorID implements Serializable { - public String city; - public short num; - - public CoordinatorID(String city, short num) { - this.city = city; - this.num = num; - } - - public CoordinatorID(String city, int num) { - this(city, (short) num); - } - - public CoordinatorID() { - this("XXX", 0); - } - - @Override - public boolean equals(Object obj) { - if (obj.getClass() != this.getClass()) - return false; - CoordinatorID other = (CoordinatorID) obj; - return other.city.equals(this.city) && other.num == this.num; - } - - @Override - public String toString() { - return city+"C"+num; - } -} diff --git a/src/main/java/derms/replica/replica1/CoordinatorServer.java b/src/main/java/derms/replica/replica1/CoordinatorServer.java deleted file mode 100644 index bdcf242..0000000 --- a/src/main/java/derms/replica/replica1/CoordinatorServer.java +++ /dev/null @@ -1,170 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.net.InetAddress; -import java.time.Duration; -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -public class CoordinatorServer { - public static final Duration timeout = Duration.ofSeconds(5); - - private City city; - private Resources resources; - private Servers servers; - private Logger log; - - public CoordinatorServer(City city, Resources resources, Servers servers) throws IOException { - super(); - this.city = city; - this.resources = resources; - this.servers = servers; - this.log = DermsLogger.getLogger(this.getClass()); - } - - public CoordinatorServer() throws IOException { - this(new City(), new Resources(), new Servers()); - } - - public void requestResource(CoordinatorID cid, ResourceID rid, int duration) - throws ServerCommunicationError, NoSuchResourceException, - AlreadyBorrowedException, InvalidDurationException - { - log.info("Request for "+rid+" from "+cid); - - InetAddress server = servers.get(new City(rid.city)); - if (server == null) { - throw new ServerCommunicationError("requestResource(): no connection to server "+rid.city.toString()); - } - - RequestResource.Client client = new RequestResource.Client(cid, rid, duration); - RequestResource.Response response; - try { - response = client.sendRequest(server); - } catch (IOException e) { - throw new ServerCommunicationError("requestResource(): "+e.getMessage()); - } - switch (response.status) { - case SUCCESS: - log.info("Request "+rid+" from "+cid+" - success"); - break; - case NO_SUCH_RESOURCE: - log.warning(response.message); - throw new NoSuchResourceException(response.message); - case ALREADY_BORROWED: - log.warning(response.message); - throw new AlreadyBorrowedException(response.message); - case INVALID_DURATION: - log.warning(response.message); - throw new InvalidDurationException(response.message); - default: - log.warning("Unsuccessful response from server: "+response.message); - throw new ServerCommunicationError("requestResource(): failed to borrow resource: "+response.message); - } - } - - public Resource[] findResource(CoordinatorID cid, ResourceName rname) throws ServerCommunicationError { - log.info("Find Resource "+rname+" from "+cid); - FindResource.Request request = new FindResource.Request(cid, rname); - Collection response = ConcurrentHashMap.newKeySet(); - ExecutorService pool = Executors.newFixedThreadPool(servers.size()); - try { - for (InetAddress server : servers.all()) { - pool.execute(new FindResource.Client(request, server, response)); - } - } catch (IOException e) { - String msg = "Failed to start FindResource Client: "+e.getMessage(); - log.severe(msg); - throw new ServerCommunicationError("findResource(): "+msg); - } - log.fine("Started worker threads"); - pool.shutdown(); - boolean terminated; - try { - terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - String msg = "findResource() interrupted: "+e.getMessage(); - log.warning(msg); - throw new ServerCommunicationError("findResource(): "+msg); - } - if (!terminated) { - String msg = "Request timed out: no response after "+timeout.toString(); - log.warning(msg); - throw new ServerCommunicationError("findResource(): "+msg); - } - Resource[] arr = new Resource[0]; - arr = response.toArray(arr); - log.info("Find resource "+rname+" from "+cid+" - success. Response length: "+arr.length); - return arr; - } - - public void returnResource(CoordinatorID cid, ResourceID rid) - throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException - { - log.info("Return resource "+rid+" from "+cid); - InetAddress server = servers.get(new City(rid.city)); - if (server == null) { - String msg = "no connection to server "+rid.city; - log.warning(msg); - throw new ServerCommunicationError("returnResource(): "+msg); - } - log.fine("server address: "+server); - - ReturnResource.Client client = new ReturnResource.Client(cid, rid); - ReturnResource.Response response; - try { - response = client.sendRequest(server); - } catch (IOException e) { - log.warning(e.getMessage()); - throw new ServerCommunicationError("returnResource(): "+e.getMessage()); - } - switch (response.status) { - case SUCCESS: - log.info(cid+" return "+rid+" - success"); - break; - case NO_SUCH_RESOURCE: - log.warning(response.message); - throw new NoSuchResourceException(response.message); - case NOT_BORROWED: - log.warning(response.message); - throw new NotBorrowedException(response.message); - default: - String msg = "Failed to return resource: "+response.message; - log.warning(msg); - throw new ServerCommunicationError("returnResource(): "+msg); - } - } - - public void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException { - log.info(cid+": swap "+oldRID+", "+newRID); - - InetAddress server = servers.get(new City(oldRID.city)); - if (server == null) { - String msg = "no connection to server "+oldRID.city; - log.warning(msg); - throw new ServerCommunicationError("swapResource(): "+msg); - } - log.fine("server address: "+server); - - SwapResource.Client client = new SwapResource.Client(cid, oldRID, newRID); - SwapResource.Response response; - try { - response = client.sendRequest(server); - } catch (IOException e) { - throw new ServerCommunicationError("swapResource(): "+e.getMessage()); - } - switch (response.status) { - case SUCCESS: - log.info(cid+": swap "+oldRID+", "+newRID+" - success"); - break; - case NO_SUCH_RESOURCE: - throw new NoSuchResourceException(response.message); - default: - throw new ServerCommunicationError("swapResource(): "+response.message); - } - } -} diff --git a/src/main/java/derms/replica/replica1/DermsLogger.java b/src/main/java/derms/replica/replica1/DermsLogger.java deleted file mode 100644 index 9ff8249..0000000 --- a/src/main/java/derms/replica/replica1/DermsLogger.java +++ /dev/null @@ -1,23 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.util.logging.FileHandler; -import java.util.logging.Handler; -import java.util.logging.Logger; -import java.util.logging.SimpleFormatter; - -public class DermsLogger { - public static final String logFile = "server.log"; - - private static Logger log = null; - - public static Logger getLogger(Class clazz) throws IOException { - if (log == null) { - log = Logger.getLogger(clazz.getName()); - Handler fileHandler = new FileHandler(logFile); - fileHandler.setFormatter(new SimpleFormatter()); - log.addHandler(fileHandler); - } - return log; - } -} diff --git a/src/main/java/derms/replica/replica1/FindResource.java b/src/main/java/derms/replica/replica1/FindResource.java deleted file mode 100644 index c9a0552..0000000 --- a/src/main/java/derms/replica/replica1/FindResource.java +++ /dev/null @@ -1,160 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.io.Serializable; -import java.net.*; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class FindResource { - public static final int port = 5558; - - public static class Client implements Runnable { - private InetAddress serverAddr; - private Request request; - private Collection response; - private Logger log; - - public Client(Request request, InetAddress serverAddr, Collection response) throws IOException { - this.serverAddr = serverAddr; - this.request = request; - this.response = response; - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - DatagramSocket sock; - try { - sock = new DatagramSocket(); - } catch (Exception e) { - log.severe("Failed to open socket: "+e.getMessage()); - return; - } - - DatagramPacket requestPkt; - try { - requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); - } catch (IOException e) { - log.severe("Failed to create request packet: "+e.getMessage()); - sock.close(); - return; - } - - try { - sock.send(requestPkt); - } catch (Exception e) { - log.severe("Failed to send request: "+e.getMessage()); - sock.close(); - return; - } - - Resource[] resources; - try { - resources = ResourceTransfer.receive(sock); - } catch (IOException e) { - log.severe(e.getMessage()); - return; - } finally { - sock.close(); - } - - for (Resource r : resources) { - response.add(r); - } - } - } - - public static class Server implements Runnable { - private static final int bufsize = 4096; - - private InetAddress localAddr; - private Resources resources; - private ExecutorService pool; - private Logger log; - - public Server(InetAddress localAddr, Resources resources) throws IOException { - this.localAddr = localAddr; - this.resources = resources; - this.pool = Executors.newWorkStealingPool(); - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - DatagramSocket sock = null; - try { - sock = new DatagramSocket(port, localAddr); - } catch (Exception e) { - log.severe("Failed to bind socket: "+e.getMessage()); - return; - } - - log.info("Running on "+localAddr.toString()+":"+port); - - DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); - try { - for (;;) { - try { - sock.receive(requestPkt); - } catch (Exception e) { - log.warning("Error receiving from socket: "+e.getMessage()); - continue; - } - log.info("Got request"); - - Request request; - try { - request = ObjectPacket.deserialize(requestPkt, Request.class); - } catch (IOException e) { - log.warning("Failed to deserialize request: "+e.getMessage()); - continue; - } - - pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); - } - } finally { - sock.close(); - } - } - } - - public static class Request implements Serializable { - private CoordinatorID cid; - private ResourceName rname; - - public Request(CoordinatorID cid, ResourceName rname) { - this.cid = cid; - this.rname = rname; - } - } - - private static class RequestHandler implements Runnable { - private Request request; - private SocketAddress client; - private Resources resources; - private Logger log; - - private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { - this.request = request; - this.client = client; - this.resources = resources; - this.log = log; - } - - @Override - public void run() { - List borrowedResources = resources.borrowed(request.cid, request.rname); - log.info(""+borrowedResources.size()+" "+request.rname+" resources borrowed by "+request.cid); - try { - Resource[] arr = new Resource[0]; - ResourceTransfer.send(borrowedResources.toArray(arr), client); - } catch (IOException e) { - log.severe("Failed to send response: "+e.getMessage()); - } - } - } -} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/Hosts.java b/src/main/java/derms/replica/replica1/Hosts.java deleted file mode 100644 index e27413d..0000000 --- a/src/main/java/derms/replica/replica1/Hosts.java +++ /dev/null @@ -1,26 +0,0 @@ -package derms.replica.replica1; - -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; - -public class Hosts { - private static Map hosts = null; - - public static String get(City city) throws UnknownHostException { - if (hosts == null) - init(); - - String host = hosts.get(city); - if (host == null) - throw new UnknownHostException("unknown host: "+city); - return host; - } - - private static void init() { - hosts = new HashMap(); - hosts.put(new City("MTL"), "alpine1"); - hosts.put(new City("QUE"), "alpine2"); - hosts.put(new City("SHE"), "alpine3"); - } -} diff --git a/src/main/java/derms/replica/replica1/ID.java b/src/main/java/derms/replica/replica1/ID.java deleted file mode 100644 index 8d9f9aa..0000000 --- a/src/main/java/derms/replica/replica1/ID.java +++ /dev/null @@ -1,5 +0,0 @@ -package derms.replica.replica1; - -public class ID { - public static final int nDigits = 4; -} diff --git a/src/main/java/derms/replica/replica1/InvalidDurationException.java b/src/main/java/derms/replica/replica1/InvalidDurationException.java deleted file mode 100644 index ba3c75a..0000000 --- a/src/main/java/derms/replica/replica1/InvalidDurationException.java +++ /dev/null @@ -1,7 +0,0 @@ -package derms.replica.replica1; - -public class InvalidDurationException extends Exception { - public InvalidDurationException (String message) { - super(message); - } -} diff --git a/src/main/java/derms/replica/replica1/NoSuchResourceException.java b/src/main/java/derms/replica/replica1/NoSuchResourceException.java deleted file mode 100644 index 3979570..0000000 --- a/src/main/java/derms/replica/replica1/NoSuchResourceException.java +++ /dev/null @@ -1,7 +0,0 @@ -package derms.replica.replica1; - -public class NoSuchResourceException extends Exception { - public NoSuchResourceException(String message) { - super(message); - } -} diff --git a/src/main/java/derms/replica/replica1/NotBorrowedException.java b/src/main/java/derms/replica/replica1/NotBorrowedException.java deleted file mode 100644 index 5c0257a..0000000 --- a/src/main/java/derms/replica/replica1/NotBorrowedException.java +++ /dev/null @@ -1,7 +0,0 @@ -package derms.replica.replica1; - -public class NotBorrowedException extends Exception { - public NotBorrowedException (String message) { - super(message); - } -} diff --git a/src/main/java/derms/replica/replica1/ObjectPacket.java b/src/main/java/derms/replica/replica1/ObjectPacket.java deleted file mode 100644 index 9d7de45..0000000 --- a/src/main/java/derms/replica/replica1/ObjectPacket.java +++ /dev/null @@ -1,33 +0,0 @@ -package derms.replica.replica1; - -import java.io.*; -import java.net.DatagramPacket; -import java.net.SocketAddress; - -public class ObjectPacket { - public static DatagramPacket create(Serializable obj, SocketAddress remoteAddr) throws IOException { - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - ObjectOutputStream objStream = new ObjectOutputStream(byteStream); - objStream.writeObject(obj); - objStream.flush(); - byte[] buf = byteStream.toByteArray(); - objStream.close(); - return new DatagramPacket(buf, buf.length, remoteAddr); - } - - public static E deserialize(DatagramPacket pkt, Class clazz) throws IOException { - ObjectInputStream objectStream; - try { - objectStream = new ObjectInputStream( - new ByteArrayInputStream(pkt.getData())); - } catch (Exception e) { - throw new IOException("failed to create input stream: "+e.getMessage()); - } - - try { - return clazz.cast(objectStream.readObject()); - } catch (Exception e) { - throw new IOException(e.getMessage()); - } - } -} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/RequestResource.java b/src/main/java/derms/replica/replica1/RequestResource.java deleted file mode 100644 index f7c4cbd..0000000 --- a/src/main/java/derms/replica/replica1/RequestResource.java +++ /dev/null @@ -1,223 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.io.Serializable; -import java.net.*; -import java.util.NoSuchElementException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class RequestResource { - public static final int port = 5557; - public static final int bufsize = 4096; - - public static class Client { - private CoordinatorID coordinatorID; - private ResourceID resourceID; - private int duration; - - public Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) { - this.coordinatorID = coordinatorID; - this.resourceID = resourceID; - this.duration = duration; - } - - public Response sendRequest(InetAddress serverAddr) throws IOException { - Request request = new Request(coordinatorID, resourceID, duration); - DatagramSocket sock; - try { - sock = new DatagramSocket(); - } catch (Exception e) { - throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage()); - } - - DatagramPacket requestPkt; - try { - requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); - } catch (IOException e) { - sock.close(); - throw new IOException("Request Resource Client: failed to create request: "+e.getMessage()); - } - - sock.send(requestPkt); - - byte[] buf = new byte[bufsize]; - DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); - try { - sock.receive(responsePkt); - } catch (Exception e) { - sock.close(); - throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage()); - } - - try { - return ObjectPacket.deserialize(responsePkt, Response.class); - } catch (IOException e) { - throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage()); - } finally { - sock.close(); - } - } - } - - public static class Server implements Runnable { - private InetAddress localAddr; - private Resources resources; - private ExecutorService pool; - private Logger log; - - public Server(InetAddress localAddr, Resources resources) throws IOException { - this.localAddr = localAddr; - this.resources = resources; - pool = Executors.newWorkStealingPool(); - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - DatagramSocket sock = null; - try { - sock = new DatagramSocket(port, localAddr); - } catch (Exception e) { - log.severe("Failed to bind socket to "+localAddr.toString() - +": "+e.getMessage()); - return; - } - - log.info("Listening on "+localAddr.toString()+":"+port); - DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); - try { - for (;;) { - try { - sock.receive(requestPkt); - } catch (Exception e) { - log.warning("Error receiving from socket: "+e.getMessage()); - continue; - } - log.info("Got request"); - - Request request = null; - try { - request = ObjectPacket.deserialize(requestPkt, Request.class); - } catch (IOException e) { - log.warning("Failed to deserialize request: "+e.getMessage()); - continue; - } - - pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); - } - } finally { - sock.close(); - } - } - } - - private static class Request implements Serializable { - private CoordinatorID coordinatorID; - private ResourceID resourceID; - private int duration; - - private Request(CoordinatorID cid, ResourceID rid, int duration) { - this.coordinatorID = cid; - this.resourceID = rid; - this.duration = duration; - } - } - - private static class RequestHandler implements Runnable { - private Request request; - private SocketAddress client; - private Resources resources; - private Logger log; - - private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { - this.request = request; - this.client = client; - this.resources = resources; - this.log = log; - } - - @Override - public void run() { - Response response = borrow(); - - DatagramSocket sock; - try { - sock = new DatagramSocket(); - } catch (Exception e) { - log.severe("Failed to open socket: "+e.getMessage()); - return; - } - - DatagramPacket pkt; - try { - pkt = ObjectPacket.create(response, client); - } catch (IOException e) { - log.severe("Failed to create response packet: "+e.getMessage()); - sock.close(); - return; - } - try { - sock.send(pkt); - } catch (Exception e) { - log.severe("Failed to send response: "+e.getMessage()); - } - sock.close(); - } - - private Response borrow() { - try { - Resource resource = resources.getByID(request.resourceID); - synchronized (resource) { - if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) { - return new Response(Response.Status.ALREADY_BORROWED, - request.coordinatorID+" cannot borrow "+request.resourceID - +"; already borrowed by "+resource.borrower); - } else if (request.duration <= 0) { - return new Response(Response.Status.INVALID_DURATION, - "duration "+request.duration+" less than 1"); - } else if (request.duration > resource.duration) { - return new Response(Response.Status.INVALID_DURATION, - "cannot borrow "+resource.id+" for duration of "+request.duration - +"; only "+resource.duration+" remaining"); - } - - if (resource.borrower.equals(request.coordinatorID)) { - // Resource is already borrowed. Add to existing duration. - resource.borrowDuration += request.duration; - } else { - resource.borrowDuration = request.duration; - } - resource.borrower = request.coordinatorID; - resource.isBorrowed = true; - resource.duration -= request.duration; - - return new Response(Response.Status.SUCCESS, request.coordinatorID - +" successfully borrowed "+request.resourceID); - } - } catch (NoSuchElementException e) { - return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); - } - } - } - - public static class Response implements Serializable { - public Status status; - public String message; - - private Response(Status status, String message) { - this.status = status; - this.message = message; - } - - private Response() { - this(Status.SUCCESS, ""); - } - - public enum Status { - SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION - } - } -} - diff --git a/src/main/java/derms/replica/replica1/Resource.java b/src/main/java/derms/replica/replica1/Resource.java deleted file mode 100644 index 4abe41b..0000000 --- a/src/main/java/derms/replica/replica1/Resource.java +++ /dev/null @@ -1,39 +0,0 @@ -package derms.replica.replica1; - -import java.io.Serializable; - -public class Resource implements Serializable { - public ResourceID id; - public ResourceName name; - public int duration; - public boolean isBorrowed; - public CoordinatorID borrower; - public int borrowDuration; - - public Resource(ResourceID id, ResourceName name, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) { - this.id = id; - this.name = name; - this.duration = duration; - this.isBorrowed = isBorrowed; - this.borrower = borrower; - this.borrowDuration = borrowDuration; - } - - public Resource(ResourceID id, ResourceName name, int duration) { - this(id, name, duration, false, new CoordinatorID(), -1); - } - - public Resource() { - this(new ResourceID(), ResourceName.AMBULANCE, 0); - } - - @Override - public int hashCode() { - return id.hashCode(); - } - - @Override - public String toString() { - return id+" "+duration; - } -} diff --git a/src/main/java/derms/replica/replica1/ResourceAvailability.java b/src/main/java/derms/replica/replica1/ResourceAvailability.java deleted file mode 100644 index a6083ab..0000000 --- a/src/main/java/derms/replica/replica1/ResourceAvailability.java +++ /dev/null @@ -1,128 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.logging.Logger; - -public class ResourceAvailability { - public static final int port = 5556; - - public static class Client implements Runnable { - private InetAddress serverAddr; - private ResourceName request; - private Collection resources; - private Logger log; - - public Client(InetAddress serverAddr, ResourceName request, Collection response) throws IOException { - this.serverAddr = serverAddr; - this.request = request; - this.resources = response; - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - DatagramSocket sock; - try { - sock = new DatagramSocket(); - } catch (Exception e) { - log.severe("Error binding socket: "+e.getMessage()); - return; - } - log.fine("Created socket"); - - DatagramPacket reqPkt; - try { - reqPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); - } catch (IOException e) { - log.severe("Error creating request: "+e.getMessage()); - sock.close(); - return; - } - - try { - sock.send(reqPkt); - } catch (IOException e) { - log.severe("Error sending request: "+e.getMessage()); - sock.close(); - return; - } - log.fine("Sent request"); - - Resource[] response; - try { - response = ResourceTransfer.receive(sock); - } catch (IOException e) { - log.severe(e.getMessage()); - return; - } finally { - sock.close(); - } - - for (Resource resource : response) { - resources.add(resource); - } - } - } - - public static class Server implements Runnable { - public static final int bufsize = 1024; - - private InetAddress localAddr; - private Resources resources; - private Logger log; - - public Server(InetAddress localAddr, Resources resources) throws IOException { - this.localAddr = localAddr; - this.resources = resources; - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - DatagramSocket sock = null; - try { - sock = new DatagramSocket(port, localAddr); - } catch (Exception e) { - log.severe("Failed to bind socket to "+localAddr.toString()+": "+e.getMessage()); - return; - } - - log.info("Listening on "+localAddr.toString()+":"+port); - - DatagramPacket request = new DatagramPacket(new byte[bufsize], bufsize); - try { - for (;;) { - try { - sock.receive(request); - } catch (Exception e) { - log.warning("Error receiving from socket: "+e.getMessage()); - continue; - } - - ResourceName requestedName = null; - try { - requestedName = ObjectPacket.deserialize(request, ResourceName.class); - } catch (IOException e) { - log.warning("Failed to deserialize request: "+e.getMessage()); - continue; - } - log.info("Got request: "+requestedName); - - Resource[] response = resources.getByName(requestedName); - try { - ResourceTransfer.send(response, request.getSocketAddress()); - } catch (IOException e) { - log.warning("Error transfering resources: "+e.getMessage()); - } - } - } finally { - sock.close(); - } - } - } -} diff --git a/src/main/java/derms/replica/replica1/ResourceID.java b/src/main/java/derms/replica/replica1/ResourceID.java deleted file mode 100644 index 008b766..0000000 --- a/src/main/java/derms/replica/replica1/ResourceID.java +++ /dev/null @@ -1,49 +0,0 @@ -package derms.replica.replica1; - -import java.io.Serializable; - -public class ResourceID implements Serializable { - public String city; - public short num; - - public ResourceID (String city, short num) { - this.city = city; - this.num = num; - } - - public ResourceID() { - this("XXX", (short) 1111); - } - - public static ResourceID parse(String s) throws IllegalArgumentException { - if (s.length() != City.codeLen+ID.nDigits) { - throw new IllegalArgumentException("invalid resource ID: "+s); - } - try { - String cityCode = s.substring(0, City.codeLen); - short num = Short.parseShort(s.substring(City.codeLen)); - return new ResourceID(cityCode, num); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("invalid resource ID: "+e.getMessage()); - } - } - - @Override - public boolean equals(Object obj) { - if (obj == null || obj.getClass() != this.getClass()) { - return false; - } - ResourceID other = (ResourceID) obj; - return (this.city.equals(other.city)) && (this.num == other.num); - } - - @Override - public int hashCode() { - return city.hashCode() * num; - } - - @Override - public String toString() { - return city+num; - } -} diff --git a/src/main/java/derms/replica/replica1/ResourceName.java b/src/main/java/derms/replica/replica1/ResourceName.java deleted file mode 100644 index de4bc16..0000000 --- a/src/main/java/derms/replica/replica1/ResourceName.java +++ /dev/null @@ -1,18 +0,0 @@ -package derms.replica.replica1; - -import java.io.Serializable; - -public enum ResourceName implements Serializable { - AMBULANCE, - FIRETRUCK, - PERSONNEL; - - public static ResourceName parse(String s) { - switch (s) { - case "AMBULANCE": return ResourceName.AMBULANCE; - case "FIRETRUCK": return ResourceName.FIRETRUCK; - case "PERSONNEL": return ResourceName.PERSONNEL; - } - throw new IllegalArgumentException("invalid resource name: "+s); - } -} diff --git a/src/main/java/derms/replica/replica1/ResourceTransfer.java b/src/main/java/derms/replica/replica1/ResourceTransfer.java deleted file mode 100644 index fbc695c..0000000 --- a/src/main/java/derms/replica/replica1/ResourceTransfer.java +++ /dev/null @@ -1,50 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.io.Serializable; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; - -public class ResourceTransfer { - public static final int bufsize = 1024; - - public static void send(Resource[] resources, SocketAddress remoteAddr) throws IOException { - DatagramSocket sock = new DatagramSocket(); - - for (Resource resource : resources) { - DatagramPacket pkt = ObjectPacket.create(resource, remoteAddr); - sock.send(pkt); - } - - DatagramPacket pkt = ObjectPacket.create(new EndOfTransmission(), remoteAddr); - sock.send(pkt); - sock.close(); - } - - public static Resource[] receive(DatagramSocket sock) throws IOException { - List resources = new ArrayList(); - byte[] buf = new byte[bufsize]; - DatagramPacket response = new DatagramPacket(buf, buf.length); - - for (;;) { - sock.receive(response); - - Object obj = ObjectPacket.deserialize(response, Object.class); - if (obj.getClass() == EndOfTransmission.class) { - break; - } - try { - resources.add((Resource) obj); - } catch (Exception e) { - throw new IOException("expected Resource; got "+obj.getClass().toString()); - } - } - Resource[] arr = new Resource[0]; - return resources.toArray(arr); - } - - private static class EndOfTransmission implements Serializable {} -} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/Resources.java b/src/main/java/derms/replica/replica1/Resources.java deleted file mode 100644 index a6a8b3a..0000000 --- a/src/main/java/derms/replica/replica1/Resources.java +++ /dev/null @@ -1,74 +0,0 @@ -package derms.replica.replica1; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; - -public class Resources { - private Map> resources; - - public Resources() { - this.resources = new ConcurrentHashMap>(); - } - - public List borrowed(CoordinatorID borrower, ResourceName name) { - List borrowed = new ArrayList(); - Resource[] namedResources = getByName(name); - for (Resource r : namedResources) { - if (r.isBorrowed && r.borrower.equals(borrower)) { - borrowed.add(r); - } - } - return borrowed; - } - - public Resource getByID(ResourceID id) throws NoSuchElementException { - for (Map rids : resources.values()) { - Resource resource = rids.get(id); - if (resource != null) { - return resource; - } - } - throw new NoSuchElementException("No such resource "+id); - } - - public Resource[] getByName(ResourceName name) { - Map rids = resources.get(name); - if (rids == null) { - return new Resource[0]; - } - Resource[] r = new Resource[0]; - return rids.values().toArray(r); - } - - public void add(Resource r) { - Map rids; - synchronized (resources) { - rids = resources.get(r.name); - if (rids == null) { - rids = new ConcurrentHashMap(); - resources.put(r.name, rids); - } - } - synchronized (rids) { - Resource existing = rids.get(r.id); - if (existing != null) { - existing.duration += r.duration; - } else { - rids.put(r.id, r); - } - } - } - - public void removeByID(ResourceID id) throws NoSuchElementException { - for (Map rids : resources.values()) { - if (rids.containsKey(id)) { - rids.remove(id); - return; - } - } - throw new NoSuchElementException("No such resource "+id); - } -} diff --git a/src/main/java/derms/replica/replica1/ResponderID.java b/src/main/java/derms/replica/replica1/ResponderID.java deleted file mode 100644 index 0ccda8a..0000000 --- a/src/main/java/derms/replica/replica1/ResponderID.java +++ /dev/null @@ -1,16 +0,0 @@ -package derms.replica.replica1; - -public class ResponderID { - public City city; - public short num; - - public ResponderID(City city, short num) { - this.city = city; - this.num = num; - } - - @Override - public String toString() { - return ""+city+"R"+num; - } -} diff --git a/src/main/java/derms/replica/replica1/ResponderServer.java b/src/main/java/derms/replica/replica1/ResponderServer.java deleted file mode 100644 index 2a6b480..0000000 --- a/src/main/java/derms/replica/replica1/ResponderServer.java +++ /dev/null @@ -1,94 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.net.InetAddress; -import java.time.Duration; -import java.util.Collection; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -public class ResponderServer { - public static final Duration timeout = Duration.ofSeconds(5); - - private City city; - private Resources resources; - private Servers servers; - private Logger log; - - public ResponderServer(City city, Resources resources, Servers servers) throws IOException { - this.city = city; - this.resources = resources; - this.servers = servers; - this.log = DermsLogger.getLogger(this.getClass()); - } - - public ResponderServer() throws IOException { - this(new City(), new Resources(), new Servers()); - } - - public void addResource(Resource r) { - resources.add(r); - log.info("Added resource "+r+" - success"); - } - - public void removeResource(ResourceID rid, int duration) throws NoSuchResourceException { - log.info("Remove duration "+duration+" from "+rid); - try { - Resource resource = resources.getByID(rid); - synchronized (resource) { - if (duration < 0 || duration >= resource.duration) { - resources.removeByID(rid); - log.info("Removed resource "+rid+" - success"); - } else { - resource.duration -= duration; - log.info("Removed duration from resource "+rid+". New duration: "+resource.duration+" - success"); - } - } - } catch (NoSuchElementException e) { - // Wanted to remove duration from resource. - if (duration >= 0) { - String msg = "Cannot remove duration from "+rid+": resource does not exist."; - log.warning(msg); - throw new NoSuchResourceException(msg); - } - - // Duration is negative---wanted to remove resource completely. - // Success because it is already removed. - log.info("Not removing "+rid+": resource does not exist."); - } - } - - public Resource[] listResourceAvailability(ResourceName rname) throws ServerCommunicationError { - log.info("Request for available "+rname); - Collection availableResources = ConcurrentHashMap.newKeySet(); - ExecutorService pool = Executors.newFixedThreadPool(servers.size()); - try { - for (InetAddress serverAddr : servers.all()) { - pool.execute(new ResourceAvailability.Client(serverAddr, rname, availableResources)); - } - } catch (IOException e) { - String msg = "Failed to start ResourceAvailability Client: "+e.getMessage(); - log.severe(msg); - throw new ServerCommunicationError("ResourceAvailability: "+msg); - } - - log.fine("Started worker threads"); - pool.shutdown(); - boolean terminated; - try { - terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new ServerCommunicationError("listResourceAvailability(): listResourceAvailability() interrupted: "+e.getMessage()); - } - if (!terminated) { - throw new ServerCommunicationError("ResourceAvailability: request timed out: no response after "+timeout.toString()); - } - log.info("Response length "+availableResources.size()); - Resource[] arr = new Resource[0]; - return availableResources.toArray(arr); - } -} diff --git a/src/main/java/derms/replica/replica1/ReturnResource.java b/src/main/java/derms/replica/replica1/ReturnResource.java deleted file mode 100644 index e28815f..0000000 --- a/src/main/java/derms/replica/replica1/ReturnResource.java +++ /dev/null @@ -1,195 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.io.Serializable; -import java.net.*; -import java.util.NoSuchElementException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class ReturnResource { - public static final int port = 5559; - public static final int bufsize = 4096; - - public static class Client { - private CoordinatorID coordinatorID; - private ResourceID resourceID; - - public Client(CoordinatorID cid, ResourceID rid) { - this.coordinatorID = cid; - this.resourceID = rid; - } - - public Response sendRequest(InetAddress serverAddr) throws IOException { - Request request = new Request(coordinatorID, resourceID); - DatagramSocket sock; - try { - sock = new DatagramSocket(); - } catch (Exception e) { - throw new IOException("Return Resource Client: failed to open socket: "+e.getMessage()); - } - - DatagramPacket requestPkt; - try { - requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); - } catch (IOException e) { - sock.close(); - throw new IOException("Return Resource Client: failed to create request: "+e.getMessage()); - } - - sock.send(requestPkt); - - byte[] buf = new byte[bufsize]; - DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); - try { - sock.receive(responsePkt); - } catch (Exception e) { - sock.close(); - throw new IOException("Return Resource Client: error receiving from server: "+e.getMessage()); - } - - try { - return ObjectPacket.deserialize(responsePkt, Response.class); - } catch (IOException e) { - throw new IOException("Return Resource Client: failed to deserialize response: "+e.getMessage()); - } finally { - sock.close(); - } - } - } - - public static class Server implements Runnable { - private InetAddress localAddr; - private Resources resources; - private ExecutorService pool; - private Logger log; - - public Server(InetAddress localAddr, Resources resources) throws IOException { - this.localAddr = localAddr; - this.resources = resources; - pool = Executors.newWorkStealingPool(); - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - DatagramSocket sock = null; - try { - sock = new DatagramSocket(port, localAddr); - } catch (Exception e) { - log.severe("Failed to bind socket to "+localAddr.toString() - +": "+e.getMessage()); - return; - } - - log.info("Listening on "+localAddr.toString()+":"+port); - DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); - try { - for (;;) { - try { - sock.receive(requestPkt); - } catch (Exception e) { - log.warning("Error receiving from socket: "+e.getMessage()); - continue; - } - log.fine("Got request"); - - Request request = null; - try { - request = ObjectPacket.deserialize(requestPkt, Request.class); - } catch (IOException e) { - log.warning("Failed to deserialize request: "+e.getMessage()); - continue; - } - - pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources)); - } - } finally { - sock.close(); - } - } - } - - private static class Request implements Serializable { - private CoordinatorID coordinatorID; - private ResourceID resourceID; - - private Request(CoordinatorID cid, ResourceID rid) { - this.coordinatorID = cid; - this.resourceID = rid; - } - } - - private static class RequestHandler implements Runnable { - private Request request; - private SocketAddress client; - private Resources resources; - - private RequestHandler(Request request, SocketAddress client, Resources resources) { - this.request = request; - this.client = client; - this.resources = resources; - } - - @Override - public void run() { - Response response = returnResource(); - - DatagramSocket sock; - try { - sock= new DatagramSocket(); - } catch (Exception e) { - System.err.println("Request Resource Server: failed to open socket: "+e.getMessage()); - return; - } - - DatagramPacket pkt; - try { - pkt = ObjectPacket.create(response, client); - } catch (IOException e) { - System.err.println("Request Resource Server: failed to create response packet: "+e.getMessage()); - sock.close(); - return; - } - try { - sock.send(pkt); - } catch (Exception e) { - System.err.println("Request Resource Server: failed to send response: "+e.getMessage()); - } - sock.close(); - } - - private Response returnResource() { - try { - Resource resource = resources.getByID(request.resourceID); - synchronized (resource) { - if (!resource.isBorrowed || !resource.borrower.equals(request.coordinatorID)) { - return new Response(Response.Status.NOT_BORROWED, - request.resourceID+" is not borrowed by "+request.coordinatorID); - } - resource.isBorrowed = false; - resource.borrower = new CoordinatorID(); - resource.borrowDuration = -1; - return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id); - } - } catch (NoSuchElementException e) { - return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); - } - } - } - - public static class Response implements Serializable { - public Status status; - public String message; - - private Response(Status status, String message) { - this.status = status; - this.message = message; - } - - public enum Status { - SUCCESS, NO_SUCH_RESOURCE, NOT_BORROWED - } - } -} diff --git a/src/main/java/derms/replica/replica1/ServerCommunicationError.java b/src/main/java/derms/replica/replica1/ServerCommunicationError.java deleted file mode 100644 index 0f0586b..0000000 --- a/src/main/java/derms/replica/replica1/ServerCommunicationError.java +++ /dev/null @@ -1,7 +0,0 @@ -package derms.replica.replica1; - -public class ServerCommunicationError extends Exception { - public ServerCommunicationError(String message) { - super(message); - } -} diff --git a/src/main/java/derms/replica/replica1/Servers.java b/src/main/java/derms/replica/replica1/Servers.java deleted file mode 100644 index ed1255a..0000000 --- a/src/main/java/derms/replica/replica1/Servers.java +++ /dev/null @@ -1,34 +0,0 @@ -package derms.replica.replica1; - -import java.net.InetAddress; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class Servers { - private Map servers = new ConcurrentHashMap(); - - /** Returns the address of the server located in the specified city, or null if there is no server in the city. */ - public InetAddress get(City city) { - return servers.get(city); - } - - /** - * Associates the specified server address with the specified city. - * If there was already a server associated with the city, the old value is replaced. - * @param city the city where the server is located - * @param addr the address of the server - * @return the previous server address, or null if there was no server associated with this city. - */ - public InetAddress put(City city, InetAddress addr) { - return servers.put(city, addr); - } - - public Collection all() { - return servers.values(); - } - - public int size() { - return servers.size(); - } -} diff --git a/src/main/java/derms/replica/replica1/StationServer.java b/src/main/java/derms/replica/replica1/StationServer.java deleted file mode 100644 index 6bfc7c2..0000000 --- a/src/main/java/derms/replica/replica1/StationServer.java +++ /dev/null @@ -1,136 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class StationServer implements Runnable { - public static final String usage = "Usage: java StationServer "; - public static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); - - public City city; - public InetAddress localAddr; - public Resources resources; - public Servers servers; - private Logger log; - private ResponderServer responderServer; - private CoordinatorServer coordinatorServer; - - public StationServer(City city, InetAddress localAddr) throws IOException { - this.city = city; - this.localAddr = localAddr; - this.resources = new Resources(); - this.servers = new Servers(); - this.log = DermsLogger.getLogger(getClass()); - - try { - this.responderServer = new ResponderServer(city, resources, servers); - } catch (IOException e) { - throw new IOException("Failed to create ResponderServer: "+e.getMessage()); - } - log.info("Created ResponderServer"); - - try { - this.coordinatorServer = new CoordinatorServer(city, resources, servers); - } catch (IOException e) { - throw new IOException("Failed to create CoordinatorServer: "+e.getMessage()); - } - log.info("Created CoordinatorServer"); - } - - public static void main(String cmdlineArgs[]) { - Args args = null; - try { - args = new Args(cmdlineArgs); - } catch (IllegalArgumentException e) { - System.err.println(e.getMessage()); - System.out.println(usage); - System.exit(1); - } - - try { - (new StationServer(args.city, args.localAddr)).run(); - } catch (Exception e) { - System.err.println(e.getMessage()); - System.exit(1); - } - } - - @Override - public void run() { - log.info("Running"); - log.config("Local address is "+localAddr.toString()); - - ExecutorService pool = Executors.newCachedThreadPool(); - - try { - pool.execute(new ResourceAvailability.Server(localAddr, resources)); - } catch (IOException e) { - String msg = "Failed to start ResourceAvailability Server: "+e.getMessage(); - log.severe(msg); - return; - } - try { - pool.execute(new RequestResource.Server(localAddr, resources)); - } catch (IOException e) { - log.severe("Failed to start RequestResource Server: "+e.getMessage()); - return; - } - try { - pool.execute(new FindResource.Server(localAddr, resources)); - } catch (IOException e) { - log.severe("Failed to start FindResource Server: "+e.getMessage()); - return; - } - try { - pool.execute(new ReturnResource.Server(localAddr, resources)); - } catch (IOException e) { - log.severe("Failed to start ReturnResource Server: "+e.getMessage()); - return; - } - try { - pool.execute(new SwapResource.Server(localAddr, resources, servers)); - } catch (IOException e) { - log.severe("Failed to start SwapResource Server: "+e.getMessage()); - return; - } - - try { - pool.execute(new Announcer(announceGroup, localAddr, city)); - } catch (IOException e) { - log.severe("Failed to start Announcer: "+e.getMessage()); - return; - } - try { - pool.execute(new AnnounceListener(announceGroup, localAddr, servers)); - } catch (IOException e) { - log.severe("Failed to start AnnounceListener: "+e.getMessage()); - return; - } - } - - private static class Args { - private City city; - private InetAddress localAddr; - - private Args(String[] args) throws IllegalArgumentException { - if (args.length < 1) { - throw new IllegalArgumentException("Missing argument 'city'"); - } - city = new City(args[0]); - - if (args.length < 2) { - throw new IllegalArgumentException("Missing argument 'local host'"); - } - try { - localAddr = InetAddress.getByName(args[1]); - } catch (UnknownHostException | SecurityException e) { - throw new IllegalArgumentException("Bad value of 'local host': "+e.getMessage()); - } - } - } -} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/SwapResource.java b/src/main/java/derms/replica/replica1/SwapResource.java deleted file mode 100644 index 5c6502c..0000000 --- a/src/main/java/derms/replica/replica1/SwapResource.java +++ /dev/null @@ -1,262 +0,0 @@ -package derms.replica.replica1; - -import java.io.IOException; -import java.io.Serializable; -import java.net.*; -import java.util.NoSuchElementException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class SwapResource { - public static final int port = 5560; - public static final int bufSize = 4096; - - public static class Client { - private CoordinatorID cid; - private ResourceID oldRID; - private ResourceID newRID; - - public Client(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { - this.cid = cid; - this.oldRID = oldRID; - this.newRID = newRID; - } - - public Response sendRequest(InetAddress serverAddr) throws IOException { - Request request = new Request(cid, oldRID, newRID); - DatagramSocket sock; - try { - sock = new DatagramSocket(); - } catch (Exception e) { - throw new IOException("Swap Resource Client: failed to open socket: "+e.getMessage()); - } - - DatagramPacket requestPkt; - try { - requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); - } catch (IOException e) { - sock.close(); - throw new IOException("Swap Resource Client: failed to create request: "+e.getMessage()); - } - - sock.send(requestPkt); - - byte[] buf = new byte[bufSize]; - DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); - try { - sock.receive(responsePkt); - } catch (Exception e) { - sock.close(); - throw new IOException("Swap Resource Client: error receiving from server: "+e.getMessage()); - } - - try { - return ObjectPacket.deserialize(responsePkt, Response.class); - } catch (IOException e) { - throw new IOException("Swap Resource Client: failed to deserialize response: "+e.getMessage()); - } finally { - sock.close(); - } - } - } - - public static class Server implements Runnable { - private InetAddress localAddr; - private Resources resources; - private Servers servers; - private ExecutorService pool; - private Logger log; - - public Server(InetAddress localAddr, Resources resources, Servers servers) throws IOException { - this.localAddr = localAddr; - this.resources = resources; - this.servers = servers; - pool = Executors.newWorkStealingPool(); - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - DatagramSocket sock = null; - try { - sock = new DatagramSocket(port, localAddr); - } catch (Exception e) { - log.severe("Failed to bind socket to "+localAddr+": "+e.getMessage()); - return; - } - log.info("Listening on "+localAddr+":"+port); - - DatagramPacket requestPkt = new DatagramPacket(new byte[bufSize], bufSize); - - try { - for (;;) { - try { - sock.receive(requestPkt); - } catch (Exception e) { - log.warning("Error receiving from socket: "+e.getMessage()); - continue; - } - log.fine("Got request"); - - Request request = null; - try { - request = ObjectPacket.deserialize(requestPkt, Request.class); - } catch (IOException e) { - log.warning("Failed to deserialize request: "+e.getMessage()); - continue; - } - - SocketAddress client = requestPkt.getSocketAddress(); - try { - RequestHandler handler = new RequestHandler(request, client, resources, servers); - pool.execute(handler); - } catch (IOException e) { - log.warning("Failed to create request handler: "+e.getMessage()); - continue; - } - } - } finally { - sock.close(); - } - } - } - - private static class Request implements Serializable { - private CoordinatorID cid; - private ResourceID oldRID; - private ResourceID newRID; - - private Request(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { - this.cid = cid; - this.oldRID = oldRID; - this.newRID = newRID; - } - } - - private static class RequestHandler implements Runnable { - private Request request; - private SocketAddress client; - private Resources resources; - private Servers servers; - private Logger log; - - private RequestHandler(Request request, SocketAddress client, Resources resources, Servers servers) throws IOException { - this.request = request; - this.client = client; - this.resources = resources; - this.servers = servers; - this.log = DermsLogger.getLogger(this.getClass()); - } - - @Override - public void run() { - Response response = swapResources(); - - DatagramSocket sock; - try { - sock = new DatagramSocket(); - } catch (Exception e) { - log.severe("failed to open socket: "+e.getMessage()); - return; - } - - DatagramPacket pkt; - try { - pkt = ObjectPacket.create(response, client); - } catch (IOException e) { - log.severe("failed to create response: "+e.getMessage()); - sock.close(); - return; - } - - try { - sock.send(pkt); - } catch (Exception e) { - log.severe("failed to send response: "+e.getMessage()); - } finally { - sock.close(); - } - } - - private Response swapResources() { - try { - Resource resource = resources.getByID(request.oldRID); - synchronized (resource) { - if (!resource.borrower.equals(request.cid)) { - return new Response(Response.Status.NOT_BORROWED, "resource "+request.oldRID+" not borrowed by "+request.cid); - } - try { - acquireNewResource(resource.borrowDuration); - returnOldResource(resource); - return new Response(Response.Status.SUCCESS, request.cid+" success fully swapped "+request.oldRID+" for "+request.newRID); - } catch (UnknownHostException e) { - return new Response(Response.Status.UNKNOWN_HOST, e.getMessage()); - } catch (IOException e) { - return new Response(Response.Status.FAILURE, e.getMessage()); - } catch (CannotBorrow e) { - return new Response(Response.Status.CANNOT_BORROW, e.getMessage()); - } - } - } catch (NoSuchElementException e) { - return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.oldRID); - } - } - - private void acquireNewResource(int borrowDuration) throws UnknownHostException, IOException, CannotBorrow { - RequestResource.Client requestClient = new RequestResource.Client(request.cid, request.newRID, borrowDuration); - City city = new City(request.newRID.city); - InetAddress requestResourceServer = servers.get(city); - if (requestResourceServer == null) { - throw new UnknownHostException(city.toString()); - } - RequestResource.Response response = requestClient.sendRequest(requestResourceServer); - // TODO: make exception handling more granular---pass through status from Request. - if (response.status != RequestResource.Response.Status.SUCCESS) { - throw new CannotBorrow(request.cid, request.newRID, response.message); - } - } - - private void returnOldResource(Resource r) { - r.isBorrowed = false; - r.borrower = new CoordinatorID(); - r.borrowDuration = -1; - } - } - - public static class Response implements Serializable { - public Status status; - public String message; - - private Response(Status status, String message) { - this.status = status; - this.message = message; - } - - public enum Status { - SUCCESS, - FAILURE, - NO_SUCH_RESOURCE, - NOT_BORROWED, - CANNOT_BORROW, - UNKNOWN_HOST - } - } - - private static class CannotBorrow extends Exception { - CoordinatorID attemptedBorrower; - ResourceID rid; - String message; - - private CannotBorrow(CoordinatorID attemptedBorrower, ResourceID rid, String message) { - this.attemptedBorrower = attemptedBorrower; - this.rid = rid; - this.message = message; - } - - @Override - public String getMessage() { - return attemptedBorrower+" failed to borrow "+rid+": "+message; - } - } -} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica2/AlreadyBorrowedException.java b/src/main/java/derms/replica/replica2/AlreadyBorrowedException.java new file mode 100644 index 0000000..fb185b1 --- /dev/null +++ b/src/main/java/derms/replica/replica2/AlreadyBorrowedException.java @@ -0,0 +1,7 @@ +package derms.replica.replica2; + +public class AlreadyBorrowedException extends Exception { + public AlreadyBorrowedException(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica2/AnnounceListener.java b/src/main/java/derms/replica/replica2/AnnounceListener.java new file mode 100644 index 0000000..71aae9d --- /dev/null +++ b/src/main/java/derms/replica/replica2/AnnounceListener.java @@ -0,0 +1,81 @@ +package derms.replica.replica2; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.*; +import java.util.logging.Logger; + +public class AnnounceListener implements Runnable { + private static final int bufsize = 1024; + + private InetSocketAddress groupAddr; + private InetAddress localAddr; + private Servers servers; + private Logger log; + + public AnnounceListener(InetSocketAddress groupAddr, InetAddress localAddr, Servers servers) throws IOException { + this.groupAddr = groupAddr; + this.localAddr = localAddr; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + NetworkInterface netInterface = null; + try { + netInterface = NetworkInterface.getByInetAddress(localAddr); + if (netInterface == null) { + throw new Exception("netInterface is null"); + } + } catch (Exception e) { + log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + MulticastSocket sock = null; + try { + sock = new MulticastSocket(groupAddr.getPort()); + sock.joinGroup(groupAddr, netInterface); + } catch (Exception e) { + log.severe("Failed to open multicast socket: "+e.getMessage()); + return; + } + + log.info("Listening to "+groupAddr.toString()+" from "+localAddr.toString()+" ("+netInterface.getName()+")"); + byte[] buf = new byte[bufsize]; + DatagramPacket pkt = new DatagramPacket(buf, buf.length); + + for (;;) { + try { + sock.receive(pkt); + } catch (Exception e) { + log.warning("Error receiving from multicast socket: "+e.getMessage()); + continue; + } + + ObjectInputStream objStream; + try { + objStream = new ObjectInputStream( + new ByteArrayInputStream(pkt.getData())); + } catch (IOException e) { + log.warning("Failed to create input stream: "+e.getMessage()); + continue; + } + + City city; + try { + city = (City) objStream.readObject(); + } catch (Exception e) { + log.warning("Failed to deserialize data: "+e.getMessage()); + continue; + } + + InetAddress remote = pkt.getAddress(); + if (servers.put(city, remote) == null) { + log.info("Added remote server "+city.toString()+" "+remote.toString()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica2/Announcer.java b/src/main/java/derms/replica/replica2/Announcer.java new file mode 100644 index 0000000..546e55b --- /dev/null +++ b/src/main/java/derms/replica/replica2/Announcer.java @@ -0,0 +1,69 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.net.*; +import java.util.logging.Logger; + +public class Announcer implements Runnable { + public static final long intervalMillis = 3000; + + private SocketAddress group; + private InetAddress localAddr; + private City city; + private Logger log; + + public Announcer(SocketAddress group, InetAddress localAddr, City city) throws IOException { + this.group = group; + this.localAddr = localAddr; + this.city = city; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + NetworkInterface netInterface = null; + try { + netInterface = NetworkInterface.getByInetAddress(localAddr); + if (netInterface == null) { + throw new Exception("netInterface is null"); + } + } catch (Exception e) { + log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + MulticastSocket sock = null; + try { + sock = new MulticastSocket(); + sock.joinGroup(group, netInterface); + } catch (Exception e) { + log.severe("Failed to open multicast socket: "+e.getMessage()); + return; + } + + log.info("Announcing from "+localAddr.toString()+" ("+netInterface.getName()+") to "+group.toString()); + + DatagramPacket pkt = null; + try { + pkt = ObjectPacket.create(city, group); + } catch (IOException e) { + log.severe("Failed to create packet: "+e.getMessage()); + sock.close(); + return; + } + + try { + for (;;) { + sock.send(pkt); + Thread.sleep(intervalMillis); + } + } catch (IOException e) { + log.severe("Failed to send to multicast socket "+group.toString()+": "+e.getMessage()); + } catch (InterruptedException e) { + log.info("Interrupted."); + } finally { + log.info("Shutting down..."); + sock.close(); + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica2/City.java b/src/main/java/derms/replica/replica2/City.java new file mode 100644 index 0000000..b027d44 --- /dev/null +++ b/src/main/java/derms/replica/replica2/City.java @@ -0,0 +1,38 @@ +package derms.replica.replica2; + +import java.io.Serializable; + +public class City implements Serializable { + public static final int codeLen = 3; + + private String code; + + public City(String code) throws IllegalArgumentException { + if (code.length() != codeLen) + throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters"); + this.code = code; + } + + public City() { + this("XXX"); + } + + @Override + public String toString() { + return code; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || this.getClass() != obj.getClass()) { + return false; + } + City other = (City) obj; + return this.code.equals(other.code); + } + + @Override + public int hashCode() { + return code.hashCode(); + } +} diff --git a/src/main/java/derms/replica/replica2/CoordinatorID.java b/src/main/java/derms/replica/replica2/CoordinatorID.java new file mode 100644 index 0000000..00aa99b --- /dev/null +++ b/src/main/java/derms/replica/replica2/CoordinatorID.java @@ -0,0 +1,34 @@ +package derms.replica.replica2; + +import java.io.Serializable; + +public class CoordinatorID implements Serializable { + public String city; + public short num; + + public CoordinatorID(String city, short num) { + this.city = city; + this.num = num; + } + + public CoordinatorID(String city, int num) { + this(city, (short) num); + } + + public CoordinatorID() { + this("XXX", 0); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() != this.getClass()) + return false; + CoordinatorID other = (CoordinatorID) obj; + return other.city.equals(this.city) && other.num == this.num; + } + + @Override + public String toString() { + return city+"C"+num; + } +} diff --git a/src/main/java/derms/replica/replica2/CoordinatorServer.java b/src/main/java/derms/replica/replica2/CoordinatorServer.java new file mode 100644 index 0000000..a4f7547 --- /dev/null +++ b/src/main/java/derms/replica/replica2/CoordinatorServer.java @@ -0,0 +1,170 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class CoordinatorServer { + public static final Duration timeout = Duration.ofSeconds(5); + + private City city; + private Resources resources; + private Servers servers; + private Logger log; + + public CoordinatorServer(City city, Resources resources, Servers servers) throws IOException { + super(); + this.city = city; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + public CoordinatorServer() throws IOException { + this(new City(), new Resources(), new Servers()); + } + + public void requestResource(CoordinatorID cid, ResourceID rid, int duration) + throws ServerCommunicationError, NoSuchResourceException, + AlreadyBorrowedException, InvalidDurationException + { + log.info("Request for "+rid+" from "+cid); + + InetAddress server = servers.get(new City(rid.city)); + if (server == null) { + throw new ServerCommunicationError("requestResource(): no connection to server "+rid.city.toString()); + } + + RequestResource.Client client = new RequestResource.Client(cid, rid, duration); + RequestResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + throw new ServerCommunicationError("requestResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info("Request "+rid+" from "+cid+" - success"); + break; + case NO_SUCH_RESOURCE: + log.warning(response.message); + throw new NoSuchResourceException(response.message); + case ALREADY_BORROWED: + log.warning(response.message); + throw new AlreadyBorrowedException(response.message); + case INVALID_DURATION: + log.warning(response.message); + throw new InvalidDurationException(response.message); + default: + log.warning("Unsuccessful response from server: "+response.message); + throw new ServerCommunicationError("requestResource(): failed to borrow resource: "+response.message); + } + } + + public Resource[] findResource(CoordinatorID cid, ResourceName rname) throws ServerCommunicationError { + log.info("Find Resource "+rname+" from "+cid); + FindResource.Request request = new FindResource.Request(cid, rname); + Collection response = ConcurrentHashMap.newKeySet(); + ExecutorService pool = Executors.newFixedThreadPool(servers.size()); + try { + for (InetAddress server : servers.all()) { + pool.execute(new FindResource.Client(request, server, response)); + } + } catch (IOException e) { + String msg = "Failed to start FindResource Client: "+e.getMessage(); + log.severe(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + log.fine("Started worker threads"); + pool.shutdown(); + boolean terminated; + try { + terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + String msg = "findResource() interrupted: "+e.getMessage(); + log.warning(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + if (!terminated) { + String msg = "Request timed out: no response after "+timeout.toString(); + log.warning(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + Resource[] arr = new Resource[0]; + arr = response.toArray(arr); + log.info("Find resource "+rname+" from "+cid+" - success. Response length: "+arr.length); + return arr; + } + + public void returnResource(CoordinatorID cid, ResourceID rid) + throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException + { + log.info("Return resource "+rid+" from "+cid); + InetAddress server = servers.get(new City(rid.city)); + if (server == null) { + String msg = "no connection to server "+rid.city; + log.warning(msg); + throw new ServerCommunicationError("returnResource(): "+msg); + } + log.fine("server address: "+server); + + ReturnResource.Client client = new ReturnResource.Client(cid, rid); + ReturnResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + log.warning(e.getMessage()); + throw new ServerCommunicationError("returnResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info(cid+" return "+rid+" - success"); + break; + case NO_SUCH_RESOURCE: + log.warning(response.message); + throw new NoSuchResourceException(response.message); + case NOT_BORROWED: + log.warning(response.message); + throw new NotBorrowedException(response.message); + default: + String msg = "Failed to return resource: "+response.message; + log.warning(msg); + throw new ServerCommunicationError("returnResource(): "+msg); + } + } + + public void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException { + log.info(cid+": swap "+oldRID+", "+newRID); + + InetAddress server = servers.get(new City(oldRID.city)); + if (server == null) { + String msg = "no connection to server "+oldRID.city; + log.warning(msg); + throw new ServerCommunicationError("swapResource(): "+msg); + } + log.fine("server address: "+server); + + SwapResource.Client client = new SwapResource.Client(cid, oldRID, newRID); + SwapResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + throw new ServerCommunicationError("swapResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info(cid+": swap "+oldRID+", "+newRID+" - success"); + break; + case NO_SUCH_RESOURCE: + throw new NoSuchResourceException(response.message); + default: + throw new ServerCommunicationError("swapResource(): "+response.message); + } + } +} diff --git a/src/main/java/derms/replica/replica2/DermsLogger.java b/src/main/java/derms/replica/replica2/DermsLogger.java new file mode 100644 index 0000000..0495620 --- /dev/null +++ b/src/main/java/derms/replica/replica2/DermsLogger.java @@ -0,0 +1,23 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.util.logging.FileHandler; +import java.util.logging.Handler; +import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; + +public class DermsLogger { + public static final String logFile = "server.log"; + + private static Logger log = null; + + public static Logger getLogger(Class clazz) throws IOException { + if (log == null) { + log = Logger.getLogger(clazz.getName()); + Handler fileHandler = new FileHandler(logFile); + fileHandler.setFormatter(new SimpleFormatter()); + log.addHandler(fileHandler); + } + return log; + } +} diff --git a/src/main/java/derms/replica/replica2/FindResource.java b/src/main/java/derms/replica/replica2/FindResource.java new file mode 100644 index 0000000..be2eed9 --- /dev/null +++ b/src/main/java/derms/replica/replica2/FindResource.java @@ -0,0 +1,160 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class FindResource { + public static final int port = 5558; + + public static class Client implements Runnable { + private InetAddress serverAddr; + private Request request; + private Collection response; + private Logger log; + + public Client(Request request, InetAddress serverAddr, Collection response) throws IOException { + this.serverAddr = serverAddr; + this.request = request; + this.response = response; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + log.severe("Failed to create request packet: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(requestPkt); + } catch (Exception e) { + log.severe("Failed to send request: "+e.getMessage()); + sock.close(); + return; + } + + Resource[] resources; + try { + resources = ResourceTransfer.receive(sock); + } catch (IOException e) { + log.severe(e.getMessage()); + return; + } finally { + sock.close(); + } + + for (Resource r : resources) { + response.add(r); + } + } + } + + public static class Server implements Runnable { + private static final int bufsize = 4096; + + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket: "+e.getMessage()); + return; + } + + log.info("Running on "+localAddr.toString()+":"+port); + + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.info("Got request"); + + Request request; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); + } + } finally { + sock.close(); + } + } + } + + public static class Request implements Serializable { + private CoordinatorID cid; + private ResourceName rname; + + public Request(CoordinatorID cid, ResourceName rname) { + this.cid = cid; + this.rname = rname; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { + this.request = request; + this.client = client; + this.resources = resources; + this.log = log; + } + + @Override + public void run() { + List borrowedResources = resources.borrowed(request.cid, request.rname); + log.info(""+borrowedResources.size()+" "+request.rname+" resources borrowed by "+request.cid); + try { + Resource[] arr = new Resource[0]; + ResourceTransfer.send(borrowedResources.toArray(arr), client); + } catch (IOException e) { + log.severe("Failed to send response: "+e.getMessage()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica2/Hosts.java b/src/main/java/derms/replica/replica2/Hosts.java new file mode 100644 index 0000000..16eba3d --- /dev/null +++ b/src/main/java/derms/replica/replica2/Hosts.java @@ -0,0 +1,26 @@ +package derms.replica.replica2; + +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +public class Hosts { + private static Map hosts = null; + + public static String get(City city) throws UnknownHostException { + if (hosts == null) + init(); + + String host = hosts.get(city); + if (host == null) + throw new UnknownHostException("unknown host: "+city); + return host; + } + + private static void init() { + hosts = new HashMap(); + hosts.put(new City("MTL"), "alpine1"); + hosts.put(new City("QUE"), "alpine2"); + hosts.put(new City("SHE"), "alpine3"); + } +} diff --git a/src/main/java/derms/replica/replica2/ID.java b/src/main/java/derms/replica/replica2/ID.java new file mode 100644 index 0000000..cd31d36 --- /dev/null +++ b/src/main/java/derms/replica/replica2/ID.java @@ -0,0 +1,5 @@ +package derms.replica.replica2; + +public class ID { + public static final int nDigits = 4; +} diff --git a/src/main/java/derms/replica/replica2/InvalidDurationException.java b/src/main/java/derms/replica/replica2/InvalidDurationException.java new file mode 100644 index 0000000..a5702b2 --- /dev/null +++ b/src/main/java/derms/replica/replica2/InvalidDurationException.java @@ -0,0 +1,7 @@ +package derms.replica.replica2; + +public class InvalidDurationException extends Exception { + public InvalidDurationException (String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica2/NoSuchResourceException.java b/src/main/java/derms/replica/replica2/NoSuchResourceException.java new file mode 100644 index 0000000..0dcb2f8 --- /dev/null +++ b/src/main/java/derms/replica/replica2/NoSuchResourceException.java @@ -0,0 +1,7 @@ +package derms.replica.replica2; + +public class NoSuchResourceException extends Exception { + public NoSuchResourceException(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica2/NotBorrowedException.java b/src/main/java/derms/replica/replica2/NotBorrowedException.java new file mode 100644 index 0000000..19db8c1 --- /dev/null +++ b/src/main/java/derms/replica/replica2/NotBorrowedException.java @@ -0,0 +1,7 @@ +package derms.replica.replica2; + +public class NotBorrowedException extends Exception { + public NotBorrowedException (String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica2/ObjectPacket.java b/src/main/java/derms/replica/replica2/ObjectPacket.java new file mode 100644 index 0000000..65b032f --- /dev/null +++ b/src/main/java/derms/replica/replica2/ObjectPacket.java @@ -0,0 +1,33 @@ +package derms.replica.replica2; + +import java.io.*; +import java.net.DatagramPacket; +import java.net.SocketAddress; + +public class ObjectPacket { + public static DatagramPacket create(Serializable obj, SocketAddress remoteAddr) throws IOException { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ObjectOutputStream objStream = new ObjectOutputStream(byteStream); + objStream.writeObject(obj); + objStream.flush(); + byte[] buf = byteStream.toByteArray(); + objStream.close(); + return new DatagramPacket(buf, buf.length, remoteAddr); + } + + public static E deserialize(DatagramPacket pkt, Class clazz) throws IOException { + ObjectInputStream objectStream; + try { + objectStream = new ObjectInputStream( + new ByteArrayInputStream(pkt.getData())); + } catch (Exception e) { + throw new IOException("failed to create input stream: "+e.getMessage()); + } + + try { + return clazz.cast(objectStream.readObject()); + } catch (Exception e) { + throw new IOException(e.getMessage()); + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica2/RequestResource.java b/src/main/java/derms/replica/replica2/RequestResource.java new file mode 100644 index 0000000..44d85be --- /dev/null +++ b/src/main/java/derms/replica/replica2/RequestResource.java @@ -0,0 +1,223 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class RequestResource { + public static final int port = 5557; + public static final int bufsize = 4096; + + public static class Client { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + public Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) { + this.coordinatorID = coordinatorID; + this.resourceID = resourceID; + this.duration = duration; + } + + public Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(coordinatorID, resourceID, duration); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Request Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufsize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + public static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString() + +": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.info("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + private Request(CoordinatorID cid, ResourceID rid, int duration) { + this.coordinatorID = cid; + this.resourceID = rid; + this.duration = duration; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { + this.request = request; + this.client = client; + this.resources = resources; + this.log = log; + } + + @Override + public void run() { + Response response = borrow(); + + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + log.severe("Failed to create response packet: "+e.getMessage()); + sock.close(); + return; + } + try { + sock.send(pkt); + } catch (Exception e) { + log.severe("Failed to send response: "+e.getMessage()); + } + sock.close(); + } + + private Response borrow() { + try { + Resource resource = resources.getByID(request.resourceID); + synchronized (resource) { + if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) { + return new Response(Response.Status.ALREADY_BORROWED, + request.coordinatorID+" cannot borrow "+request.resourceID + +"; already borrowed by "+resource.borrower); + } else if (request.duration <= 0) { + return new Response(Response.Status.INVALID_DURATION, + "duration "+request.duration+" less than 1"); + } else if (request.duration > resource.duration) { + return new Response(Response.Status.INVALID_DURATION, + "cannot borrow "+resource.id+" for duration of "+request.duration + +"; only "+resource.duration+" remaining"); + } + + if (resource.borrower.equals(request.coordinatorID)) { + // Resource is already borrowed. Add to existing duration. + resource.borrowDuration += request.duration; + } else { + resource.borrowDuration = request.duration; + } + resource.borrower = request.coordinatorID; + resource.isBorrowed = true; + resource.duration -= request.duration; + + return new Response(Response.Status.SUCCESS, request.coordinatorID + +" successfully borrowed "+request.resourceID); + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); + } + } + } + + public static class Response implements Serializable { + public Status status; + public String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + private Response() { + this(Status.SUCCESS, ""); + } + + public enum Status { + SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION + } + } +} + diff --git a/src/main/java/derms/replica/replica2/Resource.java b/src/main/java/derms/replica/replica2/Resource.java new file mode 100644 index 0000000..18aa847 --- /dev/null +++ b/src/main/java/derms/replica/replica2/Resource.java @@ -0,0 +1,39 @@ +package derms.replica.replica2; + +import java.io.Serializable; + +public class Resource implements Serializable { + public ResourceID id; + public ResourceName name; + public int duration; + public boolean isBorrowed; + public CoordinatorID borrower; + public int borrowDuration; + + public Resource(ResourceID id, ResourceName name, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) { + this.id = id; + this.name = name; + this.duration = duration; + this.isBorrowed = isBorrowed; + this.borrower = borrower; + this.borrowDuration = borrowDuration; + } + + public Resource(ResourceID id, ResourceName name, int duration) { + this(id, name, duration, false, new CoordinatorID(), -1); + } + + public Resource() { + this(new ResourceID(), ResourceName.AMBULANCE, 0); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return id+" "+duration; + } +} diff --git a/src/main/java/derms/replica/replica2/ResourceAvailability.java b/src/main/java/derms/replica/replica2/ResourceAvailability.java new file mode 100644 index 0000000..8eacbf2 --- /dev/null +++ b/src/main/java/derms/replica/replica2/ResourceAvailability.java @@ -0,0 +1,128 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.logging.Logger; + +public class ResourceAvailability { + public static final int port = 5556; + + public static class Client implements Runnable { + private InetAddress serverAddr; + private ResourceName request; + private Collection resources; + private Logger log; + + public Client(InetAddress serverAddr, ResourceName request, Collection response) throws IOException { + this.serverAddr = serverAddr; + this.request = request; + this.resources = response; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Error binding socket: "+e.getMessage()); + return; + } + log.fine("Created socket"); + + DatagramPacket reqPkt; + try { + reqPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + log.severe("Error creating request: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(reqPkt); + } catch (IOException e) { + log.severe("Error sending request: "+e.getMessage()); + sock.close(); + return; + } + log.fine("Sent request"); + + Resource[] response; + try { + response = ResourceTransfer.receive(sock); + } catch (IOException e) { + log.severe(e.getMessage()); + return; + } finally { + sock.close(); + } + + for (Resource resource : response) { + resources.add(resource); + } + } + } + + public static class Server implements Runnable { + public static final int bufsize = 1024; + + private InetAddress localAddr; + private Resources resources; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + + DatagramPacket request = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(request); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + + ResourceName requestedName = null; + try { + requestedName = ObjectPacket.deserialize(request, ResourceName.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + log.info("Got request: "+requestedName); + + Resource[] response = resources.getByName(requestedName); + try { + ResourceTransfer.send(response, request.getSocketAddress()); + } catch (IOException e) { + log.warning("Error transfering resources: "+e.getMessage()); + } + } + } finally { + sock.close(); + } + } + } +} diff --git a/src/main/java/derms/replica/replica2/ResourceID.java b/src/main/java/derms/replica/replica2/ResourceID.java new file mode 100644 index 0000000..8081ace --- /dev/null +++ b/src/main/java/derms/replica/replica2/ResourceID.java @@ -0,0 +1,49 @@ +package derms.replica.replica2; + +import java.io.Serializable; + +public class ResourceID implements Serializable { + public String city; + public short num; + + public ResourceID (String city, short num) { + this.city = city; + this.num = num; + } + + public ResourceID() { + this("XXX", (short) 1111); + } + + public static ResourceID parse(String s) throws IllegalArgumentException { + if (s.length() != City.codeLen+ID.nDigits) { + throw new IllegalArgumentException("invalid resource ID: "+s); + } + try { + String cityCode = s.substring(0, City.codeLen); + short num = Short.parseShort(s.substring(City.codeLen)); + return new ResourceID(cityCode, num); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid resource ID: "+e.getMessage()); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != this.getClass()) { + return false; + } + ResourceID other = (ResourceID) obj; + return (this.city.equals(other.city)) && (this.num == other.num); + } + + @Override + public int hashCode() { + return city.hashCode() * num; + } + + @Override + public String toString() { + return city+num; + } +} diff --git a/src/main/java/derms/replica/replica2/ResourceName.java b/src/main/java/derms/replica/replica2/ResourceName.java new file mode 100644 index 0000000..f4c315e --- /dev/null +++ b/src/main/java/derms/replica/replica2/ResourceName.java @@ -0,0 +1,18 @@ +package derms.replica.replica2; + +import java.io.Serializable; + +public enum ResourceName implements Serializable { + AMBULANCE, + FIRETRUCK, + PERSONNEL; + + public static ResourceName parse(String s) { + switch (s) { + case "AMBULANCE": return ResourceName.AMBULANCE; + case "FIRETRUCK": return ResourceName.FIRETRUCK; + case "PERSONNEL": return ResourceName.PERSONNEL; + } + throw new IllegalArgumentException("invalid resource name: "+s); + } +} diff --git a/src/main/java/derms/replica/replica2/ResourceTransfer.java b/src/main/java/derms/replica/replica2/ResourceTransfer.java new file mode 100644 index 0000000..98750c7 --- /dev/null +++ b/src/main/java/derms/replica/replica2/ResourceTransfer.java @@ -0,0 +1,50 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; + +public class ResourceTransfer { + public static final int bufsize = 1024; + + public static void send(Resource[] resources, SocketAddress remoteAddr) throws IOException { + DatagramSocket sock = new DatagramSocket(); + + for (Resource resource : resources) { + DatagramPacket pkt = ObjectPacket.create(resource, remoteAddr); + sock.send(pkt); + } + + DatagramPacket pkt = ObjectPacket.create(new EndOfTransmission(), remoteAddr); + sock.send(pkt); + sock.close(); + } + + public static Resource[] receive(DatagramSocket sock) throws IOException { + List resources = new ArrayList(); + byte[] buf = new byte[bufsize]; + DatagramPacket response = new DatagramPacket(buf, buf.length); + + for (;;) { + sock.receive(response); + + Object obj = ObjectPacket.deserialize(response, Object.class); + if (obj.getClass() == EndOfTransmission.class) { + break; + } + try { + resources.add((Resource) obj); + } catch (Exception e) { + throw new IOException("expected Resource; got "+obj.getClass().toString()); + } + } + Resource[] arr = new Resource[0]; + return resources.toArray(arr); + } + + private static class EndOfTransmission implements Serializable {} +} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica2/Resources.java b/src/main/java/derms/replica/replica2/Resources.java new file mode 100644 index 0000000..4341def --- /dev/null +++ b/src/main/java/derms/replica/replica2/Resources.java @@ -0,0 +1,74 @@ +package derms.replica.replica2; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +public class Resources { + private Map> resources; + + public Resources() { + this.resources = new ConcurrentHashMap>(); + } + + public List borrowed(CoordinatorID borrower, ResourceName name) { + List borrowed = new ArrayList(); + Resource[] namedResources = getByName(name); + for (Resource r : namedResources) { + if (r.isBorrowed && r.borrower.equals(borrower)) { + borrowed.add(r); + } + } + return borrowed; + } + + public Resource getByID(ResourceID id) throws NoSuchElementException { + for (Map rids : resources.values()) { + Resource resource = rids.get(id); + if (resource != null) { + return resource; + } + } + throw new NoSuchElementException("No such resource "+id); + } + + public Resource[] getByName(ResourceName name) { + Map rids = resources.get(name); + if (rids == null) { + return new Resource[0]; + } + Resource[] r = new Resource[0]; + return rids.values().toArray(r); + } + + public void add(Resource r) { + Map rids; + synchronized (resources) { + rids = resources.get(r.name); + if (rids == null) { + rids = new ConcurrentHashMap(); + resources.put(r.name, rids); + } + } + synchronized (rids) { + Resource existing = rids.get(r.id); + if (existing != null) { + existing.duration += r.duration; + } else { + rids.put(r.id, r); + } + } + } + + public void removeByID(ResourceID id) throws NoSuchElementException { + for (Map rids : resources.values()) { + if (rids.containsKey(id)) { + rids.remove(id); + return; + } + } + throw new NoSuchElementException("No such resource "+id); + } +} diff --git a/src/main/java/derms/replica/replica2/ResponderID.java b/src/main/java/derms/replica/replica2/ResponderID.java new file mode 100644 index 0000000..5d342c9 --- /dev/null +++ b/src/main/java/derms/replica/replica2/ResponderID.java @@ -0,0 +1,16 @@ +package derms.replica.replica2; + +public class ResponderID { + public City city; + public short num; + + public ResponderID(City city, short num) { + this.city = city; + this.num = num; + } + + @Override + public String toString() { + return ""+city+"R"+num; + } +} diff --git a/src/main/java/derms/replica/replica2/ResponderServer.java b/src/main/java/derms/replica/replica2/ResponderServer.java new file mode 100644 index 0000000..0eb4544 --- /dev/null +++ b/src/main/java/derms/replica/replica2/ResponderServer.java @@ -0,0 +1,94 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Collection; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class ResponderServer { + public static final Duration timeout = Duration.ofSeconds(5); + + private City city; + private Resources resources; + private Servers servers; + private Logger log; + + public ResponderServer(City city, Resources resources, Servers servers) throws IOException { + this.city = city; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + public ResponderServer() throws IOException { + this(new City(), new Resources(), new Servers()); + } + + public void addResource(Resource r) { + resources.add(r); + log.info("Added resource "+r+" - success"); + } + + public void removeResource(ResourceID rid, int duration) throws NoSuchResourceException { + log.info("Remove duration "+duration+" from "+rid); + try { + Resource resource = resources.getByID(rid); + synchronized (resource) { + if (duration < 0 || duration >= resource.duration) { + resources.removeByID(rid); + log.info("Removed resource "+rid+" - success"); + } else { + resource.duration -= duration; + log.info("Removed duration from resource "+rid+". New duration: "+resource.duration+" - success"); + } + } + } catch (NoSuchElementException e) { + // Wanted to remove duration from resource. + if (duration >= 0) { + String msg = "Cannot remove duration from "+rid+": resource does not exist."; + log.warning(msg); + throw new NoSuchResourceException(msg); + } + + // Duration is negative---wanted to remove resource completely. + // Success because it is already removed. + log.info("Not removing "+rid+": resource does not exist."); + } + } + + public Resource[] listResourceAvailability(ResourceName rname) throws ServerCommunicationError { + log.info("Request for available "+rname); + Collection availableResources = ConcurrentHashMap.newKeySet(); + ExecutorService pool = Executors.newFixedThreadPool(servers.size()); + try { + for (InetAddress serverAddr : servers.all()) { + pool.execute(new ResourceAvailability.Client(serverAddr, rname, availableResources)); + } + } catch (IOException e) { + String msg = "Failed to start ResourceAvailability Client: "+e.getMessage(); + log.severe(msg); + throw new ServerCommunicationError("ResourceAvailability: "+msg); + } + + log.fine("Started worker threads"); + pool.shutdown(); + boolean terminated; + try { + terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new ServerCommunicationError("listResourceAvailability(): listResourceAvailability() interrupted: "+e.getMessage()); + } + if (!terminated) { + throw new ServerCommunicationError("ResourceAvailability: request timed out: no response after "+timeout.toString()); + } + log.info("Response length "+availableResources.size()); + Resource[] arr = new Resource[0]; + return availableResources.toArray(arr); + } +} diff --git a/src/main/java/derms/replica/replica2/ReturnResource.java b/src/main/java/derms/replica/replica2/ReturnResource.java new file mode 100644 index 0000000..0b45db5 --- /dev/null +++ b/src/main/java/derms/replica/replica2/ReturnResource.java @@ -0,0 +1,195 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class ReturnResource { + public static final int port = 5559; + public static final int bufsize = 4096; + + public static class Client { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + + public Client(CoordinatorID cid, ResourceID rid) { + this.coordinatorID = cid; + this.resourceID = rid; + } + + public Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(coordinatorID, resourceID); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Return Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Return Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufsize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Return Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Return Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + public static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString() + +": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.fine("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources)); + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + + private Request(CoordinatorID cid, ResourceID rid) { + this.coordinatorID = cid; + this.resourceID = rid; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + + private RequestHandler(Request request, SocketAddress client, Resources resources) { + this.request = request; + this.client = client; + this.resources = resources; + } + + @Override + public void run() { + Response response = returnResource(); + + DatagramSocket sock; + try { + sock= new DatagramSocket(); + } catch (Exception e) { + System.err.println("Request Resource Server: failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + System.err.println("Request Resource Server: failed to create response packet: "+e.getMessage()); + sock.close(); + return; + } + try { + sock.send(pkt); + } catch (Exception e) { + System.err.println("Request Resource Server: failed to send response: "+e.getMessage()); + } + sock.close(); + } + + private Response returnResource() { + try { + Resource resource = resources.getByID(request.resourceID); + synchronized (resource) { + if (!resource.isBorrowed || !resource.borrower.equals(request.coordinatorID)) { + return new Response(Response.Status.NOT_BORROWED, + request.resourceID+" is not borrowed by "+request.coordinatorID); + } + resource.isBorrowed = false; + resource.borrower = new CoordinatorID(); + resource.borrowDuration = -1; + return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id); + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); + } + } + } + + public static class Response implements Serializable { + public Status status; + public String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + public enum Status { + SUCCESS, NO_SUCH_RESOURCE, NOT_BORROWED + } + } +} diff --git a/src/main/java/derms/replica/replica2/ServerCommunicationError.java b/src/main/java/derms/replica/replica2/ServerCommunicationError.java new file mode 100644 index 0000000..bde3639 --- /dev/null +++ b/src/main/java/derms/replica/replica2/ServerCommunicationError.java @@ -0,0 +1,7 @@ +package derms.replica.replica2; + +public class ServerCommunicationError extends Exception { + public ServerCommunicationError(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica2/Servers.java b/src/main/java/derms/replica/replica2/Servers.java new file mode 100644 index 0000000..4d24745 --- /dev/null +++ b/src/main/java/derms/replica/replica2/Servers.java @@ -0,0 +1,34 @@ +package derms.replica.replica2; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class Servers { + private Map servers = new ConcurrentHashMap(); + + /** Returns the address of the server located in the specified city, or null if there is no server in the city. */ + public InetAddress get(City city) { + return servers.get(city); + } + + /** + * Associates the specified server address with the specified city. + * If there was already a server associated with the city, the old value is replaced. + * @param city the city where the server is located + * @param addr the address of the server + * @return the previous server address, or null if there was no server associated with this city. + */ + public InetAddress put(City city, InetAddress addr) { + return servers.put(city, addr); + } + + public Collection all() { + return servers.values(); + } + + public int size() { + return servers.size(); + } +} diff --git a/src/main/java/derms/replica/replica2/StationServer.java b/src/main/java/derms/replica/replica2/StationServer.java new file mode 100644 index 0000000..12525f7 --- /dev/null +++ b/src/main/java/derms/replica/replica2/StationServer.java @@ -0,0 +1,136 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class StationServer implements Runnable { + public static final String usage = "Usage: java StationServer "; + public static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); + + public City city; + public InetAddress localAddr; + public Resources resources; + public Servers servers; + private Logger log; + private ResponderServer responderServer; + private CoordinatorServer coordinatorServer; + + public StationServer(City city, InetAddress localAddr) throws IOException { + this.city = city; + this.localAddr = localAddr; + this.resources = new Resources(); + this.servers = new Servers(); + this.log = DermsLogger.getLogger(getClass()); + + try { + this.responderServer = new ResponderServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create ResponderServer: "+e.getMessage()); + } + log.info("Created ResponderServer"); + + try { + this.coordinatorServer = new CoordinatorServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create CoordinatorServer: "+e.getMessage()); + } + log.info("Created CoordinatorServer"); + } + + public static void main(String cmdlineArgs[]) { + Args args = null; + try { + args = new Args(cmdlineArgs); + } catch (IllegalArgumentException e) { + System.err.println(e.getMessage()); + System.out.println(usage); + System.exit(1); + } + + try { + (new StationServer(args.city, args.localAddr)).run(); + } catch (Exception e) { + System.err.println(e.getMessage()); + System.exit(1); + } + } + + @Override + public void run() { + log.info("Running"); + log.config("Local address is "+localAddr.toString()); + + ExecutorService pool = Executors.newCachedThreadPool(); + + try { + pool.execute(new ResourceAvailability.Server(localAddr, resources)); + } catch (IOException e) { + String msg = "Failed to start ResourceAvailability Server: "+e.getMessage(); + log.severe(msg); + return; + } + try { + pool.execute(new RequestResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start RequestResource Server: "+e.getMessage()); + return; + } + try { + pool.execute(new FindResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start FindResource Server: "+e.getMessage()); + return; + } + try { + pool.execute(new ReturnResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start ReturnResource Server: "+e.getMessage()); + return; + } + try { + pool.execute(new SwapResource.Server(localAddr, resources, servers)); + } catch (IOException e) { + log.severe("Failed to start SwapResource Server: "+e.getMessage()); + return; + } + + try { + pool.execute(new Announcer(announceGroup, localAddr, city)); + } catch (IOException e) { + log.severe("Failed to start Announcer: "+e.getMessage()); + return; + } + try { + pool.execute(new AnnounceListener(announceGroup, localAddr, servers)); + } catch (IOException e) { + log.severe("Failed to start AnnounceListener: "+e.getMessage()); + return; + } + } + + private static class Args { + private City city; + private InetAddress localAddr; + + private Args(String[] args) throws IllegalArgumentException { + if (args.length < 1) { + throw new IllegalArgumentException("Missing argument 'city'"); + } + city = new City(args[0]); + + if (args.length < 2) { + throw new IllegalArgumentException("Missing argument 'local host'"); + } + try { + localAddr = InetAddress.getByName(args[1]); + } catch (UnknownHostException | SecurityException e) { + throw new IllegalArgumentException("Bad value of 'local host': "+e.getMessage()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica/replica2/SwapResource.java b/src/main/java/derms/replica/replica2/SwapResource.java new file mode 100644 index 0000000..a3b88e8 --- /dev/null +++ b/src/main/java/derms/replica/replica2/SwapResource.java @@ -0,0 +1,262 @@ +package derms.replica.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class SwapResource { + public static final int port = 5560; + public static final int bufSize = 4096; + + public static class Client { + private CoordinatorID cid; + private ResourceID oldRID; + private ResourceID newRID; + + public Client(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { + this.cid = cid; + this.oldRID = oldRID; + this.newRID = newRID; + } + + public Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(cid, oldRID, newRID); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Swap Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Swap Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufSize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Swap Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Swap Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + public static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private Servers servers; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources, Servers servers) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.servers = servers; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr+": "+e.getMessage()); + return; + } + log.info("Listening on "+localAddr+":"+port); + + DatagramPacket requestPkt = new DatagramPacket(new byte[bufSize], bufSize); + + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.fine("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + SocketAddress client = requestPkt.getSocketAddress(); + try { + RequestHandler handler = new RequestHandler(request, client, resources, servers); + pool.execute(handler); + } catch (IOException e) { + log.warning("Failed to create request handler: "+e.getMessage()); + continue; + } + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID cid; + private ResourceID oldRID; + private ResourceID newRID; + + private Request(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { + this.cid = cid; + this.oldRID = oldRID; + this.newRID = newRID; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Servers servers; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Servers servers) throws IOException { + this.request = request; + this.client = client; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + Response response = swapResources(); + + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + log.severe("failed to create response: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(pkt); + } catch (Exception e) { + log.severe("failed to send response: "+e.getMessage()); + } finally { + sock.close(); + } + } + + private Response swapResources() { + try { + Resource resource = resources.getByID(request.oldRID); + synchronized (resource) { + if (!resource.borrower.equals(request.cid)) { + return new Response(Response.Status.NOT_BORROWED, "resource "+request.oldRID+" not borrowed by "+request.cid); + } + try { + acquireNewResource(resource.borrowDuration); + returnOldResource(resource); + return new Response(Response.Status.SUCCESS, request.cid+" success fully swapped "+request.oldRID+" for "+request.newRID); + } catch (UnknownHostException e) { + return new Response(Response.Status.UNKNOWN_HOST, e.getMessage()); + } catch (IOException e) { + return new Response(Response.Status.FAILURE, e.getMessage()); + } catch (CannotBorrow e) { + return new Response(Response.Status.CANNOT_BORROW, e.getMessage()); + } + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.oldRID); + } + } + + private void acquireNewResource(int borrowDuration) throws UnknownHostException, IOException, CannotBorrow { + RequestResource.Client requestClient = new RequestResource.Client(request.cid, request.newRID, borrowDuration); + City city = new City(request.newRID.city); + InetAddress requestResourceServer = servers.get(city); + if (requestResourceServer == null) { + throw new UnknownHostException(city.toString()); + } + RequestResource.Response response = requestClient.sendRequest(requestResourceServer); + // TODO: make exception handling more granular---pass through status from Request. + if (response.status != RequestResource.Response.Status.SUCCESS) { + throw new CannotBorrow(request.cid, request.newRID, response.message); + } + } + + private void returnOldResource(Resource r) { + r.isBorrowed = false; + r.borrower = new CoordinatorID(); + r.borrowDuration = -1; + } + } + + public static class Response implements Serializable { + public Status status; + public String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + public enum Status { + SUCCESS, + FAILURE, + NO_SUCH_RESOURCE, + NOT_BORROWED, + CANNOT_BORROW, + UNKNOWN_HOST + } + } + + private static class CannotBorrow extends Exception { + CoordinatorID attemptedBorrower; + ResourceID rid; + String message; + + private CannotBorrow(CoordinatorID attemptedBorrower, ResourceID rid, String message) { + this.attemptedBorrower = attemptedBorrower; + this.rid = rid; + this.message = message; + } + + @Override + public String getMessage() { + return attemptedBorrower+" failed to borrow "+rid+": "+message; + } + } +} \ No newline at end of file -- cgit v1.2.3