From d267dd1dda606f0c56d8afaa7187485e60ebfd86 Mon Sep 17 00:00:00 2001 From: Sam Anthony Date: Thu, 28 Nov 2024 17:32:28 -0500 Subject: move replica2 to top level --- .../derms/replica2/AlreadyBorrowedException.java | 7 + src/main/java/derms/replica2/AnnounceListener.java | 81 +++++++ src/main/java/derms/replica2/Announcer.java | 69 ++++++ src/main/java/derms/replica2/City.java | 38 +++ src/main/java/derms/replica2/CoordinatorID.java | 39 +++ .../java/derms/replica2/CoordinatorServer.java | 170 +++++++++++++ src/main/java/derms/replica2/DermsLogger.java | 23 ++ src/main/java/derms/replica2/FindResource.java | 160 +++++++++++++ src/main/java/derms/replica2/Hosts.java | 26 ++ src/main/java/derms/replica2/ID.java | 5 + .../derms/replica2/InvalidDurationException.java | 7 + .../derms/replica2/NoSuchResourceException.java | 7 + .../java/derms/replica2/NotBorrowedException.java | 7 + src/main/java/derms/replica2/ObjectPacket.java | 33 +++ src/main/java/derms/replica2/Replica2.java | 237 +++++++++++++++++++ src/main/java/derms/replica2/RequestResource.java | 223 ++++++++++++++++++ src/main/java/derms/replica2/Resource.java | 39 +++ .../java/derms/replica2/ResourceAvailability.java | 128 ++++++++++ src/main/java/derms/replica2/ResourceID.java | 49 ++++ src/main/java/derms/replica2/ResourceTransfer.java | 50 ++++ src/main/java/derms/replica2/ResourceType.java | 18 ++ src/main/java/derms/replica2/Resources.java | 74 ++++++ src/main/java/derms/replica2/ResponderID.java | 16 ++ src/main/java/derms/replica2/ResponderServer.java | 94 ++++++++ src/main/java/derms/replica2/ReturnResource.java | 195 +++++++++++++++ .../derms/replica2/ServerCommunicationError.java | 7 + src/main/java/derms/replica2/Servers.java | 34 +++ src/main/java/derms/replica2/SwapResource.java | 262 +++++++++++++++++++++ 28 files changed, 2098 insertions(+) create mode 100644 src/main/java/derms/replica2/AlreadyBorrowedException.java create mode 100644 src/main/java/derms/replica2/AnnounceListener.java create mode 100644 src/main/java/derms/replica2/Announcer.java create mode 100644 src/main/java/derms/replica2/City.java create mode 100644 src/main/java/derms/replica2/CoordinatorID.java create mode 100644 src/main/java/derms/replica2/CoordinatorServer.java create mode 100644 src/main/java/derms/replica2/DermsLogger.java create mode 100644 src/main/java/derms/replica2/FindResource.java create mode 100644 src/main/java/derms/replica2/Hosts.java create mode 100644 src/main/java/derms/replica2/ID.java create mode 100644 src/main/java/derms/replica2/InvalidDurationException.java create mode 100644 src/main/java/derms/replica2/NoSuchResourceException.java create mode 100644 src/main/java/derms/replica2/NotBorrowedException.java create mode 100644 src/main/java/derms/replica2/ObjectPacket.java create mode 100644 src/main/java/derms/replica2/Replica2.java create mode 100644 src/main/java/derms/replica2/RequestResource.java create mode 100644 src/main/java/derms/replica2/Resource.java create mode 100644 src/main/java/derms/replica2/ResourceAvailability.java create mode 100644 src/main/java/derms/replica2/ResourceID.java create mode 100644 src/main/java/derms/replica2/ResourceTransfer.java create mode 100644 src/main/java/derms/replica2/ResourceType.java create mode 100644 src/main/java/derms/replica2/Resources.java create mode 100644 src/main/java/derms/replica2/ResponderID.java create mode 100644 src/main/java/derms/replica2/ResponderServer.java create mode 100644 src/main/java/derms/replica2/ReturnResource.java create mode 100644 src/main/java/derms/replica2/ServerCommunicationError.java create mode 100644 src/main/java/derms/replica2/Servers.java create mode 100644 src/main/java/derms/replica2/SwapResource.java (limited to 'src/main/java/derms/replica2') diff --git a/src/main/java/derms/replica2/AlreadyBorrowedException.java b/src/main/java/derms/replica2/AlreadyBorrowedException.java new file mode 100644 index 0000000..e1146e7 --- /dev/null +++ b/src/main/java/derms/replica2/AlreadyBorrowedException.java @@ -0,0 +1,7 @@ +package derms.replica2; + +class AlreadyBorrowedException extends Exception { + AlreadyBorrowedException(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica2/AnnounceListener.java b/src/main/java/derms/replica2/AnnounceListener.java new file mode 100644 index 0000000..dd21b8d --- /dev/null +++ b/src/main/java/derms/replica2/AnnounceListener.java @@ -0,0 +1,81 @@ +package derms.replica2; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.*; +import java.util.logging.Logger; + +class AnnounceListener implements Runnable { + private static final int bufsize = 1024; + + private InetSocketAddress groupAddr; + private InetAddress localAddr; + private Servers servers; + private Logger log; + + AnnounceListener(InetSocketAddress groupAddr, InetAddress localAddr, Servers servers) throws IOException { + this.groupAddr = groupAddr; + this.localAddr = localAddr; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + NetworkInterface netInterface = null; + try { + netInterface = NetworkInterface.getByInetAddress(localAddr); + if (netInterface == null) { + throw new Exception("netInterface is null"); + } + } catch (Exception e) { + log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + MulticastSocket sock = null; + try { + sock = new MulticastSocket(groupAddr.getPort()); + sock.joinGroup(groupAddr, netInterface); + } catch (Exception e) { + log.severe("Failed to open multicast socket: "+e.getMessage()); + return; + } + + log.info("Listening to "+groupAddr.toString()+" from "+localAddr.toString()+" ("+netInterface.getName()+")"); + byte[] buf = new byte[bufsize]; + DatagramPacket pkt = new DatagramPacket(buf, buf.length); + + for (;;) { + try { + sock.receive(pkt); + } catch (Exception e) { + log.warning("Error receiving from multicast socket: "+e.getMessage()); + continue; + } + + ObjectInputStream objStream; + try { + objStream = new ObjectInputStream( + new ByteArrayInputStream(pkt.getData())); + } catch (IOException e) { + log.warning("Failed to create input stream: "+e.getMessage()); + continue; + } + + City city; + try { + city = (City) objStream.readObject(); + } catch (Exception e) { + log.warning("Failed to deserialize data: "+e.getMessage()); + continue; + } + + InetAddress remote = pkt.getAddress(); + if (servers.put(city, remote) == null) { + log.info("Added remote server "+city.toString()+" "+remote.toString()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica2/Announcer.java b/src/main/java/derms/replica2/Announcer.java new file mode 100644 index 0000000..508349e --- /dev/null +++ b/src/main/java/derms/replica2/Announcer.java @@ -0,0 +1,69 @@ +package derms.replica2; + +import java.io.IOException; +import java.net.*; +import java.util.logging.Logger; + +class Announcer implements Runnable { + static final long intervalMillis = 3000; + + private SocketAddress group; + private InetAddress localAddr; + private City city; + private Logger log; + + Announcer(SocketAddress group, InetAddress localAddr, City city) throws IOException { + this.group = group; + this.localAddr = localAddr; + this.city = city; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + NetworkInterface netInterface = null; + try { + netInterface = NetworkInterface.getByInetAddress(localAddr); + if (netInterface == null) { + throw new Exception("netInterface is null"); + } + } catch (Exception e) { + log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + MulticastSocket sock = null; + try { + sock = new MulticastSocket(); + sock.joinGroup(group, netInterface); + } catch (Exception e) { + log.severe("Failed to open multicast socket: "+e.getMessage()); + return; + } + + log.info("Announcing from "+localAddr.toString()+" ("+netInterface.getName()+") to "+group.toString()); + + DatagramPacket pkt = null; + try { + pkt = ObjectPacket.create(city, group); + } catch (IOException e) { + log.severe("Failed to create packet: "+e.getMessage()); + sock.close(); + return; + } + + try { + for (;;) { + sock.send(pkt); + Thread.sleep(intervalMillis); + } + } catch (IOException e) { + log.severe("Failed to send to multicast socket "+group.toString()+": "+e.getMessage()); + } catch (InterruptedException e) { + log.info("Interrupted."); + } finally { + log.info("Shutting down..."); + sock.close(); + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica2/City.java b/src/main/java/derms/replica2/City.java new file mode 100644 index 0000000..74535ae --- /dev/null +++ b/src/main/java/derms/replica2/City.java @@ -0,0 +1,38 @@ +package derms.replica2; + +import java.io.Serializable; + +class City implements Serializable { + static final int codeLen = 3; + + private String code; + + City(String code) throws IllegalArgumentException { + if (code.length() != codeLen) + throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters"); + this.code = code; + } + + City() { + this("XXX"); + } + + @Override + public String toString() { + return code; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || this.getClass() != obj.getClass()) { + return false; + } + City other = (City) obj; + return this.code.equals(other.code); + } + + @Override + public int hashCode() { + return code.hashCode(); + } +} diff --git a/src/main/java/derms/replica2/CoordinatorID.java b/src/main/java/derms/replica2/CoordinatorID.java new file mode 100644 index 0000000..523ebcf --- /dev/null +++ b/src/main/java/derms/replica2/CoordinatorID.java @@ -0,0 +1,39 @@ +package derms.replica2; + +import java.io.Serializable; + +class CoordinatorID implements Serializable { + String city; + short num; + + CoordinatorID(String city, short num) { + this.city = city; + this.num = num; + } + + static CoordinatorID parse(String str) throws IllegalArgumentException { + if (str.length() != City.codeLen+ID.nDigits) + throw new IllegalArgumentException("illegal coordinator ID: " + str); + + try { + String city = str.substring(0, City.codeLen); + short num = Short.parseShort(str.substring(City.codeLen)); + return new CoordinatorID(city, num); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("illegal coordinator ID '" + str + "': " + e.getMessage()); + } + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() != this.getClass()) + return false; + CoordinatorID other = (CoordinatorID) obj; + return other.city.equals(this.city) && other.num == this.num; + } + + @Override + public String toString() { + return city+"C"+num; + } +} diff --git a/src/main/java/derms/replica2/CoordinatorServer.java b/src/main/java/derms/replica2/CoordinatorServer.java new file mode 100644 index 0000000..0683638 --- /dev/null +++ b/src/main/java/derms/replica2/CoordinatorServer.java @@ -0,0 +1,170 @@ +package derms.replica2; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +class CoordinatorServer { + static final Duration timeout = Duration.ofSeconds(5); + + private City city; + private Resources resources; + private Servers servers; + private Logger log; + + CoordinatorServer(City city, Resources resources, Servers servers) throws IOException { + super(); + this.city = city; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + CoordinatorServer() throws IOException { + this(new City(), new Resources(), new Servers()); + } + + void requestResource(CoordinatorID cid, ResourceID rid, int duration) + throws ServerCommunicationError, NoSuchResourceException, + AlreadyBorrowedException, InvalidDurationException + { + log.info("Request for "+rid+" from "+cid); + + InetAddress server = servers.get(new City(rid.city)); + if (server == null) { + throw new ServerCommunicationError("requestResource(): no connection to server "+rid.city.toString()); + } + + RequestResource.Client client = new RequestResource.Client(cid, rid, duration); + RequestResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + throw new ServerCommunicationError("requestResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info("Request "+rid+" from "+cid+" - success"); + break; + case NO_SUCH_RESOURCE: + log.warning(response.message); + throw new NoSuchResourceException(response.message); + case ALREADY_BORROWED: + log.warning(response.message); + throw new AlreadyBorrowedException(response.message); + case INVALID_DURATION: + log.warning(response.message); + throw new InvalidDurationException(response.message); + default: + log.warning("Unsuccessful response from server: "+response.message); + throw new ServerCommunicationError("requestResource(): failed to borrow resource: "+response.message); + } + } + + Resource[] findResource(CoordinatorID cid, ResourceType rname) throws ServerCommunicationError { + log.info("Find Resource "+rname+" from "+cid); + FindResource.Request request = new FindResource.Request(cid, rname); + Collection response = ConcurrentHashMap.newKeySet(); + ExecutorService pool = Executors.newFixedThreadPool(servers.size()); + try { + for (InetAddress server : servers.all()) { + pool.execute(new FindResource.Client(request, server, response)); + } + } catch (IOException e) { + String msg = "Failed to start FindResource Client: "+e.getMessage(); + log.severe(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + log.fine("Started worker threads"); + pool.shutdown(); + boolean terminated; + try { + terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + String msg = "findResource() interrupted: "+e.getMessage(); + log.warning(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + if (!terminated) { + String msg = "Request timed out: no response after "+timeout.toString(); + log.warning(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + Resource[] arr = new Resource[0]; + arr = response.toArray(arr); + log.info("Find resource "+rname+" from "+cid+" - success. Response length: "+arr.length); + return arr; + } + + void returnResource(CoordinatorID cid, ResourceID rid) + throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException + { + log.info("Return resource "+rid+" from "+cid); + InetAddress server = servers.get(new City(rid.city)); + if (server == null) { + String msg = "no connection to server "+rid.city; + log.warning(msg); + throw new ServerCommunicationError("returnResource(): "+msg); + } + log.fine("server address: "+server); + + ReturnResource.Client client = new ReturnResource.Client(cid, rid); + ReturnResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + log.warning(e.getMessage()); + throw new ServerCommunicationError("returnResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info(cid+" return "+rid+" - success"); + break; + case NO_SUCH_RESOURCE: + log.warning(response.message); + throw new NoSuchResourceException(response.message); + case NOT_BORROWED: + log.warning(response.message); + throw new NotBorrowedException(response.message); + default: + String msg = "Failed to return resource: "+response.message; + log.warning(msg); + throw new ServerCommunicationError("returnResource(): "+msg); + } + } + + void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException { + log.info(cid+": swap "+oldRID+", "+newRID); + + InetAddress server = servers.get(new City(oldRID.city)); + if (server == null) { + String msg = "no connection to server "+oldRID.city; + log.warning(msg); + throw new ServerCommunicationError("swapResource(): "+msg); + } + log.fine("server address: "+server); + + SwapResource.Client client = new SwapResource.Client(cid, oldRID, newRID); + SwapResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + throw new ServerCommunicationError("swapResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info(cid+": swap "+oldRID+", "+newRID+" - success"); + break; + case NO_SUCH_RESOURCE: + throw new NoSuchResourceException(response.message); + default: + throw new ServerCommunicationError("swapResource(): "+response.message); + } + } +} diff --git a/src/main/java/derms/replica2/DermsLogger.java b/src/main/java/derms/replica2/DermsLogger.java new file mode 100644 index 0000000..3e031f7 --- /dev/null +++ b/src/main/java/derms/replica2/DermsLogger.java @@ -0,0 +1,23 @@ +package derms.replica2; + +import java.io.IOException; +import java.util.logging.FileHandler; +import java.util.logging.Handler; +import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; + +class DermsLogger { + static final String logFile = "server.log"; + + private static Logger log = null; + + static Logger getLogger(Class clazz) throws IOException { + if (log == null) { + log = Logger.getLogger(clazz.getName()); + Handler fileHandler = new FileHandler(logFile); + fileHandler.setFormatter(new SimpleFormatter()); + log.addHandler(fileHandler); + } + return log; + } +} diff --git a/src/main/java/derms/replica2/FindResource.java b/src/main/java/derms/replica2/FindResource.java new file mode 100644 index 0000000..8ba642f --- /dev/null +++ b/src/main/java/derms/replica2/FindResource.java @@ -0,0 +1,160 @@ +package derms.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +class FindResource { + static final int port = 5558; + + static class Client implements Runnable { + private InetAddress serverAddr; + private Request request; + private Collection response; + private Logger log; + + Client(Request request, InetAddress serverAddr, Collection response) throws IOException { + this.serverAddr = serverAddr; + this.request = request; + this.response = response; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + log.severe("Failed to create request packet: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(requestPkt); + } catch (Exception e) { + log.severe("Failed to send request: "+e.getMessage()); + sock.close(); + return; + } + + Resource[] resources; + try { + resources = ResourceTransfer.receive(sock); + } catch (IOException e) { + log.severe(e.getMessage()); + return; + } finally { + sock.close(); + } + + for (Resource r : resources) { + response.add(r); + } + } + } + + static class Server implements Runnable { + private static final int bufsize = 4096; + + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket: "+e.getMessage()); + return; + } + + log.info("Running on "+localAddr.toString()+":"+port); + + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.info("Got request"); + + Request request; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); + } + } finally { + sock.close(); + } + } + } + + static class Request implements Serializable { + private CoordinatorID cid; + private ResourceType rname; + + Request(CoordinatorID cid, ResourceType rname) { + this.cid = cid; + this.rname = rname; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { + this.request = request; + this.client = client; + this.resources = resources; + this.log = log; + } + + @Override + public void run() { + List borrowedResources = resources.borrowed(request.cid, request.rname); + log.info(""+borrowedResources.size()+" "+request.rname+" resources borrowed by "+request.cid); + try { + Resource[] arr = new Resource[0]; + ResourceTransfer.send(borrowedResources.toArray(arr), client); + } catch (IOException e) { + log.severe("Failed to send response: "+e.getMessage()); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica2/Hosts.java b/src/main/java/derms/replica2/Hosts.java new file mode 100644 index 0000000..1392b15 --- /dev/null +++ b/src/main/java/derms/replica2/Hosts.java @@ -0,0 +1,26 @@ +package derms.replica2; + +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +class Hosts { + private static Map hosts = null; + + static String get(City city) throws UnknownHostException { + if (hosts == null) + init(); + + String host = hosts.get(city); + if (host == null) + throw new UnknownHostException("unknown host: "+city); + return host; + } + + private static void init() { + hosts = new HashMap(); + hosts.put(new City("MTL"), "alpine1"); + hosts.put(new City("QUE"), "alpine2"); + hosts.put(new City("SHE"), "alpine3"); + } +} diff --git a/src/main/java/derms/replica2/ID.java b/src/main/java/derms/replica2/ID.java new file mode 100644 index 0000000..a61888b --- /dev/null +++ b/src/main/java/derms/replica2/ID.java @@ -0,0 +1,5 @@ +package derms.replica2; + +class ID { + static final int nDigits = 4; +} diff --git a/src/main/java/derms/replica2/InvalidDurationException.java b/src/main/java/derms/replica2/InvalidDurationException.java new file mode 100644 index 0000000..2596cb9 --- /dev/null +++ b/src/main/java/derms/replica2/InvalidDurationException.java @@ -0,0 +1,7 @@ +package derms.replica2; + +class InvalidDurationException extends Exception { + InvalidDurationException (String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica2/NoSuchResourceException.java b/src/main/java/derms/replica2/NoSuchResourceException.java new file mode 100644 index 0000000..c1eb8c8 --- /dev/null +++ b/src/main/java/derms/replica2/NoSuchResourceException.java @@ -0,0 +1,7 @@ +package derms.replica2; + +class NoSuchResourceException extends Exception { + NoSuchResourceException(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica2/NotBorrowedException.java b/src/main/java/derms/replica2/NotBorrowedException.java new file mode 100644 index 0000000..aff0661 --- /dev/null +++ b/src/main/java/derms/replica2/NotBorrowedException.java @@ -0,0 +1,7 @@ +package derms.replica2; + +class NotBorrowedException extends Exception { + NotBorrowedException (String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica2/ObjectPacket.java b/src/main/java/derms/replica2/ObjectPacket.java new file mode 100644 index 0000000..27852b0 --- /dev/null +++ b/src/main/java/derms/replica2/ObjectPacket.java @@ -0,0 +1,33 @@ +package derms.replica2; + +import java.io.*; +import java.net.DatagramPacket; +import java.net.SocketAddress; + +class ObjectPacket { + static DatagramPacket create(Serializable obj, SocketAddress remoteAddr) throws IOException { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ObjectOutputStream objStream = new ObjectOutputStream(byteStream); + objStream.writeObject(obj); + objStream.flush(); + byte[] buf = byteStream.toByteArray(); + objStream.close(); + return new DatagramPacket(buf, buf.length, remoteAddr); + } + + static E deserialize(DatagramPacket pkt, Class clazz) throws IOException { + ObjectInputStream objectStream; + try { + objectStream = new ObjectInputStream( + new ByteArrayInputStream(pkt.getData())); + } catch (Exception e) { + throw new IOException("failed to create input stream: "+e.getMessage()); + } + + try { + return clazz.cast(objectStream.readObject()); + } catch (Exception e) { + throw new IOException(e.getMessage()); + } + } +} \ No newline at end of file diff --git a/src/main/java/derms/replica2/Replica2.java b/src/main/java/derms/replica2/Replica2.java new file mode 100644 index 0000000..25609cc --- /dev/null +++ b/src/main/java/derms/replica2/Replica2.java @@ -0,0 +1,237 @@ +package derms.replica2; + +import derms.Replica; +import derms.Request; +import derms.Response; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class Replica2 implements Replica { + static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); + + final City city; + final InetAddress localAddr; + final Resources resources; + final Servers servers; + private final Logger log; + private final ResponderServer responderServer; + private final CoordinatorServer coordinatorServer; + private boolean alive; + + public Replica2(City city) throws IOException { + this.city = city; + this.localAddr = InetAddress.getLocalHost(); + this.resources = new Resources(); + this.servers = new Servers(); + this.log = DermsLogger.getLogger(getClass()); + + try { + this.responderServer = new ResponderServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create ResponderServer: "+e.getMessage()); + } + log.info("Created ResponderServer"); + + try { + this.coordinatorServer = new CoordinatorServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create CoordinatorServer: "+e.getMessage()); + } + log.info("Created CoordinatorServer"); + + log.info("Running"); + log.config("Local address is "+localAddr.toString()); + + ExecutorService pool = Executors.newCachedThreadPool(); + + try { + pool.execute(new ResourceAvailability.Server(localAddr, resources)); + } catch (IOException e) { + String msg = "Failed to start ResourceAvailability Server: "+e.getMessage(); + log.severe(msg); + throw e; + } + try { + pool.execute(new RequestResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start RequestResource Server: "+e.getMessage()); + throw e; + } + try { + pool.execute(new FindResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start FindResource Server: "+e.getMessage()); + throw e; + } + try { + pool.execute(new ReturnResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start ReturnResource Server: "+e.getMessage()); + throw e; + } + try { + pool.execute(new SwapResource.Server(localAddr, resources, servers)); + } catch (IOException e) { + log.severe("Failed to start SwapResource Server: "+e.getMessage()); + throw e; + } + + try { + pool.execute(new Announcer(announceGroup, localAddr, city)); + } catch (IOException e) { + log.severe("Failed to start Announcer: "+e.getMessage()); + throw e; + } + try { + pool.execute(new AnnounceListener(announceGroup, localAddr, servers)); + } catch (IOException e) { + log.severe("Failed to start AnnounceListener: "+e.getMessage()); + throw e; + } + + this.alive = true; + } + + @Override + public boolean isAlive() { return alive; } + + @Override + public void startProcess() { + // TODO + log.info(getClass().getSimpleName() + " started."); + } + + @Override + public void processRequest(Request request) { + log.info(request.toString()); + + String status = ""; + try { + switch (request.getFunction()) { + case "addResource": + status = addResource(request); + break; + case "removeResource": + status = removeResource(request); + break; + case "listResourceAvailability": + status = listResourceAvailability(request); + break; + case "requestResource": + status = requestResource(request); + break; + case "findResource": + status = findResource(request); + break; + case "returnResource": + status = returnResource(request); + break; + case "swapResource": + status = swapResource(request); + break; + default: + status = "Failure: unknown function '" + request.getFunction() + "'"; + } + } catch (Exception e) { + log.warning(e.getMessage()); + status = "Failure: " + request.getFunction() + ": " + e.getMessage(); + } + + Response response = new Response(request.getSequenceNumber(), status); + log.info("Processed request " + request + "; response: " + response); + replicaManager.sendResponseToFE(response); + } + + @Override + public void restart() { + // TODO + shutdown(); + startProcess(); + } + + @Override + public int getId() { return 2; } + + private void shutdown() { + // TODO + } + + private String addResource(Request request) { + Resource resource = new Resource( + ResourceID.parse(request.getResourceID()), + ResourceType.parse(request.getResourceType()), + request.getDuration()); + responderServer.addResource(resource); + return "Successfully added resource " + resource; + } + + private String removeResource(Request request) { + try { + responderServer.removeResource( + ResourceID.parse(request.getResourceID()), + request.getDuration()); + return "Successfully removed resource " + request.getResourceID(); + } catch (NoSuchResourceException e) { + String msg = "Error removing " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } + + private String listResourceAvailability(Request request) { + // TODO + throw new NotImplementedException(); + } + + private String requestResource(Request request) { + try { + coordinatorServer.requestResource( + CoordinatorID.parse(request.getClientID()), + ResourceID.parse(request.getResourceID()), + request.getDuration()); + return "Successfully borrowed " + request.getResourceID(); + } catch (NoSuchResourceException | AlreadyBorrowedException | InvalidDurationException |ServerCommunicationError e) { + String msg = "Failed to borrow resource " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } + + private String findResource(Request request) { + // TODO + throw new NotImplementedException(); + } + + private String returnResource(Request request) { + try { + coordinatorServer.returnResource( + CoordinatorID.parse(request.getClientID()), + ResourceID.parse(request.getResourceID())); + return "Successfully returned resource " + request.getResourceID(); + } catch (NoSuchResourceException | NotBorrowedException | ServerCommunicationError e) { + String msg = "Failed to borrow resource " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } + + private String swapResource(Request request) { + try { + coordinatorServer.swapResource( + CoordinatorID.parse(request.getClientID()), + ResourceID.parse(request.getOldResourceID()), + ResourceID.parse(request.getResourceID())); + return "Successfully swapped " + request.getOldResourceID() + " for " + request.getResourceID(); + } catch (NoSuchResourceException | ServerCommunicationError e) { + String msg = "Failed to swap " + request.getOldResourceID() + " for " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } +} diff --git a/src/main/java/derms/replica2/RequestResource.java b/src/main/java/derms/replica2/RequestResource.java new file mode 100644 index 0000000..6a905db --- /dev/null +++ b/src/main/java/derms/replica2/RequestResource.java @@ -0,0 +1,223 @@ +package derms.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +class RequestResource { + static final int port = 5557; + static final int bufsize = 4096; + + static class Client { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) { + this.coordinatorID = coordinatorID; + this.resourceID = resourceID; + this.duration = duration; + } + + Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(coordinatorID, resourceID, duration); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Request Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufsize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString() + +": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.info("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + private Request(CoordinatorID cid, ResourceID rid, int duration) { + this.coordinatorID = cid; + this.resourceID = rid; + this.duration = duration; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { + this.request = request; + this.client = client; + this.resources = resources; + this.log = log; + } + + @Override + public void run() { + Response response = borrow(); + + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + log.severe("Failed to create response packet: "+e.getMessage()); + sock.close(); + return; + } + try { + sock.send(pkt); + } catch (Exception e) { + log.severe("Failed to send response: "+e.getMessage()); + } + sock.close(); + } + + private Response borrow() { + try { + Resource resource = resources.getByID(request.resourceID); + synchronized (resource) { + if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) { + return new Response(Response.Status.ALREADY_BORROWED, + request.coordinatorID+" cannot borrow "+request.resourceID + +"; already borrowed by "+resource.borrower); + } else if (request.duration <= 0) { + return new Response(Response.Status.INVALID_DURATION, + "duration "+request.duration+" less than 1"); + } else if (request.duration > resource.duration) { + return new Response(Response.Status.INVALID_DURATION, + "cannot borrow "+resource.id+" for duration of "+request.duration + +"; only "+resource.duration+" remaining"); + } + + if (resource.borrower.equals(request.coordinatorID)) { + // Resource is already borrowed. Add to existing duration. + resource.borrowDuration += request.duration; + } else { + resource.borrowDuration = request.duration; + } + resource.borrower = request.coordinatorID; + resource.isBorrowed = true; + resource.duration -= request.duration; + + return new Response(Response.Status.SUCCESS, request.coordinatorID + +" successfully borrowed "+request.resourceID); + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); + } + } + } + + static class Response implements Serializable { + Status status; + String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + private Response() { + this(Status.SUCCESS, ""); + } + + public enum Status { + SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION + } + } +} + diff --git a/src/main/java/derms/replica2/Resource.java b/src/main/java/derms/replica2/Resource.java new file mode 100644 index 0000000..31d40bc --- /dev/null +++ b/src/main/java/derms/replica2/Resource.java @@ -0,0 +1,39 @@ +package derms.replica2; + +import java.io.Serializable; + +class Resource implements Serializable { + ResourceID id; + ResourceType type; + int duration; + boolean isBorrowed; + CoordinatorID borrower; + int borrowDuration; + + Resource(ResourceID id, ResourceType type, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) { + this.id = id; + this.type = type; + this.duration = duration; + this.isBorrowed = isBorrowed; + this.borrower = borrower; + this.borrowDuration = borrowDuration; + } + + Resource(ResourceID id, ResourceType type, int duration) { + this(id, type, duration, false, new CoordinatorID(), -1); + } + + Resource() { + this(new ResourceID(), ResourceType.AMBULANCE, 0); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return id+" "+duration; + } +} diff --git a/src/main/java/derms/replica2/ResourceAvailability.java b/src/main/java/derms/replica2/ResourceAvailability.java new file mode 100644 index 0000000..ce4c698 --- /dev/null +++ b/src/main/java/derms/replica2/ResourceAvailability.java @@ -0,0 +1,128 @@ +package derms.replica2; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.logging.Logger; + +class ResourceAvailability { + static final int port = 5556; + + static class Client implements Runnable { + private InetAddress serverAddr; + private ResourceType request; + private Collection resources; + private Logger log; + + Client(InetAddress serverAddr, ResourceType request, Collection response) throws IOException { + this.serverAddr = serverAddr; + this.request = request; + this.resources = response; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Error binding socket: "+e.getMessage()); + return; + } + log.fine("Created socket"); + + DatagramPacket reqPkt; + try { + reqPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + log.severe("Error creating request: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(reqPkt); + } catch (IOException e) { + log.severe("Error sending request: "+e.getMessage()); + sock.close(); + return; + } + log.fine("Sent request"); + + Resource[] response; + try { + response = ResourceTransfer.receive(sock); + } catch (IOException e) { + log.severe(e.getMessage()); + return; + } finally { + sock.close(); + } + + for (Resource resource : response) { + resources.add(resource); + } + } + } + + static class Server implements Runnable { + static final int bufsize = 1024; + + private InetAddress localAddr; + private Resources resources; + private Logger log; + + Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + + DatagramPacket request = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(request); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + + ResourceType requestedName = null; + try { + requestedName = ObjectPacket.deserialize(request, ResourceType.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + log.info("Got request: "+requestedName); + + Resource[] response = resources.getByName(requestedName); + try { + ResourceTransfer.send(response, request.getSocketAddress()); + } catch (IOException e) { + log.warning("Error transfering resources: "+e.getMessage()); + } + } + } finally { + sock.close(); + } + } + } +} diff --git a/src/main/java/derms/replica2/ResourceID.java b/src/main/java/derms/replica2/ResourceID.java new file mode 100644 index 0000000..24da3d3 --- /dev/null +++ b/src/main/java/derms/replica2/ResourceID.java @@ -0,0 +1,49 @@ +package derms.replica2; + +import java.io.Serializable; + +class ResourceID implements Serializable { + String city; + short num; + + ResourceID (String city, short num) { + this.city = city; + this.num = num; + } + + ResourceID() { + this("XXX", (short) 1111); + } + + static ResourceID parse(String s) throws IllegalArgumentException { + if (s.length() != City.codeLen+ID.nDigits) { + throw new IllegalArgumentException("invalid resource ID: "+s); + } + try { + String cityCode = s.substring(0, City.codeLen); + short num = Short.parseShort(s.substring(City.codeLen)); + return new ResourceID(cityCode, num); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid resource ID: "+e.getMessage()); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != this.getClass()) { + return false; + } + ResourceID other = (ResourceID) obj; + return (this.city.equals(other.city)) && (this.num == other.num); + } + + @Override + public int hashCode() { + return city.hashCode() * num; + } + + @Override + public String toString() { + return city+num; + } +} diff --git a/src/main/java/derms/replica2/ResourceTransfer.java b/src/main/java/derms/replica2/ResourceTransfer.java new file mode 100644 index 0000000..0ed17c5 --- /dev/null +++ b/src/main/java/derms/replica2/ResourceTransfer.java @@ -0,0 +1,50 @@ +package derms.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; + +class ResourceTransfer { + static final int bufsize = 1024; + + static void send(Resource[] resources, SocketAddress remoteAddr) throws IOException { + DatagramSocket sock = new DatagramSocket(); + + for (Resource resource : resources) { + DatagramPacket pkt = ObjectPacket.create(resource, remoteAddr); + sock.send(pkt); + } + + DatagramPacket pkt = ObjectPacket.create(new EndOfTransmission(), remoteAddr); + sock.send(pkt); + sock.close(); + } + + static Resource[] receive(DatagramSocket sock) throws IOException { + List resources = new ArrayList(); + byte[] buf = new byte[bufsize]; + DatagramPacket response = new DatagramPacket(buf, buf.length); + + for (;;) { + sock.receive(response); + + Object obj = ObjectPacket.deserialize(response, Object.class); + if (obj.getClass() == EndOfTransmission.class) { + break; + } + try { + resources.add((Resource) obj); + } catch (Exception e) { + throw new IOException("expected Resource; got "+obj.getClass().toString()); + } + } + Resource[] arr = new Resource[0]; + return resources.toArray(arr); + } + + private static class EndOfTransmission implements Serializable {} +} \ No newline at end of file diff --git a/src/main/java/derms/replica2/ResourceType.java b/src/main/java/derms/replica2/ResourceType.java new file mode 100644 index 0000000..92aba32 --- /dev/null +++ b/src/main/java/derms/replica2/ResourceType.java @@ -0,0 +1,18 @@ +package derms.replica2; + +import java.io.Serializable; + +enum ResourceType implements Serializable { + AMBULANCE, + FIRETRUCK, + PERSONNEL; + + static ResourceType parse(String s) throws IllegalArgumentException { + switch (s) { + case "AMBULANCE": return ResourceType.AMBULANCE; + case "FIRETRUCK": return ResourceType.FIRETRUCK; + case "PERSONNEL": return ResourceType.PERSONNEL; + } + throw new IllegalArgumentException("invalid resource name: "+s); + } +} diff --git a/src/main/java/derms/replica2/Resources.java b/src/main/java/derms/replica2/Resources.java new file mode 100644 index 0000000..634e315 --- /dev/null +++ b/src/main/java/derms/replica2/Resources.java @@ -0,0 +1,74 @@ +package derms.replica2; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; + +class Resources { + private Map> resources; + + Resources() { + this.resources = new ConcurrentHashMap>(); + } + + List borrowed(CoordinatorID borrower, ResourceType name) { + List borrowed = new ArrayList(); + Resource[] namedResources = getByName(name); + for (Resource r : namedResources) { + if (r.isBorrowed && r.borrower.equals(borrower)) { + borrowed.add(r); + } + } + return borrowed; + } + + Resource getByID(ResourceID id) throws NoSuchElementException { + for (Map rids : resources.values()) { + Resource resource = rids.get(id); + if (resource != null) { + return resource; + } + } + throw new NoSuchElementException("No such resource "+id); + } + + Resource[] getByName(ResourceType name) { + Map rids = resources.get(name); + if (rids == null) { + return new Resource[0]; + } + Resource[] r = new Resource[0]; + return rids.values().toArray(r); + } + + void add(Resource r) { + Map rids; + synchronized (resources) { + rids = resources.get(r.type); + if (rids == null) { + rids = new ConcurrentHashMap(); + resources.put(r.type, rids); + } + } + synchronized (rids) { + Resource existing = rids.get(r.id); + if (existing != null) { + existing.duration += r.duration; + } else { + rids.put(r.id, r); + } + } + } + + void removeByID(ResourceID id) throws NoSuchElementException { + for (Map rids : resources.values()) { + if (rids.containsKey(id)) { + rids.remove(id); + return; + } + } + throw new NoSuchElementException("No such resource "+id); + } +} diff --git a/src/main/java/derms/replica2/ResponderID.java b/src/main/java/derms/replica2/ResponderID.java new file mode 100644 index 0000000..480b471 --- /dev/null +++ b/src/main/java/derms/replica2/ResponderID.java @@ -0,0 +1,16 @@ +package derms.replica2; + +class ResponderID { + City city; + short num; + + ResponderID(City city, short num) { + this.city = city; + this.num = num; + } + + @Override + public String toString() { + return ""+city+"R"+num; + } +} diff --git a/src/main/java/derms/replica2/ResponderServer.java b/src/main/java/derms/replica2/ResponderServer.java new file mode 100644 index 0000000..02ff6b6 --- /dev/null +++ b/src/main/java/derms/replica2/ResponderServer.java @@ -0,0 +1,94 @@ +package derms.replica2; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Collection; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +class ResponderServer { + static final Duration timeout = Duration.ofSeconds(5); + + private City city; + private Resources resources; + private Servers servers; + private Logger log; + + ResponderServer(City city, Resources resources, Servers servers) throws IOException { + this.city = city; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + ResponderServer() throws IOException { + this(new City(), new Resources(), new Servers()); + } + + void addResource(Resource r) { + resources.add(r); + log.info("Added resource "+r+" - success"); + } + + void removeResource(ResourceID rid, int duration) throws NoSuchResourceException { + log.info("Remove duration "+duration+" from "+rid); + try { + Resource resource = resources.getByID(rid); + synchronized (resource) { + if (duration < 0 || duration >= resource.duration) { + resources.removeByID(rid); + log.info("Removed resource "+rid+" - success"); + } else { + resource.duration -= duration; + log.info("Removed duration from resource "+rid+". New duration: "+resource.duration+" - success"); + } + } + } catch (NoSuchElementException e) { + // Wanted to remove duration from resource. + if (duration >= 0) { + String msg = "Cannot remove duration from "+rid+": resource does not exist."; + log.warning(msg); + throw new NoSuchResourceException(msg); + } + + // Duration is negative---wanted to remove resource completely. + // Success because it is already removed. + log.info("Not removing "+rid+": resource does not exist."); + } + } + + Resource[] listResourceAvailability(ResourceType rname) throws ServerCommunicationError { + log.info("Request for available "+rname); + Collection availableResources = ConcurrentHashMap.newKeySet(); + ExecutorService pool = Executors.newFixedThreadPool(servers.size()); + try { + for (InetAddress serverAddr : servers.all()) { + pool.execute(new ResourceAvailability.Client(serverAddr, rname, availableResources)); + } + } catch (IOException e) { + String msg = "Failed to start ResourceAvailability Client: "+e.getMessage(); + log.severe(msg); + throw new ServerCommunicationError("ResourceAvailability: "+msg); + } + + log.fine("Started worker threads"); + pool.shutdown(); + boolean terminated; + try { + terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new ServerCommunicationError("listResourceAvailability(): listResourceAvailability() interrupted: "+e.getMessage()); + } + if (!terminated) { + throw new ServerCommunicationError("ResourceAvailability: request timed out: no response after "+timeout.toString()); + } + log.info("Response length "+availableResources.size()); + Resource[] arr = new Resource[0]; + return availableResources.toArray(arr); + } +} diff --git a/src/main/java/derms/replica2/ReturnResource.java b/src/main/java/derms/replica2/ReturnResource.java new file mode 100644 index 0000000..6c42b8a --- /dev/null +++ b/src/main/java/derms/replica2/ReturnResource.java @@ -0,0 +1,195 @@ +package derms.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +class ReturnResource { + public static final int port = 5559; + public static final int bufsize = 4096; + + static class Client { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + + Client(CoordinatorID cid, ResourceID rid) { + this.coordinatorID = cid; + this.resourceID = rid; + } + + Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(coordinatorID, resourceID); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Return Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Return Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufsize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Return Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Return Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString() + +": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.fine("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources)); + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + + private Request(CoordinatorID cid, ResourceID rid) { + this.coordinatorID = cid; + this.resourceID = rid; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + + private RequestHandler(Request request, SocketAddress client, Resources resources) { + this.request = request; + this.client = client; + this.resources = resources; + } + + @Override + public void run() { + Response response = returnResource(); + + DatagramSocket sock; + try { + sock= new DatagramSocket(); + } catch (Exception e) { + System.err.println("Request Resource Server: failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + System.err.println("Request Resource Server: failed to create response packet: "+e.getMessage()); + sock.close(); + return; + } + try { + sock.send(pkt); + } catch (Exception e) { + System.err.println("Request Resource Server: failed to send response: "+e.getMessage()); + } + sock.close(); + } + + private Response returnResource() { + try { + Resource resource = resources.getByID(request.resourceID); + synchronized (resource) { + if (!resource.isBorrowed || !resource.borrower.equals(request.coordinatorID)) { + return new Response(Response.Status.NOT_BORROWED, + request.resourceID+" is not borrowed by "+request.coordinatorID); + } + resource.isBorrowed = false; + resource.borrower = new CoordinatorID(); + resource.borrowDuration = -1; + return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id); + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); + } + } + } + + static class Response implements Serializable { + Status status; + String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + enum Status { + SUCCESS, NO_SUCH_RESOURCE, NOT_BORROWED + } + } +} diff --git a/src/main/java/derms/replica2/ServerCommunicationError.java b/src/main/java/derms/replica2/ServerCommunicationError.java new file mode 100644 index 0000000..2bf4d33 --- /dev/null +++ b/src/main/java/derms/replica2/ServerCommunicationError.java @@ -0,0 +1,7 @@ +package derms.replica2; + +class ServerCommunicationError extends Exception { + ServerCommunicationError(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica2/Servers.java b/src/main/java/derms/replica2/Servers.java new file mode 100644 index 0000000..498b7ce --- /dev/null +++ b/src/main/java/derms/replica2/Servers.java @@ -0,0 +1,34 @@ +package derms.replica2; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class Servers { + private Map servers = new ConcurrentHashMap(); + + /** Returns the address of the server located in the specified city, or null if there is no server in the city. */ + InetAddress get(City city) { + return servers.get(city); + } + + /** + * Associates the specified server address with the specified city. + * If there was already a server associated with the city, the old value is replaced. + * @param city the city where the server is located + * @param addr the address of the server + * @return the previous server address, or null if there was no server associated with this city. + */ + InetAddress put(City city, InetAddress addr) { + return servers.put(city, addr); + } + + Collection all() { + return servers.values(); + } + + int size() { + return servers.size(); + } +} diff --git a/src/main/java/derms/replica2/SwapResource.java b/src/main/java/derms/replica2/SwapResource.java new file mode 100644 index 0000000..cc65f29 --- /dev/null +++ b/src/main/java/derms/replica2/SwapResource.java @@ -0,0 +1,262 @@ +package derms.replica2; + +import java.io.IOException; +import java.io.Serializable; +import java.net.*; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +class SwapResource { + static final int port = 5560; + static final int bufSize = 4096; + + static class Client { + private CoordinatorID cid; + private ResourceID oldRID; + private ResourceID newRID; + + Client(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { + this.cid = cid; + this.oldRID = oldRID; + this.newRID = newRID; + } + + Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(cid, oldRID, newRID); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Swap Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Swap Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufSize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Swap Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Swap Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private Servers servers; + private ExecutorService pool; + private Logger log; + + Server(InetAddress localAddr, Resources resources, Servers servers) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.servers = servers; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr+": "+e.getMessage()); + return; + } + log.info("Listening on "+localAddr+":"+port); + + DatagramPacket requestPkt = new DatagramPacket(new byte[bufSize], bufSize); + + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.fine("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + SocketAddress client = requestPkt.getSocketAddress(); + try { + RequestHandler handler = new RequestHandler(request, client, resources, servers); + pool.execute(handler); + } catch (IOException e) { + log.warning("Failed to create request handler: "+e.getMessage()); + continue; + } + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID cid; + private ResourceID oldRID; + private ResourceID newRID; + + private Request(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { + this.cid = cid; + this.oldRID = oldRID; + this.newRID = newRID; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Servers servers; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Servers servers) throws IOException { + this.request = request; + this.client = client; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + Response response = swapResources(); + + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + log.severe("failed to create response: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(pkt); + } catch (Exception e) { + log.severe("failed to send response: "+e.getMessage()); + } finally { + sock.close(); + } + } + + private Response swapResources() { + try { + Resource resource = resources.getByID(request.oldRID); + synchronized (resource) { + if (!resource.borrower.equals(request.cid)) { + return new Response(Response.Status.NOT_BORROWED, "resource "+request.oldRID+" not borrowed by "+request.cid); + } + try { + acquireNewResource(resource.borrowDuration); + returnOldResource(resource); + return new Response(Response.Status.SUCCESS, request.cid+" success fully swapped "+request.oldRID+" for "+request.newRID); + } catch (UnknownHostException e) { + return new Response(Response.Status.UNKNOWN_HOST, e.getMessage()); + } catch (IOException e) { + return new Response(Response.Status.FAILURE, e.getMessage()); + } catch (CannotBorrow e) { + return new Response(Response.Status.CANNOT_BORROW, e.getMessage()); + } + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.oldRID); + } + } + + private void acquireNewResource(int borrowDuration) throws UnknownHostException, IOException, CannotBorrow { + RequestResource.Client requestClient = new RequestResource.Client(request.cid, request.newRID, borrowDuration); + City city = new City(request.newRID.city); + InetAddress requestResourceServer = servers.get(city); + if (requestResourceServer == null) { + throw new UnknownHostException(city.toString()); + } + RequestResource.Response response = requestClient.sendRequest(requestResourceServer); + // TODO: make exception handling more granular---pass through status from Request. + if (response.status != RequestResource.Response.Status.SUCCESS) { + throw new CannotBorrow(request.cid, request.newRID, response.message); + } + } + + private void returnOldResource(Resource r) { + r.isBorrowed = false; + r.borrower = new CoordinatorID(); + r.borrowDuration = -1; + } + } + + static class Response implements Serializable { + Status status; + String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + enum Status { + SUCCESS, + FAILURE, + NO_SUCH_RESOURCE, + NOT_BORROWED, + CANNOT_BORROW, + UNKNOWN_HOST + } + } + + private static class CannotBorrow extends Exception { + CoordinatorID attemptedBorrower; + ResourceID rid; + String message; + + private CannotBorrow(CoordinatorID attemptedBorrower, ResourceID rid, String message) { + this.attemptedBorrower = attemptedBorrower; + this.rid = rid; + this.message = message; + } + + @Override + public String getMessage() { + return attemptedBorrower+" failed to borrow "+rid+": "+message; + } + } +} \ No newline at end of file -- cgit v1.2.3