summaryrefslogtreecommitdiffstats
path: root/src/main/java/derms/replica/replica1
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/derms/replica/replica1')
-rw-r--r--src/main/java/derms/replica/replica1/AlreadyBorrowedException.java7
-rw-r--r--src/main/java/derms/replica/replica1/AnnounceListener.java81
-rw-r--r--src/main/java/derms/replica/replica1/Announcer.java69
-rw-r--r--src/main/java/derms/replica/replica1/City.java38
-rw-r--r--src/main/java/derms/replica/replica1/CoordinatorID.java34
-rw-r--r--src/main/java/derms/replica/replica1/CoordinatorServer.java170
-rw-r--r--src/main/java/derms/replica/replica1/DermsLogger.java23
-rw-r--r--src/main/java/derms/replica/replica1/FindResource.java160
-rw-r--r--src/main/java/derms/replica/replica1/Hosts.java26
-rw-r--r--src/main/java/derms/replica/replica1/ID.java5
-rw-r--r--src/main/java/derms/replica/replica1/InvalidDurationException.java7
-rw-r--r--src/main/java/derms/replica/replica1/NoSuchResourceException.java7
-rw-r--r--src/main/java/derms/replica/replica1/NotBorrowedException.java7
-rw-r--r--src/main/java/derms/replica/replica1/ObjectPacket.java33
-rw-r--r--src/main/java/derms/replica/replica1/RequestResource.java223
-rw-r--r--src/main/java/derms/replica/replica1/Resource.java39
-rw-r--r--src/main/java/derms/replica/replica1/ResourceAvailability.java128
-rw-r--r--src/main/java/derms/replica/replica1/ResourceID.java49
-rw-r--r--src/main/java/derms/replica/replica1/ResourceName.java18
-rw-r--r--src/main/java/derms/replica/replica1/ResourceTransfer.java50
-rw-r--r--src/main/java/derms/replica/replica1/Resources.java74
-rw-r--r--src/main/java/derms/replica/replica1/ResponderID.java16
-rw-r--r--src/main/java/derms/replica/replica1/ResponderServer.java94
-rw-r--r--src/main/java/derms/replica/replica1/ReturnResource.java195
-rw-r--r--src/main/java/derms/replica/replica1/ServerCommunicationError.java7
-rw-r--r--src/main/java/derms/replica/replica1/Servers.java34
-rw-r--r--src/main/java/derms/replica/replica1/StationServer.java136
-rw-r--r--src/main/java/derms/replica/replica1/SwapResource.java262
28 files changed, 0 insertions, 1992 deletions
diff --git a/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java b/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java
deleted file mode 100644
index 634f154..0000000
--- a/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package derms.replica.replica1;
-
-public class AlreadyBorrowedException extends Exception {
- public AlreadyBorrowedException(String message) {
- super(message);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/AnnounceListener.java b/src/main/java/derms/replica/replica1/AnnounceListener.java
deleted file mode 100644
index 9b5980d..0000000
--- a/src/main/java/derms/replica/replica1/AnnounceListener.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.net.*;
-import java.util.logging.Logger;
-
-public class AnnounceListener implements Runnable {
- private static final int bufsize = 1024;
-
- private InetSocketAddress groupAddr;
- private InetAddress localAddr;
- private Servers servers;
- private Logger log;
-
- public AnnounceListener(InetSocketAddress groupAddr, InetAddress localAddr, Servers servers) throws IOException {
- this.groupAddr = groupAddr;
- this.localAddr = localAddr;
- this.servers = servers;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- NetworkInterface netInterface = null;
- try {
- netInterface = NetworkInterface.getByInetAddress(localAddr);
- if (netInterface == null) {
- throw new Exception("netInterface is null");
- }
- } catch (Exception e) {
- log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage());
- return;
- }
-
- MulticastSocket sock = null;
- try {
- sock = new MulticastSocket(groupAddr.getPort());
- sock.joinGroup(groupAddr, netInterface);
- } catch (Exception e) {
- log.severe("Failed to open multicast socket: "+e.getMessage());
- return;
- }
-
- log.info("Listening to "+groupAddr.toString()+" from "+localAddr.toString()+" ("+netInterface.getName()+")");
- byte[] buf = new byte[bufsize];
- DatagramPacket pkt = new DatagramPacket(buf, buf.length);
-
- for (;;) {
- try {
- sock.receive(pkt);
- } catch (Exception e) {
- log.warning("Error receiving from multicast socket: "+e.getMessage());
- continue;
- }
-
- ObjectInputStream objStream;
- try {
- objStream = new ObjectInputStream(
- new ByteArrayInputStream(pkt.getData()));
- } catch (IOException e) {
- log.warning("Failed to create input stream: "+e.getMessage());
- continue;
- }
-
- City city;
- try {
- city = (City) objStream.readObject();
- } catch (Exception e) {
- log.warning("Failed to deserialize data: "+e.getMessage());
- continue;
- }
-
- InetAddress remote = pkt.getAddress();
- if (servers.put(city, remote) == null) {
- log.info("Added remote server "+city.toString()+" "+remote.toString());
- }
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/Announcer.java b/src/main/java/derms/replica/replica1/Announcer.java
deleted file mode 100644
index 93bbe24..0000000
--- a/src/main/java/derms/replica/replica1/Announcer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.net.*;
-import java.util.logging.Logger;
-
-public class Announcer implements Runnable {
- public static final long intervalMillis = 3000;
-
- private SocketAddress group;
- private InetAddress localAddr;
- private City city;
- private Logger log;
-
- public Announcer(SocketAddress group, InetAddress localAddr, City city) throws IOException {
- this.group = group;
- this.localAddr = localAddr;
- this.city = city;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- NetworkInterface netInterface = null;
- try {
- netInterface = NetworkInterface.getByInetAddress(localAddr);
- if (netInterface == null) {
- throw new Exception("netInterface is null");
- }
- } catch (Exception e) {
- log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage());
- return;
- }
-
- MulticastSocket sock = null;
- try {
- sock = new MulticastSocket();
- sock.joinGroup(group, netInterface);
- } catch (Exception e) {
- log.severe("Failed to open multicast socket: "+e.getMessage());
- return;
- }
-
- log.info("Announcing from "+localAddr.toString()+" ("+netInterface.getName()+") to "+group.toString());
-
- DatagramPacket pkt = null;
- try {
- pkt = ObjectPacket.create(city, group);
- } catch (IOException e) {
- log.severe("Failed to create packet: "+e.getMessage());
- sock.close();
- return;
- }
-
- try {
- for (;;) {
- sock.send(pkt);
- Thread.sleep(intervalMillis);
- }
- } catch (IOException e) {
- log.severe("Failed to send to multicast socket "+group.toString()+": "+e.getMessage());
- } catch (InterruptedException e) {
- log.info("Interrupted.");
- } finally {
- log.info("Shutting down...");
- sock.close();
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/City.java b/src/main/java/derms/replica/replica1/City.java
deleted file mode 100644
index 548f1fa..0000000
--- a/src/main/java/derms/replica/replica1/City.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.Serializable;
-
-public class City implements Serializable {
- public static final int codeLen = 3;
-
- private String code;
-
- public City(String code) throws IllegalArgumentException {
- if (code.length() != codeLen)
- throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters");
- this.code = code;
- }
-
- public City() {
- this("XXX");
- }
-
- @Override
- public String toString() {
- return code;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || this.getClass() != obj.getClass()) {
- return false;
- }
- City other = (City) obj;
- return this.code.equals(other.code);
- }
-
- @Override
- public int hashCode() {
- return code.hashCode();
- }
-}
diff --git a/src/main/java/derms/replica/replica1/CoordinatorID.java b/src/main/java/derms/replica/replica1/CoordinatorID.java
deleted file mode 100644
index c09124c..0000000
--- a/src/main/java/derms/replica/replica1/CoordinatorID.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.Serializable;
-
-public class CoordinatorID implements Serializable {
- public String city;
- public short num;
-
- public CoordinatorID(String city, short num) {
- this.city = city;
- this.num = num;
- }
-
- public CoordinatorID(String city, int num) {
- this(city, (short) num);
- }
-
- public CoordinatorID() {
- this("XXX", 0);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() != this.getClass())
- return false;
- CoordinatorID other = (CoordinatorID) obj;
- return other.city.equals(this.city) && other.num == this.num;
- }
-
- @Override
- public String toString() {
- return city+"C"+num;
- }
-}
diff --git a/src/main/java/derms/replica/replica1/CoordinatorServer.java b/src/main/java/derms/replica/replica1/CoordinatorServer.java
deleted file mode 100644
index bdcf242..0000000
--- a/src/main/java/derms/replica/replica1/CoordinatorServer.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-public class CoordinatorServer {
- public static final Duration timeout = Duration.ofSeconds(5);
-
- private City city;
- private Resources resources;
- private Servers servers;
- private Logger log;
-
- public CoordinatorServer(City city, Resources resources, Servers servers) throws IOException {
- super();
- this.city = city;
- this.resources = resources;
- this.servers = servers;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- public CoordinatorServer() throws IOException {
- this(new City(), new Resources(), new Servers());
- }
-
- public void requestResource(CoordinatorID cid, ResourceID rid, int duration)
- throws ServerCommunicationError, NoSuchResourceException,
- AlreadyBorrowedException, InvalidDurationException
- {
- log.info("Request for "+rid+" from "+cid);
-
- InetAddress server = servers.get(new City(rid.city));
- if (server == null) {
- throw new ServerCommunicationError("requestResource(): no connection to server "+rid.city.toString());
- }
-
- RequestResource.Client client = new RequestResource.Client(cid, rid, duration);
- RequestResource.Response response;
- try {
- response = client.sendRequest(server);
- } catch (IOException e) {
- throw new ServerCommunicationError("requestResource(): "+e.getMessage());
- }
- switch (response.status) {
- case SUCCESS:
- log.info("Request "+rid+" from "+cid+" - success");
- break;
- case NO_SUCH_RESOURCE:
- log.warning(response.message);
- throw new NoSuchResourceException(response.message);
- case ALREADY_BORROWED:
- log.warning(response.message);
- throw new AlreadyBorrowedException(response.message);
- case INVALID_DURATION:
- log.warning(response.message);
- throw new InvalidDurationException(response.message);
- default:
- log.warning("Unsuccessful response from server: "+response.message);
- throw new ServerCommunicationError("requestResource(): failed to borrow resource: "+response.message);
- }
- }
-
- public Resource[] findResource(CoordinatorID cid, ResourceName rname) throws ServerCommunicationError {
- log.info("Find Resource "+rname+" from "+cid);
- FindResource.Request request = new FindResource.Request(cid, rname);
- Collection<Resource> response = ConcurrentHashMap.newKeySet();
- ExecutorService pool = Executors.newFixedThreadPool(servers.size());
- try {
- for (InetAddress server : servers.all()) {
- pool.execute(new FindResource.Client(request, server, response));
- }
- } catch (IOException e) {
- String msg = "Failed to start FindResource Client: "+e.getMessage();
- log.severe(msg);
- throw new ServerCommunicationError("findResource(): "+msg);
- }
- log.fine("Started worker threads");
- pool.shutdown();
- boolean terminated;
- try {
- terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- String msg = "findResource() interrupted: "+e.getMessage();
- log.warning(msg);
- throw new ServerCommunicationError("findResource(): "+msg);
- }
- if (!terminated) {
- String msg = "Request timed out: no response after "+timeout.toString();
- log.warning(msg);
- throw new ServerCommunicationError("findResource(): "+msg);
- }
- Resource[] arr = new Resource[0];
- arr = response.toArray(arr);
- log.info("Find resource "+rname+" from "+cid+" - success. Response length: "+arr.length);
- return arr;
- }
-
- public void returnResource(CoordinatorID cid, ResourceID rid)
- throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException
- {
- log.info("Return resource "+rid+" from "+cid);
- InetAddress server = servers.get(new City(rid.city));
- if (server == null) {
- String msg = "no connection to server "+rid.city;
- log.warning(msg);
- throw new ServerCommunicationError("returnResource(): "+msg);
- }
- log.fine("server address: "+server);
-
- ReturnResource.Client client = new ReturnResource.Client(cid, rid);
- ReturnResource.Response response;
- try {
- response = client.sendRequest(server);
- } catch (IOException e) {
- log.warning(e.getMessage());
- throw new ServerCommunicationError("returnResource(): "+e.getMessage());
- }
- switch (response.status) {
- case SUCCESS:
- log.info(cid+" return "+rid+" - success");
- break;
- case NO_SUCH_RESOURCE:
- log.warning(response.message);
- throw new NoSuchResourceException(response.message);
- case NOT_BORROWED:
- log.warning(response.message);
- throw new NotBorrowedException(response.message);
- default:
- String msg = "Failed to return resource: "+response.message;
- log.warning(msg);
- throw new ServerCommunicationError("returnResource(): "+msg);
- }
- }
-
- public void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException {
- log.info(cid+": swap "+oldRID+", "+newRID);
-
- InetAddress server = servers.get(new City(oldRID.city));
- if (server == null) {
- String msg = "no connection to server "+oldRID.city;
- log.warning(msg);
- throw new ServerCommunicationError("swapResource(): "+msg);
- }
- log.fine("server address: "+server);
-
- SwapResource.Client client = new SwapResource.Client(cid, oldRID, newRID);
- SwapResource.Response response;
- try {
- response = client.sendRequest(server);
- } catch (IOException e) {
- throw new ServerCommunicationError("swapResource(): "+e.getMessage());
- }
- switch (response.status) {
- case SUCCESS:
- log.info(cid+": swap "+oldRID+", "+newRID+" - success");
- break;
- case NO_SUCH_RESOURCE:
- throw new NoSuchResourceException(response.message);
- default:
- throw new ServerCommunicationError("swapResource(): "+response.message);
- }
- }
-}
diff --git a/src/main/java/derms/replica/replica1/DermsLogger.java b/src/main/java/derms/replica/replica1/DermsLogger.java
deleted file mode 100644
index 9ff8249..0000000
--- a/src/main/java/derms/replica/replica1/DermsLogger.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.util.logging.FileHandler;
-import java.util.logging.Handler;
-import java.util.logging.Logger;
-import java.util.logging.SimpleFormatter;
-
-public class DermsLogger {
- public static final String logFile = "server.log";
-
- private static Logger log = null;
-
- public static Logger getLogger(Class clazz) throws IOException {
- if (log == null) {
- log = Logger.getLogger(clazz.getName());
- Handler fileHandler = new FileHandler(logFile);
- fileHandler.setFormatter(new SimpleFormatter());
- log.addHandler(fileHandler);
- }
- return log;
- }
-}
diff --git a/src/main/java/derms/replica/replica1/FindResource.java b/src/main/java/derms/replica/replica1/FindResource.java
deleted file mode 100644
index c9a0552..0000000
--- a/src/main/java/derms/replica/replica1/FindResource.java
+++ /dev/null
@@ -1,160 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.*;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Logger;
-
-public class FindResource {
- public static final int port = 5558;
-
- public static class Client implements Runnable {
- private InetAddress serverAddr;
- private Request request;
- private Collection<Resource> response;
- private Logger log;
-
- public Client(Request request, InetAddress serverAddr, Collection<Resource> response) throws IOException {
- this.serverAddr = serverAddr;
- this.request = request;
- this.response = response;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- DatagramSocket sock;
- try {
- sock = new DatagramSocket();
- } catch (Exception e) {
- log.severe("Failed to open socket: "+e.getMessage());
- return;
- }
-
- DatagramPacket requestPkt;
- try {
- requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
- } catch (IOException e) {
- log.severe("Failed to create request packet: "+e.getMessage());
- sock.close();
- return;
- }
-
- try {
- sock.send(requestPkt);
- } catch (Exception e) {
- log.severe("Failed to send request: "+e.getMessage());
- sock.close();
- return;
- }
-
- Resource[] resources;
- try {
- resources = ResourceTransfer.receive(sock);
- } catch (IOException e) {
- log.severe(e.getMessage());
- return;
- } finally {
- sock.close();
- }
-
- for (Resource r : resources) {
- response.add(r);
- }
- }
- }
-
- public static class Server implements Runnable {
- private static final int bufsize = 4096;
-
- private InetAddress localAddr;
- private Resources resources;
- private ExecutorService pool;
- private Logger log;
-
- public Server(InetAddress localAddr, Resources resources) throws IOException {
- this.localAddr = localAddr;
- this.resources = resources;
- this.pool = Executors.newWorkStealingPool();
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- DatagramSocket sock = null;
- try {
- sock = new DatagramSocket(port, localAddr);
- } catch (Exception e) {
- log.severe("Failed to bind socket: "+e.getMessage());
- return;
- }
-
- log.info("Running on "+localAddr.toString()+":"+port);
-
- DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
- try {
- for (;;) {
- try {
- sock.receive(requestPkt);
- } catch (Exception e) {
- log.warning("Error receiving from socket: "+e.getMessage());
- continue;
- }
- log.info("Got request");
-
- Request request;
- try {
- request = ObjectPacket.deserialize(requestPkt, Request.class);
- } catch (IOException e) {
- log.warning("Failed to deserialize request: "+e.getMessage());
- continue;
- }
-
- pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log));
- }
- } finally {
- sock.close();
- }
- }
- }
-
- public static class Request implements Serializable {
- private CoordinatorID cid;
- private ResourceName rname;
-
- public Request(CoordinatorID cid, ResourceName rname) {
- this.cid = cid;
- this.rname = rname;
- }
- }
-
- private static class RequestHandler implements Runnable {
- private Request request;
- private SocketAddress client;
- private Resources resources;
- private Logger log;
-
- private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) {
- this.request = request;
- this.client = client;
- this.resources = resources;
- this.log = log;
- }
-
- @Override
- public void run() {
- List<Resource> borrowedResources = resources.borrowed(request.cid, request.rname);
- log.info(""+borrowedResources.size()+" "+request.rname+" resources borrowed by "+request.cid);
- try {
- Resource[] arr = new Resource[0];
- ResourceTransfer.send(borrowedResources.toArray(arr), client);
- } catch (IOException e) {
- log.severe("Failed to send response: "+e.getMessage());
- }
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/Hosts.java b/src/main/java/derms/replica/replica1/Hosts.java
deleted file mode 100644
index e27413d..0000000
--- a/src/main/java/derms/replica/replica1/Hosts.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package derms.replica.replica1;
-
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class Hosts {
- private static Map<City, String> hosts = null;
-
- public static String get(City city) throws UnknownHostException {
- if (hosts == null)
- init();
-
- String host = hosts.get(city);
- if (host == null)
- throw new UnknownHostException("unknown host: "+city);
- return host;
- }
-
- private static void init() {
- hosts = new HashMap<City, String>();
- hosts.put(new City("MTL"), "alpine1");
- hosts.put(new City("QUE"), "alpine2");
- hosts.put(new City("SHE"), "alpine3");
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ID.java b/src/main/java/derms/replica/replica1/ID.java
deleted file mode 100644
index 8d9f9aa..0000000
--- a/src/main/java/derms/replica/replica1/ID.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package derms.replica.replica1;
-
-public class ID {
- public static final int nDigits = 4;
-}
diff --git a/src/main/java/derms/replica/replica1/InvalidDurationException.java b/src/main/java/derms/replica/replica1/InvalidDurationException.java
deleted file mode 100644
index ba3c75a..0000000
--- a/src/main/java/derms/replica/replica1/InvalidDurationException.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package derms.replica.replica1;
-
-public class InvalidDurationException extends Exception {
- public InvalidDurationException (String message) {
- super(message);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/NoSuchResourceException.java b/src/main/java/derms/replica/replica1/NoSuchResourceException.java
deleted file mode 100644
index 3979570..0000000
--- a/src/main/java/derms/replica/replica1/NoSuchResourceException.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package derms.replica.replica1;
-
-public class NoSuchResourceException extends Exception {
- public NoSuchResourceException(String message) {
- super(message);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/NotBorrowedException.java b/src/main/java/derms/replica/replica1/NotBorrowedException.java
deleted file mode 100644
index 5c0257a..0000000
--- a/src/main/java/derms/replica/replica1/NotBorrowedException.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package derms.replica.replica1;
-
-public class NotBorrowedException extends Exception {
- public NotBorrowedException (String message) {
- super(message);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ObjectPacket.java b/src/main/java/derms/replica/replica1/ObjectPacket.java
deleted file mode 100644
index 9d7de45..0000000
--- a/src/main/java/derms/replica/replica1/ObjectPacket.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.*;
-import java.net.DatagramPacket;
-import java.net.SocketAddress;
-
-public class ObjectPacket {
- public static DatagramPacket create(Serializable obj, SocketAddress remoteAddr) throws IOException {
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- ObjectOutputStream objStream = new ObjectOutputStream(byteStream);
- objStream.writeObject(obj);
- objStream.flush();
- byte[] buf = byteStream.toByteArray();
- objStream.close();
- return new DatagramPacket(buf, buf.length, remoteAddr);
- }
-
- public static <E> E deserialize(DatagramPacket pkt, Class<E> clazz) throws IOException {
- ObjectInputStream objectStream;
- try {
- objectStream = new ObjectInputStream(
- new ByteArrayInputStream(pkt.getData()));
- } catch (Exception e) {
- throw new IOException("failed to create input stream: "+e.getMessage());
- }
-
- try {
- return clazz.cast(objectStream.readObject());
- } catch (Exception e) {
- throw new IOException(e.getMessage());
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/RequestResource.java b/src/main/java/derms/replica/replica1/RequestResource.java
deleted file mode 100644
index f7c4cbd..0000000
--- a/src/main/java/derms/replica/replica1/RequestResource.java
+++ /dev/null
@@ -1,223 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.*;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Logger;
-
-public class RequestResource {
- public static final int port = 5557;
- public static final int bufsize = 4096;
-
- public static class Client {
- private CoordinatorID coordinatorID;
- private ResourceID resourceID;
- private int duration;
-
- public Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) {
- this.coordinatorID = coordinatorID;
- this.resourceID = resourceID;
- this.duration = duration;
- }
-
- public Response sendRequest(InetAddress serverAddr) throws IOException {
- Request request = new Request(coordinatorID, resourceID, duration);
- DatagramSocket sock;
- try {
- sock = new DatagramSocket();
- } catch (Exception e) {
- throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage());
- }
-
- DatagramPacket requestPkt;
- try {
- requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
- } catch (IOException e) {
- sock.close();
- throw new IOException("Request Resource Client: failed to create request: "+e.getMessage());
- }
-
- sock.send(requestPkt);
-
- byte[] buf = new byte[bufsize];
- DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
- try {
- sock.receive(responsePkt);
- } catch (Exception e) {
- sock.close();
- throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage());
- }
-
- try {
- return ObjectPacket.deserialize(responsePkt, Response.class);
- } catch (IOException e) {
- throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage());
- } finally {
- sock.close();
- }
- }
- }
-
- public static class Server implements Runnable {
- private InetAddress localAddr;
- private Resources resources;
- private ExecutorService pool;
- private Logger log;
-
- public Server(InetAddress localAddr, Resources resources) throws IOException {
- this.localAddr = localAddr;
- this.resources = resources;
- pool = Executors.newWorkStealingPool();
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- DatagramSocket sock = null;
- try {
- sock = new DatagramSocket(port, localAddr);
- } catch (Exception e) {
- log.severe("Failed to bind socket to "+localAddr.toString()
- +": "+e.getMessage());
- return;
- }
-
- log.info("Listening on "+localAddr.toString()+":"+port);
- DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
- try {
- for (;;) {
- try {
- sock.receive(requestPkt);
- } catch (Exception e) {
- log.warning("Error receiving from socket: "+e.getMessage());
- continue;
- }
- log.info("Got request");
-
- Request request = null;
- try {
- request = ObjectPacket.deserialize(requestPkt, Request.class);
- } catch (IOException e) {
- log.warning("Failed to deserialize request: "+e.getMessage());
- continue;
- }
-
- pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log));
- }
- } finally {
- sock.close();
- }
- }
- }
-
- private static class Request implements Serializable {
- private CoordinatorID coordinatorID;
- private ResourceID resourceID;
- private int duration;
-
- private Request(CoordinatorID cid, ResourceID rid, int duration) {
- this.coordinatorID = cid;
- this.resourceID = rid;
- this.duration = duration;
- }
- }
-
- private static class RequestHandler implements Runnable {
- private Request request;
- private SocketAddress client;
- private Resources resources;
- private Logger log;
-
- private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) {
- this.request = request;
- this.client = client;
- this.resources = resources;
- this.log = log;
- }
-
- @Override
- public void run() {
- Response response = borrow();
-
- DatagramSocket sock;
- try {
- sock = new DatagramSocket();
- } catch (Exception e) {
- log.severe("Failed to open socket: "+e.getMessage());
- return;
- }
-
- DatagramPacket pkt;
- try {
- pkt = ObjectPacket.create(response, client);
- } catch (IOException e) {
- log.severe("Failed to create response packet: "+e.getMessage());
- sock.close();
- return;
- }
- try {
- sock.send(pkt);
- } catch (Exception e) {
- log.severe("Failed to send response: "+e.getMessage());
- }
- sock.close();
- }
-
- private Response borrow() {
- try {
- Resource resource = resources.getByID(request.resourceID);
- synchronized (resource) {
- if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) {
- return new Response(Response.Status.ALREADY_BORROWED,
- request.coordinatorID+" cannot borrow "+request.resourceID
- +"; already borrowed by "+resource.borrower);
- } else if (request.duration <= 0) {
- return new Response(Response.Status.INVALID_DURATION,
- "duration "+request.duration+" less than 1");
- } else if (request.duration > resource.duration) {
- return new Response(Response.Status.INVALID_DURATION,
- "cannot borrow "+resource.id+" for duration of "+request.duration
- +"; only "+resource.duration+" remaining");
- }
-
- if (resource.borrower.equals(request.coordinatorID)) {
- // Resource is already borrowed. Add to existing duration.
- resource.borrowDuration += request.duration;
- } else {
- resource.borrowDuration = request.duration;
- }
- resource.borrower = request.coordinatorID;
- resource.isBorrowed = true;
- resource.duration -= request.duration;
-
- return new Response(Response.Status.SUCCESS, request.coordinatorID
- +" successfully borrowed "+request.resourceID);
- }
- } catch (NoSuchElementException e) {
- return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID);
- }
- }
- }
-
- public static class Response implements Serializable {
- public Status status;
- public String message;
-
- private Response(Status status, String message) {
- this.status = status;
- this.message = message;
- }
-
- private Response() {
- this(Status.SUCCESS, "");
- }
-
- public enum Status {
- SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION
- }
- }
-}
-
diff --git a/src/main/java/derms/replica/replica1/Resource.java b/src/main/java/derms/replica/replica1/Resource.java
deleted file mode 100644
index 4abe41b..0000000
--- a/src/main/java/derms/replica/replica1/Resource.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.Serializable;
-
-public class Resource implements Serializable {
- public ResourceID id;
- public ResourceName name;
- public int duration;
- public boolean isBorrowed;
- public CoordinatorID borrower;
- public int borrowDuration;
-
- public Resource(ResourceID id, ResourceName name, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) {
- this.id = id;
- this.name = name;
- this.duration = duration;
- this.isBorrowed = isBorrowed;
- this.borrower = borrower;
- this.borrowDuration = borrowDuration;
- }
-
- public Resource(ResourceID id, ResourceName name, int duration) {
- this(id, name, duration, false, new CoordinatorID(), -1);
- }
-
- public Resource() {
- this(new ResourceID(), ResourceName.AMBULANCE, 0);
- }
-
- @Override
- public int hashCode() {
- return id.hashCode();
- }
-
- @Override
- public String toString() {
- return id+" "+duration;
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ResourceAvailability.java b/src/main/java/derms/replica/replica1/ResourceAvailability.java
deleted file mode 100644
index a6083ab..0000000
--- a/src/main/java/derms/replica/replica1/ResourceAvailability.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.logging.Logger;
-
-public class ResourceAvailability {
- public static final int port = 5556;
-
- public static class Client implements Runnable {
- private InetAddress serverAddr;
- private ResourceName request;
- private Collection<Resource> resources;
- private Logger log;
-
- public Client(InetAddress serverAddr, ResourceName request, Collection<Resource> response) throws IOException {
- this.serverAddr = serverAddr;
- this.request = request;
- this.resources = response;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- DatagramSocket sock;
- try {
- sock = new DatagramSocket();
- } catch (Exception e) {
- log.severe("Error binding socket: "+e.getMessage());
- return;
- }
- log.fine("Created socket");
-
- DatagramPacket reqPkt;
- try {
- reqPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
- } catch (IOException e) {
- log.severe("Error creating request: "+e.getMessage());
- sock.close();
- return;
- }
-
- try {
- sock.send(reqPkt);
- } catch (IOException e) {
- log.severe("Error sending request: "+e.getMessage());
- sock.close();
- return;
- }
- log.fine("Sent request");
-
- Resource[] response;
- try {
- response = ResourceTransfer.receive(sock);
- } catch (IOException e) {
- log.severe(e.getMessage());
- return;
- } finally {
- sock.close();
- }
-
- for (Resource resource : response) {
- resources.add(resource);
- }
- }
- }
-
- public static class Server implements Runnable {
- public static final int bufsize = 1024;
-
- private InetAddress localAddr;
- private Resources resources;
- private Logger log;
-
- public Server(InetAddress localAddr, Resources resources) throws IOException {
- this.localAddr = localAddr;
- this.resources = resources;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- DatagramSocket sock = null;
- try {
- sock = new DatagramSocket(port, localAddr);
- } catch (Exception e) {
- log.severe("Failed to bind socket to "+localAddr.toString()+": "+e.getMessage());
- return;
- }
-
- log.info("Listening on "+localAddr.toString()+":"+port);
-
- DatagramPacket request = new DatagramPacket(new byte[bufsize], bufsize);
- try {
- for (;;) {
- try {
- sock.receive(request);
- } catch (Exception e) {
- log.warning("Error receiving from socket: "+e.getMessage());
- continue;
- }
-
- ResourceName requestedName = null;
- try {
- requestedName = ObjectPacket.deserialize(request, ResourceName.class);
- } catch (IOException e) {
- log.warning("Failed to deserialize request: "+e.getMessage());
- continue;
- }
- log.info("Got request: "+requestedName);
-
- Resource[] response = resources.getByName(requestedName);
- try {
- ResourceTransfer.send(response, request.getSocketAddress());
- } catch (IOException e) {
- log.warning("Error transfering resources: "+e.getMessage());
- }
- }
- } finally {
- sock.close();
- }
- }
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ResourceID.java b/src/main/java/derms/replica/replica1/ResourceID.java
deleted file mode 100644
index 008b766..0000000
--- a/src/main/java/derms/replica/replica1/ResourceID.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.Serializable;
-
-public class ResourceID implements Serializable {
- public String city;
- public short num;
-
- public ResourceID (String city, short num) {
- this.city = city;
- this.num = num;
- }
-
- public ResourceID() {
- this("XXX", (short) 1111);
- }
-
- public static ResourceID parse(String s) throws IllegalArgumentException {
- if (s.length() != City.codeLen+ID.nDigits) {
- throw new IllegalArgumentException("invalid resource ID: "+s);
- }
- try {
- String cityCode = s.substring(0, City.codeLen);
- short num = Short.parseShort(s.substring(City.codeLen));
- return new ResourceID(cityCode, num);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("invalid resource ID: "+e.getMessage());
- }
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || obj.getClass() != this.getClass()) {
- return false;
- }
- ResourceID other = (ResourceID) obj;
- return (this.city.equals(other.city)) && (this.num == other.num);
- }
-
- @Override
- public int hashCode() {
- return city.hashCode() * num;
- }
-
- @Override
- public String toString() {
- return city+num;
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ResourceName.java b/src/main/java/derms/replica/replica1/ResourceName.java
deleted file mode 100644
index de4bc16..0000000
--- a/src/main/java/derms/replica/replica1/ResourceName.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.Serializable;
-
-public enum ResourceName implements Serializable {
- AMBULANCE,
- FIRETRUCK,
- PERSONNEL;
-
- public static ResourceName parse(String s) {
- switch (s) {
- case "AMBULANCE": return ResourceName.AMBULANCE;
- case "FIRETRUCK": return ResourceName.FIRETRUCK;
- case "PERSONNEL": return ResourceName.PERSONNEL;
- }
- throw new IllegalArgumentException("invalid resource name: "+s);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ResourceTransfer.java b/src/main/java/derms/replica/replica1/ResourceTransfer.java
deleted file mode 100644
index fbc695c..0000000
--- a/src/main/java/derms/replica/replica1/ResourceTransfer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-public class ResourceTransfer {
- public static final int bufsize = 1024;
-
- public static void send(Resource[] resources, SocketAddress remoteAddr) throws IOException {
- DatagramSocket sock = new DatagramSocket();
-
- for (Resource resource : resources) {
- DatagramPacket pkt = ObjectPacket.create(resource, remoteAddr);
- sock.send(pkt);
- }
-
- DatagramPacket pkt = ObjectPacket.create(new EndOfTransmission(), remoteAddr);
- sock.send(pkt);
- sock.close();
- }
-
- public static Resource[] receive(DatagramSocket sock) throws IOException {
- List<Resource> resources = new ArrayList<Resource>();
- byte[] buf = new byte[bufsize];
- DatagramPacket response = new DatagramPacket(buf, buf.length);
-
- for (;;) {
- sock.receive(response);
-
- Object obj = ObjectPacket.deserialize(response, Object.class);
- if (obj.getClass() == EndOfTransmission.class) {
- break;
- }
- try {
- resources.add((Resource) obj);
- } catch (Exception e) {
- throw new IOException("expected Resource; got "+obj.getClass().toString());
- }
- }
- Resource[] arr = new Resource[0];
- return resources.toArray(arr);
- }
-
- private static class EndOfTransmission implements Serializable {}
-} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/Resources.java b/src/main/java/derms/replica/replica1/Resources.java
deleted file mode 100644
index a6a8b3a..0000000
--- a/src/main/java/derms/replica/replica1/Resources.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package derms.replica.replica1;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Resources {
- private Map<ResourceName, Map<ResourceID, Resource>> resources;
-
- public Resources() {
- this.resources = new ConcurrentHashMap<ResourceName, Map<ResourceID, Resource>>();
- }
-
- public List<Resource> borrowed(CoordinatorID borrower, ResourceName name) {
- List<Resource> borrowed = new ArrayList<Resource>();
- Resource[] namedResources = getByName(name);
- for (Resource r : namedResources) {
- if (r.isBorrowed && r.borrower.equals(borrower)) {
- borrowed.add(r);
- }
- }
- return borrowed;
- }
-
- public Resource getByID(ResourceID id) throws NoSuchElementException {
- for (Map<ResourceID, Resource> rids : resources.values()) {
- Resource resource = rids.get(id);
- if (resource != null) {
- return resource;
- }
- }
- throw new NoSuchElementException("No such resource "+id);
- }
-
- public Resource[] getByName(ResourceName name) {
- Map<ResourceID, Resource> rids = resources.get(name);
- if (rids == null) {
- return new Resource[0];
- }
- Resource[] r = new Resource[0];
- return rids.values().toArray(r);
- }
-
- public void add(Resource r) {
- Map<ResourceID, Resource> rids;
- synchronized (resources) {
- rids = resources.get(r.name);
- if (rids == null) {
- rids = new ConcurrentHashMap<ResourceID, Resource>();
- resources.put(r.name, rids);
- }
- }
- synchronized (rids) {
- Resource existing = rids.get(r.id);
- if (existing != null) {
- existing.duration += r.duration;
- } else {
- rids.put(r.id, r);
- }
- }
- }
-
- public void removeByID(ResourceID id) throws NoSuchElementException {
- for (Map<ResourceID, Resource> rids : resources.values()) {
- if (rids.containsKey(id)) {
- rids.remove(id);
- return;
- }
- }
- throw new NoSuchElementException("No such resource "+id);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ResponderID.java b/src/main/java/derms/replica/replica1/ResponderID.java
deleted file mode 100644
index 0ccda8a..0000000
--- a/src/main/java/derms/replica/replica1/ResponderID.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package derms.replica.replica1;
-
-public class ResponderID {
- public City city;
- public short num;
-
- public ResponderID(City city, short num) {
- this.city = city;
- this.num = num;
- }
-
- @Override
- public String toString() {
- return ""+city+"R"+num;
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ResponderServer.java b/src/main/java/derms/replica/replica1/ResponderServer.java
deleted file mode 100644
index 2a6b480..0000000
--- a/src/main/java/derms/replica/replica1/ResponderServer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-public class ResponderServer {
- public static final Duration timeout = Duration.ofSeconds(5);
-
- private City city;
- private Resources resources;
- private Servers servers;
- private Logger log;
-
- public ResponderServer(City city, Resources resources, Servers servers) throws IOException {
- this.city = city;
- this.resources = resources;
- this.servers = servers;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- public ResponderServer() throws IOException {
- this(new City(), new Resources(), new Servers());
- }
-
- public void addResource(Resource r) {
- resources.add(r);
- log.info("Added resource "+r+" - success");
- }
-
- public void removeResource(ResourceID rid, int duration) throws NoSuchResourceException {
- log.info("Remove duration "+duration+" from "+rid);
- try {
- Resource resource = resources.getByID(rid);
- synchronized (resource) {
- if (duration < 0 || duration >= resource.duration) {
- resources.removeByID(rid);
- log.info("Removed resource "+rid+" - success");
- } else {
- resource.duration -= duration;
- log.info("Removed duration from resource "+rid+". New duration: "+resource.duration+" - success");
- }
- }
- } catch (NoSuchElementException e) {
- // Wanted to remove duration from resource.
- if (duration >= 0) {
- String msg = "Cannot remove duration from "+rid+": resource does not exist.";
- log.warning(msg);
- throw new NoSuchResourceException(msg);
- }
-
- // Duration is negative---wanted to remove resource completely.
- // Success because it is already removed.
- log.info("Not removing "+rid+": resource does not exist.");
- }
- }
-
- public Resource[] listResourceAvailability(ResourceName rname) throws ServerCommunicationError {
- log.info("Request for available "+rname);
- Collection<Resource> availableResources = ConcurrentHashMap.newKeySet();
- ExecutorService pool = Executors.newFixedThreadPool(servers.size());
- try {
- for (InetAddress serverAddr : servers.all()) {
- pool.execute(new ResourceAvailability.Client(serverAddr, rname, availableResources));
- }
- } catch (IOException e) {
- String msg = "Failed to start ResourceAvailability Client: "+e.getMessage();
- log.severe(msg);
- throw new ServerCommunicationError("ResourceAvailability: "+msg);
- }
-
- log.fine("Started worker threads");
- pool.shutdown();
- boolean terminated;
- try {
- terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new ServerCommunicationError("listResourceAvailability(): listResourceAvailability() interrupted: "+e.getMessage());
- }
- if (!terminated) {
- throw new ServerCommunicationError("ResourceAvailability: request timed out: no response after "+timeout.toString());
- }
- log.info("Response length "+availableResources.size());
- Resource[] arr = new Resource[0];
- return availableResources.toArray(arr);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ReturnResource.java b/src/main/java/derms/replica/replica1/ReturnResource.java
deleted file mode 100644
index e28815f..0000000
--- a/src/main/java/derms/replica/replica1/ReturnResource.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.*;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Logger;
-
-public class ReturnResource {
- public static final int port = 5559;
- public static final int bufsize = 4096;
-
- public static class Client {
- private CoordinatorID coordinatorID;
- private ResourceID resourceID;
-
- public Client(CoordinatorID cid, ResourceID rid) {
- this.coordinatorID = cid;
- this.resourceID = rid;
- }
-
- public Response sendRequest(InetAddress serverAddr) throws IOException {
- Request request = new Request(coordinatorID, resourceID);
- DatagramSocket sock;
- try {
- sock = new DatagramSocket();
- } catch (Exception e) {
- throw new IOException("Return Resource Client: failed to open socket: "+e.getMessage());
- }
-
- DatagramPacket requestPkt;
- try {
- requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
- } catch (IOException e) {
- sock.close();
- throw new IOException("Return Resource Client: failed to create request: "+e.getMessage());
- }
-
- sock.send(requestPkt);
-
- byte[] buf = new byte[bufsize];
- DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
- try {
- sock.receive(responsePkt);
- } catch (Exception e) {
- sock.close();
- throw new IOException("Return Resource Client: error receiving from server: "+e.getMessage());
- }
-
- try {
- return ObjectPacket.deserialize(responsePkt, Response.class);
- } catch (IOException e) {
- throw new IOException("Return Resource Client: failed to deserialize response: "+e.getMessage());
- } finally {
- sock.close();
- }
- }
- }
-
- public static class Server implements Runnable {
- private InetAddress localAddr;
- private Resources resources;
- private ExecutorService pool;
- private Logger log;
-
- public Server(InetAddress localAddr, Resources resources) throws IOException {
- this.localAddr = localAddr;
- this.resources = resources;
- pool = Executors.newWorkStealingPool();
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- DatagramSocket sock = null;
- try {
- sock = new DatagramSocket(port, localAddr);
- } catch (Exception e) {
- log.severe("Failed to bind socket to "+localAddr.toString()
- +": "+e.getMessage());
- return;
- }
-
- log.info("Listening on "+localAddr.toString()+":"+port);
- DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize);
- try {
- for (;;) {
- try {
- sock.receive(requestPkt);
- } catch (Exception e) {
- log.warning("Error receiving from socket: "+e.getMessage());
- continue;
- }
- log.fine("Got request");
-
- Request request = null;
- try {
- request = ObjectPacket.deserialize(requestPkt, Request.class);
- } catch (IOException e) {
- log.warning("Failed to deserialize request: "+e.getMessage());
- continue;
- }
-
- pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources));
- }
- } finally {
- sock.close();
- }
- }
- }
-
- private static class Request implements Serializable {
- private CoordinatorID coordinatorID;
- private ResourceID resourceID;
-
- private Request(CoordinatorID cid, ResourceID rid) {
- this.coordinatorID = cid;
- this.resourceID = rid;
- }
- }
-
- private static class RequestHandler implements Runnable {
- private Request request;
- private SocketAddress client;
- private Resources resources;
-
- private RequestHandler(Request request, SocketAddress client, Resources resources) {
- this.request = request;
- this.client = client;
- this.resources = resources;
- }
-
- @Override
- public void run() {
- Response response = returnResource();
-
- DatagramSocket sock;
- try {
- sock= new DatagramSocket();
- } catch (Exception e) {
- System.err.println("Request Resource Server: failed to open socket: "+e.getMessage());
- return;
- }
-
- DatagramPacket pkt;
- try {
- pkt = ObjectPacket.create(response, client);
- } catch (IOException e) {
- System.err.println("Request Resource Server: failed to create response packet: "+e.getMessage());
- sock.close();
- return;
- }
- try {
- sock.send(pkt);
- } catch (Exception e) {
- System.err.println("Request Resource Server: failed to send response: "+e.getMessage());
- }
- sock.close();
- }
-
- private Response returnResource() {
- try {
- Resource resource = resources.getByID(request.resourceID);
- synchronized (resource) {
- if (!resource.isBorrowed || !resource.borrower.equals(request.coordinatorID)) {
- return new Response(Response.Status.NOT_BORROWED,
- request.resourceID+" is not borrowed by "+request.coordinatorID);
- }
- resource.isBorrowed = false;
- resource.borrower = new CoordinatorID();
- resource.borrowDuration = -1;
- return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id);
- }
- } catch (NoSuchElementException e) {
- return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID);
- }
- }
- }
-
- public static class Response implements Serializable {
- public Status status;
- public String message;
-
- private Response(Status status, String message) {
- this.status = status;
- this.message = message;
- }
-
- public enum Status {
- SUCCESS, NO_SUCH_RESOURCE, NOT_BORROWED
- }
- }
-}
diff --git a/src/main/java/derms/replica/replica1/ServerCommunicationError.java b/src/main/java/derms/replica/replica1/ServerCommunicationError.java
deleted file mode 100644
index 0f0586b..0000000
--- a/src/main/java/derms/replica/replica1/ServerCommunicationError.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package derms.replica.replica1;
-
-public class ServerCommunicationError extends Exception {
- public ServerCommunicationError(String message) {
- super(message);
- }
-}
diff --git a/src/main/java/derms/replica/replica1/Servers.java b/src/main/java/derms/replica/replica1/Servers.java
deleted file mode 100644
index ed1255a..0000000
--- a/src/main/java/derms/replica/replica1/Servers.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package derms.replica.replica1;
-
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Servers {
- private Map<City, InetAddress> servers = new ConcurrentHashMap<City, InetAddress>();
-
- /** Returns the address of the server located in the specified city, or null if there is no server in the city. */
- public InetAddress get(City city) {
- return servers.get(city);
- }
-
- /**
- * Associates the specified server address with the specified city.
- * If there was already a server associated with the city, the old value is replaced.
- * @param city the city where the server is located
- * @param addr the address of the server
- * @return the previous server address, or null if there was no server associated with this city.
- */
- public InetAddress put(City city, InetAddress addr) {
- return servers.put(city, addr);
- }
-
- public Collection<InetAddress> all() {
- return servers.values();
- }
-
- public int size() {
- return servers.size();
- }
-}
diff --git a/src/main/java/derms/replica/replica1/StationServer.java b/src/main/java/derms/replica/replica1/StationServer.java
deleted file mode 100644
index 6bfc7c2..0000000
--- a/src/main/java/derms/replica/replica1/StationServer.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Logger;
-
-public class StationServer implements Runnable {
- public static final String usage = "Usage: java StationServer <city> <local address>";
- public static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555);
-
- public City city;
- public InetAddress localAddr;
- public Resources resources;
- public Servers servers;
- private Logger log;
- private ResponderServer responderServer;
- private CoordinatorServer coordinatorServer;
-
- public StationServer(City city, InetAddress localAddr) throws IOException {
- this.city = city;
- this.localAddr = localAddr;
- this.resources = new Resources();
- this.servers = new Servers();
- this.log = DermsLogger.getLogger(getClass());
-
- try {
- this.responderServer = new ResponderServer(city, resources, servers);
- } catch (IOException e) {
- throw new IOException("Failed to create ResponderServer: "+e.getMessage());
- }
- log.info("Created ResponderServer");
-
- try {
- this.coordinatorServer = new CoordinatorServer(city, resources, servers);
- } catch (IOException e) {
- throw new IOException("Failed to create CoordinatorServer: "+e.getMessage());
- }
- log.info("Created CoordinatorServer");
- }
-
- public static void main(String cmdlineArgs[]) {
- Args args = null;
- try {
- args = new Args(cmdlineArgs);
- } catch (IllegalArgumentException e) {
- System.err.println(e.getMessage());
- System.out.println(usage);
- System.exit(1);
- }
-
- try {
- (new StationServer(args.city, args.localAddr)).run();
- } catch (Exception e) {
- System.err.println(e.getMessage());
- System.exit(1);
- }
- }
-
- @Override
- public void run() {
- log.info("Running");
- log.config("Local address is "+localAddr.toString());
-
- ExecutorService pool = Executors.newCachedThreadPool();
-
- try {
- pool.execute(new ResourceAvailability.Server(localAddr, resources));
- } catch (IOException e) {
- String msg = "Failed to start ResourceAvailability Server: "+e.getMessage();
- log.severe(msg);
- return;
- }
- try {
- pool.execute(new RequestResource.Server(localAddr, resources));
- } catch (IOException e) {
- log.severe("Failed to start RequestResource Server: "+e.getMessage());
- return;
- }
- try {
- pool.execute(new FindResource.Server(localAddr, resources));
- } catch (IOException e) {
- log.severe("Failed to start FindResource Server: "+e.getMessage());
- return;
- }
- try {
- pool.execute(new ReturnResource.Server(localAddr, resources));
- } catch (IOException e) {
- log.severe("Failed to start ReturnResource Server: "+e.getMessage());
- return;
- }
- try {
- pool.execute(new SwapResource.Server(localAddr, resources, servers));
- } catch (IOException e) {
- log.severe("Failed to start SwapResource Server: "+e.getMessage());
- return;
- }
-
- try {
- pool.execute(new Announcer(announceGroup, localAddr, city));
- } catch (IOException e) {
- log.severe("Failed to start Announcer: "+e.getMessage());
- return;
- }
- try {
- pool.execute(new AnnounceListener(announceGroup, localAddr, servers));
- } catch (IOException e) {
- log.severe("Failed to start AnnounceListener: "+e.getMessage());
- return;
- }
- }
-
- private static class Args {
- private City city;
- private InetAddress localAddr;
-
- private Args(String[] args) throws IllegalArgumentException {
- if (args.length < 1) {
- throw new IllegalArgumentException("Missing argument 'city'");
- }
- city = new City(args[0]);
-
- if (args.length < 2) {
- throw new IllegalArgumentException("Missing argument 'local host'");
- }
- try {
- localAddr = InetAddress.getByName(args[1]);
- } catch (UnknownHostException | SecurityException e) {
- throw new IllegalArgumentException("Bad value of 'local host': "+e.getMessage());
- }
- }
- }
-} \ No newline at end of file
diff --git a/src/main/java/derms/replica/replica1/SwapResource.java b/src/main/java/derms/replica/replica1/SwapResource.java
deleted file mode 100644
index 5c6502c..0000000
--- a/src/main/java/derms/replica/replica1/SwapResource.java
+++ /dev/null
@@ -1,262 +0,0 @@
-package derms.replica.replica1;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.*;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Logger;
-
-public class SwapResource {
- public static final int port = 5560;
- public static final int bufSize = 4096;
-
- public static class Client {
- private CoordinatorID cid;
- private ResourceID oldRID;
- private ResourceID newRID;
-
- public Client(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) {
- this.cid = cid;
- this.oldRID = oldRID;
- this.newRID = newRID;
- }
-
- public Response sendRequest(InetAddress serverAddr) throws IOException {
- Request request = new Request(cid, oldRID, newRID);
- DatagramSocket sock;
- try {
- sock = new DatagramSocket();
- } catch (Exception e) {
- throw new IOException("Swap Resource Client: failed to open socket: "+e.getMessage());
- }
-
- DatagramPacket requestPkt;
- try {
- requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port));
- } catch (IOException e) {
- sock.close();
- throw new IOException("Swap Resource Client: failed to create request: "+e.getMessage());
- }
-
- sock.send(requestPkt);
-
- byte[] buf = new byte[bufSize];
- DatagramPacket responsePkt = new DatagramPacket(buf, buf.length);
- try {
- sock.receive(responsePkt);
- } catch (Exception e) {
- sock.close();
- throw new IOException("Swap Resource Client: error receiving from server: "+e.getMessage());
- }
-
- try {
- return ObjectPacket.deserialize(responsePkt, Response.class);
- } catch (IOException e) {
- throw new IOException("Swap Resource Client: failed to deserialize response: "+e.getMessage());
- } finally {
- sock.close();
- }
- }
- }
-
- public static class Server implements Runnable {
- private InetAddress localAddr;
- private Resources resources;
- private Servers servers;
- private ExecutorService pool;
- private Logger log;
-
- public Server(InetAddress localAddr, Resources resources, Servers servers) throws IOException {
- this.localAddr = localAddr;
- this.resources = resources;
- this.servers = servers;
- pool = Executors.newWorkStealingPool();
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- DatagramSocket sock = null;
- try {
- sock = new DatagramSocket(port, localAddr);
- } catch (Exception e) {
- log.severe("Failed to bind socket to "+localAddr+": "+e.getMessage());
- return;
- }
- log.info("Listening on "+localAddr+":"+port);
-
- DatagramPacket requestPkt = new DatagramPacket(new byte[bufSize], bufSize);
-
- try {
- for (;;) {
- try {
- sock.receive(requestPkt);
- } catch (Exception e) {
- log.warning("Error receiving from socket: "+e.getMessage());
- continue;
- }
- log.fine("Got request");
-
- Request request = null;
- try {
- request = ObjectPacket.deserialize(requestPkt, Request.class);
- } catch (IOException e) {
- log.warning("Failed to deserialize request: "+e.getMessage());
- continue;
- }
-
- SocketAddress client = requestPkt.getSocketAddress();
- try {
- RequestHandler handler = new RequestHandler(request, client, resources, servers);
- pool.execute(handler);
- } catch (IOException e) {
- log.warning("Failed to create request handler: "+e.getMessage());
- continue;
- }
- }
- } finally {
- sock.close();
- }
- }
- }
-
- private static class Request implements Serializable {
- private CoordinatorID cid;
- private ResourceID oldRID;
- private ResourceID newRID;
-
- private Request(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) {
- this.cid = cid;
- this.oldRID = oldRID;
- this.newRID = newRID;
- }
- }
-
- private static class RequestHandler implements Runnable {
- private Request request;
- private SocketAddress client;
- private Resources resources;
- private Servers servers;
- private Logger log;
-
- private RequestHandler(Request request, SocketAddress client, Resources resources, Servers servers) throws IOException {
- this.request = request;
- this.client = client;
- this.resources = resources;
- this.servers = servers;
- this.log = DermsLogger.getLogger(this.getClass());
- }
-
- @Override
- public void run() {
- Response response = swapResources();
-
- DatagramSocket sock;
- try {
- sock = new DatagramSocket();
- } catch (Exception e) {
- log.severe("failed to open socket: "+e.getMessage());
- return;
- }
-
- DatagramPacket pkt;
- try {
- pkt = ObjectPacket.create(response, client);
- } catch (IOException e) {
- log.severe("failed to create response: "+e.getMessage());
- sock.close();
- return;
- }
-
- try {
- sock.send(pkt);
- } catch (Exception e) {
- log.severe("failed to send response: "+e.getMessage());
- } finally {
- sock.close();
- }
- }
-
- private Response swapResources() {
- try {
- Resource resource = resources.getByID(request.oldRID);
- synchronized (resource) {
- if (!resource.borrower.equals(request.cid)) {
- return new Response(Response.Status.NOT_BORROWED, "resource "+request.oldRID+" not borrowed by "+request.cid);
- }
- try {
- acquireNewResource(resource.borrowDuration);
- returnOldResource(resource);
- return new Response(Response.Status.SUCCESS, request.cid+" success fully swapped "+request.oldRID+" for "+request.newRID);
- } catch (UnknownHostException e) {
- return new Response(Response.Status.UNKNOWN_HOST, e.getMessage());
- } catch (IOException e) {
- return new Response(Response.Status.FAILURE, e.getMessage());
- } catch (CannotBorrow e) {
- return new Response(Response.Status.CANNOT_BORROW, e.getMessage());
- }
- }
- } catch (NoSuchElementException e) {
- return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.oldRID);
- }
- }
-
- private void acquireNewResource(int borrowDuration) throws UnknownHostException, IOException, CannotBorrow {
- RequestResource.Client requestClient = new RequestResource.Client(request.cid, request.newRID, borrowDuration);
- City city = new City(request.newRID.city);
- InetAddress requestResourceServer = servers.get(city);
- if (requestResourceServer == null) {
- throw new UnknownHostException(city.toString());
- }
- RequestResource.Response response = requestClient.sendRequest(requestResourceServer);
- // TODO: make exception handling more granular---pass through status from Request.
- if (response.status != RequestResource.Response.Status.SUCCESS) {
- throw new CannotBorrow(request.cid, request.newRID, response.message);
- }
- }
-
- private void returnOldResource(Resource r) {
- r.isBorrowed = false;
- r.borrower = new CoordinatorID();
- r.borrowDuration = -1;
- }
- }
-
- public static class Response implements Serializable {
- public Status status;
- public String message;
-
- private Response(Status status, String message) {
- this.status = status;
- this.message = message;
- }
-
- public enum Status {
- SUCCESS,
- FAILURE,
- NO_SUCH_RESOURCE,
- NOT_BORROWED,
- CANNOT_BORROW,
- UNKNOWN_HOST
- }
- }
-
- private static class CannotBorrow extends Exception {
- CoordinatorID attemptedBorrower;
- ResourceID rid;
- String message;
-
- private CannotBorrow(CoordinatorID attemptedBorrower, ResourceID rid, String message) {
- this.attemptedBorrower = attemptedBorrower;
- this.rid = rid;
- this.message = message;
- }
-
- @Override
- public String getMessage() {
- return attemptedBorrower+" failed to borrow "+rid+": "+message;
- }
- }
-} \ No newline at end of file