diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-23 13:51:12 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-23 13:51:12 -0500 |
| commit | 3c62b863509131e78c18ed13c6b83e4fc508848f (patch) | |
| tree | cf246e91da283edf704af936b9cef7eb8e09a6f8 | |
| parent | e3b72053e8b04f2df013da0d7d49fe33927461a9 (diff) | |
| download | soen423-3c62b863509131e78c18ed13c6b83e4fc508848f.zip | |
import replica code from assignment
37 files changed, 2641 insertions, 0 deletions
@@ -21,3 +21,4 @@ sequencer? test reliable multicast test total-order multicast test reliable unicast +adapt replica code from assignment diff --git a/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java b/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java new file mode 100644 index 0000000..634f154 --- /dev/null +++ b/src/main/java/derms/replica/replica1/AlreadyBorrowedException.java @@ -0,0 +1,7 @@ +package derms.replica.replica1; + +public class AlreadyBorrowedException extends Exception { + public AlreadyBorrowedException(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica1/AnnounceListener.java b/src/main/java/derms/replica/replica1/AnnounceListener.java new file mode 100644 index 0000000..bd58884 --- /dev/null +++ b/src/main/java/derms/replica/replica1/AnnounceListener.java @@ -0,0 +1,85 @@ +package derms.replica.replica1; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.util.logging.Logger; + +public class AnnounceListener implements Runnable { + private static final int bufsize = 1024; + + private InetSocketAddress groupAddr; + private InetAddress localAddr; + private Servers servers; + private Logger log; + + public AnnounceListener(InetSocketAddress groupAddr, InetAddress localAddr, Servers servers) throws IOException { + this.groupAddr = groupAddr; + this.localAddr = localAddr; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + NetworkInterface netInterface = null; + try { + netInterface = NetworkInterface.getByInetAddress(localAddr); + if (netInterface == null) { + throw new Exception("netInterface is null"); + } + } catch (Exception e) { + log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + MulticastSocket sock = null; + try { + sock = new MulticastSocket(groupAddr.getPort()); + sock.joinGroup(groupAddr, netInterface); + } catch (Exception e) { + log.severe("Failed to open multicast socket: "+e.getMessage()); + return; + } + + log.info("Listening to "+groupAddr.toString()+" from "+localAddr.toString()+" ("+netInterface.getName()+")"); + byte[] buf = new byte[bufsize]; + DatagramPacket pkt = new DatagramPacket(buf, buf.length); + + for (;;) { + try { + sock.receive(pkt); + } catch (Exception e) { + log.warning("Error receiving from multicast socket: "+e.getMessage()); + continue; + } + + ObjectInputStream objStream; + try { + objStream = new ObjectInputStream( + new ByteArrayInputStream(pkt.getData())); + } catch (IOException e) { + log.warning("Failed to create input stream: "+e.getMessage()); + continue; + } + + City city; + try { + city = (City) objStream.readObject(); + } catch (Exception e) { + log.warning("Failed to deserialize data: "+e.getMessage()); + continue; + } + + InetAddress remote = pkt.getAddress(); + if (servers.put(city, remote) == null) { + log.info("Added remote server "+city.toString()+" "+remote.toString()); + } + } + } +}
\ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/Announcer.java b/src/main/java/derms/replica/replica1/Announcer.java new file mode 100644 index 0000000..e9dedc4 --- /dev/null +++ b/src/main/java/derms/replica/replica1/Announcer.java @@ -0,0 +1,73 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketAddress; +import java.util.logging.Logger; + +public class Announcer implements Runnable { + public static final long intervalMillis = 3000; + + private SocketAddress group; + private InetAddress localAddr; + private City city; + private Logger log; + + public Announcer(SocketAddress group, InetAddress localAddr, City city) throws IOException { + this.group = group; + this.localAddr = localAddr; + this.city = city; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + NetworkInterface netInterface = null; + try { + netInterface = NetworkInterface.getByInetAddress(localAddr); + if (netInterface == null) { + throw new Exception("netInterface is null"); + } + } catch (Exception e) { + log.severe("Failed to get network interface bound to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + MulticastSocket sock = null; + try { + sock = new MulticastSocket(); + sock.joinGroup(group, netInterface); + } catch (Exception e) { + log.severe("Failed to open multicast socket: "+e.getMessage()); + return; + } + + log.info("Announcing from "+localAddr.toString()+" ("+netInterface.getName()+") to "+group.toString()); + + DatagramPacket pkt = null; + try { + pkt = ObjectPacket.create(city, group); + } catch (IOException e) { + log.severe("Failed to create packet: "+e.getMessage()); + sock.close(); + return; + } + + try { + for (;;) { + sock.send(pkt); + Thread.sleep(intervalMillis); + } + } catch (IOException e) { + log.severe("Failed to send to multicast socket "+group.toString()+": "+e.getMessage()); + } catch (InterruptedException e) { + log.info("Interrupted."); + } finally { + log.info("Shutting down..."); + sock.close(); + } + } +}
\ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/CLI.java b/src/main/java/derms/replica/replica1/CLI.java new file mode 100644 index 0000000..88e166a --- /dev/null +++ b/src/main/java/derms/replica/replica1/CLI.java @@ -0,0 +1,91 @@ +package derms.replica.replica1; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Scanner; + +public abstract class CLI implements Runnable { + protected Map<String, Command> commands = new HashMap<String, Command>(); + protected List<Description> cmdDescriptions = new ArrayList<Description>(); + protected List<Description> argDescriptions = new ArrayList<Description>(); + + protected CLI() { + commands.put("quit", new Quit()); + cmdDescriptions.add(new Description("quit", "Exit the program")); + + commands.put("help", new Help()); + cmdDescriptions.add(new Description("help", "List commands")); + } + + @Override + public void run() { + Scanner scanner = new Scanner(System.in); + System.out.println("Type 'help' for a list of commands."); + for (;;) { + System.out.print("Command: "); + String input = scanner.nextLine(); + String[] fields = input.split(" "); + if (fields.length < 1 || fields[0] == "") { + continue; + } + Command cmd = commands.get(fields[0]); + if (cmd == null) { + System.out.println("Invalid command '"+fields[0]+"'"); + System.out.println("Type 'help' for a list of commands."); + continue; + } + String[] args = null; + if (fields.length < 2) { + args = new String[0]; + } else { + args = Arrays.copyOfRange(fields, 1, fields.length); + } + cmd.exec(args); + } + } + + protected interface Command { + public void exec(String[] args); + } + + protected class Quit implements Command { + @Override + public void exec(String[] args) { + System.out.println("Shutting down..."); + System.exit(1); + } + } + + protected class Help implements Command { + @Override + public void exec(String[] args) { + System.out.println("\nCommands:"); + for (Description d : cmdDescriptions) { + System.out.println(d); + } + System.out.println("\nArguments:"); + for (Description d : argDescriptions) { + System.out.println(d); + } + System.out.println(); + } + } + + protected class Description { + String object; /// The thing being described + String description; + + protected Description(String object, String description) { + this.object = object; + this.description = description; + } + + @Override + public String toString() { + return object+"\n\t"+description; + } + } +} diff --git a/src/main/java/derms/replica/replica1/City.java b/src/main/java/derms/replica/replica1/City.java new file mode 100644 index 0000000..548f1fa --- /dev/null +++ b/src/main/java/derms/replica/replica1/City.java @@ -0,0 +1,38 @@ +package derms.replica.replica1; + +import java.io.Serializable; + +public class City implements Serializable { + public static final int codeLen = 3; + + private String code; + + public City(String code) throws IllegalArgumentException { + if (code.length() != codeLen) + throw new IllegalArgumentException("Invalid city: "+code+"; must be "+codeLen+" letters"); + this.code = code; + } + + public City() { + this("XXX"); + } + + @Override + public String toString() { + return code; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || this.getClass() != obj.getClass()) { + return false; + } + City other = (City) obj; + return this.code.equals(other.code); + } + + @Override + public int hashCode() { + return code.hashCode(); + } +} diff --git a/src/main/java/derms/replica/replica1/Client.java b/src/main/java/derms/replica/replica1/Client.java new file mode 100644 index 0000000..7ebbb68 --- /dev/null +++ b/src/main/java/derms/replica/replica1/Client.java @@ -0,0 +1,30 @@ +package derms.replica.replica1; + +import javax.xml.namespace.QName; +import javax.xml.ws.Service; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; + +public abstract class Client<I, C> { + public static final String namespace = "derms.samanthony.xyz"; + public static final int port = 8080; + + private final Class<I> endpointInterface; + private final QName qname; + + protected Client(Class<I> endpointInterface, Class<C> endpointClass) { + this.endpointInterface = endpointInterface; + this.qname = new QName("http://"+namespace+"/", endpointClass.getSimpleName()+"Service"); + } + + protected I connect(String host) throws MalformedURLException { + URL url = new URL("http://"+host+":"+port+"/"+endpointInterface.getSimpleName()+"?wsdl"); + return Service.create(url, qname).getPort(endpointInterface); + } + + protected I connect(City city) throws UnknownHostException, MalformedURLException { + String host = Hosts.get(city); + return connect(host); + } +} diff --git a/src/main/java/derms/replica/replica1/Coordinator.java b/src/main/java/derms/replica/replica1/Coordinator.java new file mode 100644 index 0000000..b963d58 --- /dev/null +++ b/src/main/java/derms/replica/replica1/Coordinator.java @@ -0,0 +1,22 @@ +package derms.replica.replica1; + +import javax.jws.WebService; +import javax.jws.soap.SOAPBinding; + +@WebService +@SOAPBinding(style = SOAPBinding.Style.RPC) +public interface Coordinator { + void requestResource(CoordinatorID cid, ResourceID rid, int duration) + throws ServerCommunicationError, NoSuchResourceException, + AlreadyBorrowedException, InvalidDurationException; + + Resource[] findResource(CoordinatorID cid, ResourceName rname) + throws ServerCommunicationError; + + void returnResource(CoordinatorID cid, ResourceID rid) + throws ServerCommunicationError, NoSuchResourceException, + NotBorrowedException; + + void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) + throws ServerCommunicationError, NoSuchResourceException; +} diff --git a/src/main/java/derms/replica/replica1/CoordinatorClient.java b/src/main/java/derms/replica/replica1/CoordinatorClient.java new file mode 100644 index 0000000..6016c31 --- /dev/null +++ b/src/main/java/derms/replica/replica1/CoordinatorClient.java @@ -0,0 +1,40 @@ +package derms.replica.replica1; + +import java.net.MalformedURLException; +import java.net.UnknownHostException; + +public class CoordinatorClient extends Client<Coordinator, CoordinatorServer> { + public CoordinatorID id; + public Coordinator server; + + public CoordinatorClient(CoordinatorID id) throws UnknownHostException, MalformedURLException { + super(Coordinator.class, CoordinatorServer.class); + this.id = id; + this.server = connect(new City(id.city)); + } + + public CoordinatorClient(City city, short idNum) throws UnknownHostException, MalformedURLException { + this(new CoordinatorID(city.toString(), idNum)); + } + + public void requestResource(ResourceID resourceID, int duration) + throws ServerCommunicationError, NoSuchResourceException, + AlreadyBorrowedException, InvalidDurationException + { + server.requestResource(id, resourceID, duration); + } + + public void returnResource(ResourceID resourceID) + throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException + { + server.returnResource(id, resourceID); + } + + public Resource[] findResource(ResourceName name) throws ServerCommunicationError { + return server.findResource(id, name); + } + + public void swapResource(ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException { + server.swapResource(id, oldRID, newRID); + } +}
\ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/CoordinatorClientCLI.java b/src/main/java/derms/replica/replica1/CoordinatorClientCLI.java new file mode 100644 index 0000000..c1ac48a --- /dev/null +++ b/src/main/java/derms/replica/replica1/CoordinatorClientCLI.java @@ -0,0 +1,212 @@ +package derms.replica.replica1; + +import java.net.MalformedURLException; +import java.net.UnknownHostException; + +public class CoordinatorClientCLI extends CLI { + public static final String usage = "Usage: java CoordinatorClient <city> <id number>"; + + private final CoordinatorClient client; + + private CoordinatorClientCLI(City city, short idNum) throws UnknownHostException, MalformedURLException { + client = new CoordinatorClient(city, idNum); + System.out.println("ID: "+client.id); + + commands.put("request", new Request()); + cmdDescriptions.add(new Description( + "request <resource ID> <duration>", + "Borrow a resource and reduce its duration.")); + + commands.put("find", new Find()); + cmdDescriptions.add(new Description( + "find <resource name>", + "List borrowed resources.")); + + commands.put("return", new Return()); + cmdDescriptions.add(new Description( + "return <resourceID>", + "Return a currently borrowed resource.")); + + commands.put("swap", new Swap()); + cmdDescriptions.add(new Description( + "swap <old resourceID> <new resourceID>", + "Return the old resource and borrow the new one.")); + + argDescriptions.add(new Description( + "<resource ID>", + "3-letter city code followed by 4-digit number.")); + argDescriptions.add(new Description( + "<resource name>", + "E.g., AMBULANCE.")); + argDescriptions.add(new Description( + "<duration>", + "A number representing a time period.")); + } + + public static void main(String[] cmdlineArgs) { + Args args = null; + try { + args = new Args(cmdlineArgs); + } catch (IllegalArgumentException e) { + System.err.println(e.getMessage()); + System.err.println(usage); + System.exit(1); + } + + try { + (new CoordinatorClientCLI(args.city, args.idNum)).run(); + } catch (UnknownHostException | MalformedURLException e) { + System.err.println(e.getMessage()); + System.exit(1); + } + } + + private class Request implements Command { + public void exec(String[] args) { + if (args.length < 2) { + System.out.println("invalid arguments for 'request'"); + } else { + requestResource(args[0], args[1]); + } + } + } + + private void requestResource(String resourceIDStr, String durationStr) { + ResourceID resourceID; + try { + resourceID = ResourceID.parse(resourceIDStr); + } catch (IllegalArgumentException e) { + System.out.println(e.getMessage()); + return; + } + + int duration; + try { + duration = Integer.parseInt(durationStr); + if (duration < 0) { + throw new IllegalArgumentException("invalid duration: "+durationStr); + } + } catch (Exception e) { + System.out.println(e.getMessage()); + return; + } + + try { + client.requestResource(resourceID, duration); + System.out.println("Successfully borrowed resource."); + } catch (ServerCommunicationError | NoSuchResourceException | AlreadyBorrowedException | InvalidDurationException e) { + System.out.println("Failed to borrow resource: "+e.getMessage()); + } + } + + private class Return implements Command { + public void exec(String[] args) { + if (args.length < 1) { + System.out.println("invalid arguments for 'return'"); + } else { + returnResource(args[0]); + } + } + } + + private void returnResource(String resourceIDStr) { + ResourceID resourceID; + try { + resourceID = ResourceID.parse(resourceIDStr); + } catch (IllegalArgumentException e) { + System.out.println("Invalid resource ID: "+e.getMessage()); + return; + } + + try { + client.returnResource(resourceID); + System.out.println("Successfully returned resource "+resourceIDStr); + } catch (ServerCommunicationError | NoSuchResourceException | NotBorrowedException e) { + System.out.println("Failed to return resource: "+e.getMessage()); + } + } + + private class Find implements Command { + public void exec(String[] args) { + if (args.length < 1) { + System.out.println("invalid arguments for 'find'"); + } else { + findResource(args[0]); + } + } + } + + private void findResource(String resourceNameStr) { + ResourceName resourceName; + try { + resourceName = ResourceName.parse(resourceNameStr); + } catch (Exception e) { + System.out.println("Invalid resource name: "+resourceNameStr); + return; + } + + Resource[] resources; + try { + resources = client.findResource(resourceName); + } catch (ServerCommunicationError e) { + System.out.println("Failed to find resource "+resourceName+": "+e.getMessage()); + return; + } + + System.out.println("Borrowed "+resourceNameStr+" resources:"); + for (Resource r : resources) { + String rid = r.id.toString(); + System.out.println("\t"+rid+" "+r.borrowDuration); + } + } + + private class Swap implements Command { + public void exec(String[] args) { + if (args.length < 2) { + System.out.println("invalid arguments for 'swap'"); + } else { + swapResource(args[0], args[1]); + } + } + } + + private void swapResource(String oldRIDStr, String newRIDStr) { + ResourceID oldRID; + ResourceID newRID; + try { + oldRID = ResourceID.parse(oldRIDStr); + newRID = ResourceID.parse(newRIDStr); + } catch (IllegalArgumentException e) { + System.out.println("Invalid resource id"); + return; + } + + try { + client.swapResource(oldRID, newRID); + System.out.println("Successfully swapped "+oldRID+" for "+newRID); + } catch (ServerCommunicationError | NoSuchResourceException e) { + System.out.println("Failed to swap resources: "+e.getMessage()); + } + } + + private static class Args { + private final City city; + private final short idNum; + + private Args(String[] args) throws IllegalArgumentException { + if (args.length < 1) { + throw new IllegalArgumentException("Missing argument 'city'"); + } + city = new City(args[0]); + + if (args.length < 2) { + throw new IllegalArgumentException("Missing argument 'id number'"); + } + try { + idNum = Short.parseShort(args[1]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Bad value of 'id number'"); + } + } + } +} diff --git a/src/main/java/derms/replica/replica1/CoordinatorID.java b/src/main/java/derms/replica/replica1/CoordinatorID.java new file mode 100644 index 0000000..c09124c --- /dev/null +++ b/src/main/java/derms/replica/replica1/CoordinatorID.java @@ -0,0 +1,34 @@ +package derms.replica.replica1; + +import java.io.Serializable; + +public class CoordinatorID implements Serializable { + public String city; + public short num; + + public CoordinatorID(String city, short num) { + this.city = city; + this.num = num; + } + + public CoordinatorID(String city, int num) { + this(city, (short) num); + } + + public CoordinatorID() { + this("XXX", 0); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() != this.getClass()) + return false; + CoordinatorID other = (CoordinatorID) obj; + return other.city.equals(this.city) && other.num == this.num; + } + + @Override + public String toString() { + return city+"C"+num; + } +} diff --git a/src/main/java/derms/replica/replica1/CoordinatorServer.java b/src/main/java/derms/replica/replica1/CoordinatorServer.java new file mode 100644 index 0000000..ce1cc9e --- /dev/null +++ b/src/main/java/derms/replica/replica1/CoordinatorServer.java @@ -0,0 +1,176 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import javax.jws.WebService; + +@WebService(endpointInterface = "xyz.samanthony.derms.Coordinator") +public class CoordinatorServer implements Coordinator { + public static final Duration timeout = Duration.ofSeconds(5); + + private City city; + private Resources resources; + private Servers servers; + private Logger log; + + public CoordinatorServer(City city, Resources resources, Servers servers) throws IOException { + super(); + this.city = city; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + public CoordinatorServer() throws IOException { + this(new City(), new Resources(), new Servers()); + } + + @Override + public void requestResource(CoordinatorID cid, ResourceID rid, int duration) + throws ServerCommunicationError, NoSuchResourceException, + AlreadyBorrowedException, InvalidDurationException + { + log.info("Request for "+rid+" from "+cid); + + InetAddress server = servers.get(new City(rid.city)); + if (server == null) { + throw new ServerCommunicationError("requestResource(): no connection to server "+rid.city.toString()); + } + + RequestResource.Client client = new RequestResource.Client(cid, rid, duration); + RequestResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + throw new ServerCommunicationError("requestResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info("Request "+rid+" from "+cid+" - success"); + break; + case NO_SUCH_RESOURCE: + log.warning(response.message); + throw new NoSuchResourceException(response.message); + case ALREADY_BORROWED: + log.warning(response.message); + throw new AlreadyBorrowedException(response.message); + case INVALID_DURATION: + log.warning(response.message); + throw new InvalidDurationException(response.message); + default: + log.warning("Unsuccessful response from server: "+response.message); + throw new ServerCommunicationError("requestResource(): failed to borrow resource: "+response.message); + } + } + + @Override + public Resource[] findResource(CoordinatorID cid, ResourceName rname) throws ServerCommunicationError { + log.info("Find Resource "+rname+" from "+cid); + FindResource.Request request = new FindResource.Request(cid, rname); + Collection<Resource> response = ConcurrentHashMap.newKeySet(); + ExecutorService pool = Executors.newFixedThreadPool(servers.size()); + try { + for (InetAddress server : servers.all()) { + pool.execute(new FindResource.Client(request, server, response)); + } + } catch (IOException e) { + String msg = "Failed to start FindResource Client: "+e.getMessage(); + log.severe(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + log.fine("Started worker threads"); + pool.shutdown(); + boolean terminated; + try { + terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + String msg = "findResource() interrupted: "+e.getMessage(); + log.warning(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + if (!terminated) { + String msg = "Request timed out: no response after "+timeout.toString(); + log.warning(msg); + throw new ServerCommunicationError("findResource(): "+msg); + } + Resource[] arr = new Resource[0]; + arr = response.toArray(arr); + log.info("Find resource "+rname+" from "+cid+" - success. Response length: "+arr.length); + return arr; + } + + @Override + public void returnResource(CoordinatorID cid, ResourceID rid) + throws ServerCommunicationError, NoSuchResourceException, NotBorrowedException + { + log.info("Return resource "+rid+" from "+cid); + InetAddress server = servers.get(new City(rid.city)); + if (server == null) { + String msg = "no connection to server "+rid.city; + log.warning(msg); + throw new ServerCommunicationError("returnResource(): "+msg); + } + log.fine("server address: "+server); + + ReturnResource.Client client = new ReturnResource.Client(cid, rid); + ReturnResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + log.warning(e.getMessage()); + throw new ServerCommunicationError("returnResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info(cid+" return "+rid+" - success"); + break; + case NO_SUCH_RESOURCE: + log.warning(response.message); + throw new NoSuchResourceException(response.message); + case NOT_BORROWED: + log.warning(response.message); + throw new NotBorrowedException(response.message); + default: + String msg = "Failed to return resource: "+response.message; + log.warning(msg); + throw new ServerCommunicationError("returnResource(): "+msg); + } + } + + @Override + public void swapResource(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) throws ServerCommunicationError, NoSuchResourceException { + log.info(cid+": swap "+oldRID+", "+newRID); + + InetAddress server = servers.get(new City(oldRID.city)); + if (server == null) { + String msg = "no connection to server "+oldRID.city; + log.warning(msg); + throw new ServerCommunicationError("swapResource(): "+msg); + } + log.fine("server address: "+server); + + SwapResource.Client client = new SwapResource.Client(cid, oldRID, newRID); + SwapResource.Response response; + try { + response = client.sendRequest(server); + } catch (IOException e) { + throw new ServerCommunicationError("swapResource(): "+e.getMessage()); + } + switch (response.status) { + case SUCCESS: + log.info(cid+": swap "+oldRID+", "+newRID+" - success"); + break; + case NO_SUCH_RESOURCE: + throw new NoSuchResourceException(response.message); + default: + throw new ServerCommunicationError("swapResource(): "+response.message); + } + } +} diff --git a/src/main/java/derms/replica/replica1/DermsLogger.java b/src/main/java/derms/replica/replica1/DermsLogger.java new file mode 100644 index 0000000..9ff8249 --- /dev/null +++ b/src/main/java/derms/replica/replica1/DermsLogger.java @@ -0,0 +1,23 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.util.logging.FileHandler; +import java.util.logging.Handler; +import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; + +public class DermsLogger { + public static final String logFile = "server.log"; + + private static Logger log = null; + + public static Logger getLogger(Class clazz) throws IOException { + if (log == null) { + log = Logger.getLogger(clazz.getName()); + Handler fileHandler = new FileHandler(logFile); + fileHandler.setFormatter(new SimpleFormatter()); + log.addHandler(fileHandler); + } + return log; + } +} diff --git a/src/main/java/derms/replica/replica1/FindResource.java b/src/main/java/derms/replica/replica1/FindResource.java new file mode 100644 index 0000000..7104c4c --- /dev/null +++ b/src/main/java/derms/replica/replica1/FindResource.java @@ -0,0 +1,164 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.List; +import java.util.logging.Logger; + +public class FindResource { + public static final int port = 5558; + + public static class Client implements Runnable { + private InetAddress serverAddr; + private Request request; + private Collection<Resource> response; + private Logger log; + + public Client(Request request, InetAddress serverAddr, Collection<Resource> response) throws IOException { + this.serverAddr = serverAddr; + this.request = request; + this.response = response; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + log.severe("Failed to create request packet: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(requestPkt); + } catch (Exception e) { + log.severe("Failed to send request: "+e.getMessage()); + sock.close(); + return; + } + + Resource[] resources; + try { + resources = ResourceTransfer.receive(sock); + } catch (IOException e) { + log.severe(e.getMessage()); + return; + } finally { + sock.close(); + } + + for (Resource r : resources) { + response.add(r); + } + } + } + + public static class Server implements Runnable { + private static final int bufsize = 4096; + + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket: "+e.getMessage()); + return; + } + + log.info("Running on "+localAddr.toString()+":"+port); + + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.info("Got request"); + + Request request; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); + } + } finally { + sock.close(); + } + } + } + + public static class Request implements Serializable { + private CoordinatorID cid; + private ResourceName rname; + + public Request(CoordinatorID cid, ResourceName rname) { + this.cid = cid; + this.rname = rname; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { + this.request = request; + this.client = client; + this.resources = resources; + this.log = log; + } + + @Override + public void run() { + List<Resource> borrowedResources = resources.borrowed(request.cid, request.rname); + log.info(""+borrowedResources.size()+" "+request.rname+" resources borrowed by "+request.cid); + try { + Resource[] arr = new Resource[0]; + ResourceTransfer.send(borrowedResources.toArray(arr), client); + } catch (IOException e) { + log.severe("Failed to send response: "+e.getMessage()); + } + } + } +}
\ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/Hosts.java b/src/main/java/derms/replica/replica1/Hosts.java new file mode 100644 index 0000000..fed377e --- /dev/null +++ b/src/main/java/derms/replica/replica1/Hosts.java @@ -0,0 +1,27 @@ +package derms.replica.replica1; + +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; + +public class Hosts { + private static Map<City, String> hosts = null; + + public static String get(City city) throws UnknownHostException { + if (hosts == null) + init(); + + String host = hosts.get(city); + if (host == null) + throw new UnknownHostException("unknown host: "+city); + return host; + } + + private static void init() { + hosts = new HashMap<City, String>(); + hosts.put(new City("MTL"), "alpine1"); + hosts.put(new City("QUE"), "alpine2"); + hosts.put(new City("SHE"), "alpine3"); + } +} diff --git a/src/main/java/derms/replica/replica1/ID.java b/src/main/java/derms/replica/replica1/ID.java new file mode 100644 index 0000000..8d9f9aa --- /dev/null +++ b/src/main/java/derms/replica/replica1/ID.java @@ -0,0 +1,5 @@ +package derms.replica.replica1; + +public class ID { + public static final int nDigits = 4; +} diff --git a/src/main/java/derms/replica/replica1/InvalidDurationException.java b/src/main/java/derms/replica/replica1/InvalidDurationException.java new file mode 100644 index 0000000..ba3c75a --- /dev/null +++ b/src/main/java/derms/replica/replica1/InvalidDurationException.java @@ -0,0 +1,7 @@ +package derms.replica.replica1; + +public class InvalidDurationException extends Exception { + public InvalidDurationException (String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica1/NoSuchResourceException.java b/src/main/java/derms/replica/replica1/NoSuchResourceException.java new file mode 100644 index 0000000..3979570 --- /dev/null +++ b/src/main/java/derms/replica/replica1/NoSuchResourceException.java @@ -0,0 +1,7 @@ +package derms.replica.replica1; + +public class NoSuchResourceException extends Exception { + public NoSuchResourceException(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica1/NotBorrowedException.java b/src/main/java/derms/replica/replica1/NotBorrowedException.java new file mode 100644 index 0000000..5c0257a --- /dev/null +++ b/src/main/java/derms/replica/replica1/NotBorrowedException.java @@ -0,0 +1,7 @@ +package derms.replica.replica1; + +public class NotBorrowedException extends Exception { + public NotBorrowedException (String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica1/ObjectPacket.java b/src/main/java/derms/replica/replica1/ObjectPacket.java new file mode 100644 index 0000000..28010c0 --- /dev/null +++ b/src/main/java/derms/replica/replica1/ObjectPacket.java @@ -0,0 +1,38 @@ +package derms.replica.replica1; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.SocketAddress; + +public class ObjectPacket { + public static DatagramPacket create(Serializable obj, SocketAddress remoteAddr) throws IOException { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ObjectOutputStream objStream = new ObjectOutputStream(byteStream); + objStream.writeObject(obj); + objStream.flush(); + byte[] buf = byteStream.toByteArray(); + objStream.close(); + return new DatagramPacket(buf, buf.length, remoteAddr); + } + + public static <E> E deserialize(DatagramPacket pkt, Class<E> clazz) throws IOException { + ObjectInputStream objectStream; + try { + objectStream = new ObjectInputStream( + new ByteArrayInputStream(pkt.getData())); + } catch (Exception e) { + throw new IOException("failed to create input stream: "+e.getMessage()); + } + + try { + return clazz.cast(objectStream.readObject()); + } catch (Exception e) { + throw new IOException(e.getMessage()); + } + } +}
\ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/RequestResource.java b/src/main/java/derms/replica/replica1/RequestResource.java new file mode 100644 index 0000000..f7fa4ab --- /dev/null +++ b/src/main/java/derms/replica/replica1/RequestResource.java @@ -0,0 +1,227 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; +import java.util.NoSuchElementException; + +public class RequestResource { + public static final int port = 5557; + public static final int bufsize = 4096; + + public static class Client { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + public Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) { + this.coordinatorID = coordinatorID; + this.resourceID = resourceID; + this.duration = duration; + } + + public Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(coordinatorID, resourceID, duration); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Request Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufsize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + public static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString() + +": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.info("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + private Request(CoordinatorID cid, ResourceID rid, int duration) { + this.coordinatorID = cid; + this.resourceID = rid; + this.duration = duration; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { + this.request = request; + this.client = client; + this.resources = resources; + this.log = log; + } + + @Override + public void run() { + Response response = borrow(); + + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + log.severe("Failed to create response packet: "+e.getMessage()); + sock.close(); + return; + } + try { + sock.send(pkt); + } catch (Exception e) { + log.severe("Failed to send response: "+e.getMessage()); + } + sock.close(); + } + + private Response borrow() { + try { + Resource resource = resources.getByID(request.resourceID); + synchronized (resource) { + if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) { + return new Response(Response.Status.ALREADY_BORROWED, + request.coordinatorID+" cannot borrow "+request.resourceID + +"; already borrowed by "+resource.borrower); + } else if (request.duration <= 0) { + return new Response(Response.Status.INVALID_DURATION, + "duration "+request.duration+" less than 1"); + } else if (request.duration > resource.duration) { + return new Response(Response.Status.INVALID_DURATION, + "cannot borrow "+resource.id+" for duration of "+request.duration + +"; only "+resource.duration+" remaining"); + } + + if (resource.borrower.equals(request.coordinatorID)) { + // Resource is already borrowed. Add to existing duration. + resource.borrowDuration += request.duration; + } else { + resource.borrowDuration = request.duration; + } + resource.borrower = request.coordinatorID; + resource.isBorrowed = true; + resource.duration -= request.duration; + + return new Response(Response.Status.SUCCESS, request.coordinatorID + +" successfully borrowed "+request.resourceID); + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); + } + } + } + + public static class Response implements Serializable { + public Status status; + public String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + private Response() { + this(Status.SUCCESS, ""); + } + + public enum Status { + SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION + } + } +} + diff --git a/src/main/java/derms/replica/replica1/Resource.java b/src/main/java/derms/replica/replica1/Resource.java new file mode 100644 index 0000000..4abe41b --- /dev/null +++ b/src/main/java/derms/replica/replica1/Resource.java @@ -0,0 +1,39 @@ +package derms.replica.replica1; + +import java.io.Serializable; + +public class Resource implements Serializable { + public ResourceID id; + public ResourceName name; + public int duration; + public boolean isBorrowed; + public CoordinatorID borrower; + public int borrowDuration; + + public Resource(ResourceID id, ResourceName name, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) { + this.id = id; + this.name = name; + this.duration = duration; + this.isBorrowed = isBorrowed; + this.borrower = borrower; + this.borrowDuration = borrowDuration; + } + + public Resource(ResourceID id, ResourceName name, int duration) { + this(id, name, duration, false, new CoordinatorID(), -1); + } + + public Resource() { + this(new ResourceID(), ResourceName.AMBULANCE, 0); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return id+" "+duration; + } +} diff --git a/src/main/java/derms/replica/replica1/ResourceAvailability.java b/src/main/java/derms/replica/replica1/ResourceAvailability.java new file mode 100644 index 0000000..a6083ab --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResourceAvailability.java @@ -0,0 +1,128 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.logging.Logger; + +public class ResourceAvailability { + public static final int port = 5556; + + public static class Client implements Runnable { + private InetAddress serverAddr; + private ResourceName request; + private Collection<Resource> resources; + private Logger log; + + public Client(InetAddress serverAddr, ResourceName request, Collection<Resource> response) throws IOException { + this.serverAddr = serverAddr; + this.request = request; + this.resources = response; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Error binding socket: "+e.getMessage()); + return; + } + log.fine("Created socket"); + + DatagramPacket reqPkt; + try { + reqPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + log.severe("Error creating request: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(reqPkt); + } catch (IOException e) { + log.severe("Error sending request: "+e.getMessage()); + sock.close(); + return; + } + log.fine("Sent request"); + + Resource[] response; + try { + response = ResourceTransfer.receive(sock); + } catch (IOException e) { + log.severe(e.getMessage()); + return; + } finally { + sock.close(); + } + + for (Resource resource : response) { + resources.add(resource); + } + } + } + + public static class Server implements Runnable { + public static final int bufsize = 1024; + + private InetAddress localAddr; + private Resources resources; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString()+": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + + DatagramPacket request = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(request); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + + ResourceName requestedName = null; + try { + requestedName = ObjectPacket.deserialize(request, ResourceName.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + log.info("Got request: "+requestedName); + + Resource[] response = resources.getByName(requestedName); + try { + ResourceTransfer.send(response, request.getSocketAddress()); + } catch (IOException e) { + log.warning("Error transfering resources: "+e.getMessage()); + } + } + } finally { + sock.close(); + } + } + } +} diff --git a/src/main/java/derms/replica/replica1/ResourceID.java b/src/main/java/derms/replica/replica1/ResourceID.java new file mode 100644 index 0000000..008b766 --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResourceID.java @@ -0,0 +1,49 @@ +package derms.replica.replica1; + +import java.io.Serializable; + +public class ResourceID implements Serializable { + public String city; + public short num; + + public ResourceID (String city, short num) { + this.city = city; + this.num = num; + } + + public ResourceID() { + this("XXX", (short) 1111); + } + + public static ResourceID parse(String s) throws IllegalArgumentException { + if (s.length() != City.codeLen+ID.nDigits) { + throw new IllegalArgumentException("invalid resource ID: "+s); + } + try { + String cityCode = s.substring(0, City.codeLen); + short num = Short.parseShort(s.substring(City.codeLen)); + return new ResourceID(cityCode, num); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("invalid resource ID: "+e.getMessage()); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != this.getClass()) { + return false; + } + ResourceID other = (ResourceID) obj; + return (this.city.equals(other.city)) && (this.num == other.num); + } + + @Override + public int hashCode() { + return city.hashCode() * num; + } + + @Override + public String toString() { + return city+num; + } +} diff --git a/src/main/java/derms/replica/replica1/ResourceName.java b/src/main/java/derms/replica/replica1/ResourceName.java new file mode 100644 index 0000000..de4bc16 --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResourceName.java @@ -0,0 +1,18 @@ +package derms.replica.replica1; + +import java.io.Serializable; + +public enum ResourceName implements Serializable { + AMBULANCE, + FIRETRUCK, + PERSONNEL; + + public static ResourceName parse(String s) { + switch (s) { + case "AMBULANCE": return ResourceName.AMBULANCE; + case "FIRETRUCK": return ResourceName.FIRETRUCK; + case "PERSONNEL": return ResourceName.PERSONNEL; + } + throw new IllegalArgumentException("invalid resource name: "+s); + } +} diff --git a/src/main/java/derms/replica/replica1/ResourceTransfer.java b/src/main/java/derms/replica/replica1/ResourceTransfer.java new file mode 100644 index 0000000..fbc695c --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResourceTransfer.java @@ -0,0 +1,50 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; + +public class ResourceTransfer { + public static final int bufsize = 1024; + + public static void send(Resource[] resources, SocketAddress remoteAddr) throws IOException { + DatagramSocket sock = new DatagramSocket(); + + for (Resource resource : resources) { + DatagramPacket pkt = ObjectPacket.create(resource, remoteAddr); + sock.send(pkt); + } + + DatagramPacket pkt = ObjectPacket.create(new EndOfTransmission(), remoteAddr); + sock.send(pkt); + sock.close(); + } + + public static Resource[] receive(DatagramSocket sock) throws IOException { + List<Resource> resources = new ArrayList<Resource>(); + byte[] buf = new byte[bufsize]; + DatagramPacket response = new DatagramPacket(buf, buf.length); + + for (;;) { + sock.receive(response); + + Object obj = ObjectPacket.deserialize(response, Object.class); + if (obj.getClass() == EndOfTransmission.class) { + break; + } + try { + resources.add((Resource) obj); + } catch (Exception e) { + throw new IOException("expected Resource; got "+obj.getClass().toString()); + } + } + Resource[] arr = new Resource[0]; + return resources.toArray(arr); + } + + private static class EndOfTransmission implements Serializable {} +}
\ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/Resources.java b/src/main/java/derms/replica/replica1/Resources.java new file mode 100644 index 0000000..0eb8bf2 --- /dev/null +++ b/src/main/java/derms/replica/replica1/Resources.java @@ -0,0 +1,74 @@ +package derms.replica.replica1; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +public class Resources { + private Map<ResourceName, Map<ResourceID, Resource>> resources; + + public Resources() { + this.resources = new ConcurrentHashMap<ResourceName, Map<ResourceID, Resource>>(); + } + + public List<Resource> borrowed(CoordinatorID borrower, ResourceName name) { + List<Resource> borrowed = new ArrayList<Resource>(); + Resource[] namedResources = getByName(name); + for (Resource r : namedResources) { + if (r.isBorrowed && r.borrower.equals(borrower)) { + borrowed.add(r); + } + } + return borrowed; + } + + public Resource getByID(ResourceID id) throws NoSuchElementException { + for (Map<ResourceID, Resource> rids : resources.values()) { + Resource resource = rids.get(id); + if (resource != null) { + return resource; + } + } + throw new NoSuchElementException("No such resource "+id); + } + + public Resource[] getByName(ResourceName name) { + Map<ResourceID, Resource> rids = resources.get(name); + if (rids == null) { + return new Resource[0]; + } + Resource[] r = new Resource[0]; + return rids.values().toArray(r); + } + + public void add(Resource r) { + Map<ResourceID, Resource> rids; + synchronized (resources) { + rids = resources.get(r.name); + if (rids == null) { + rids = new ConcurrentHashMap<ResourceID, Resource>(); + resources.put(r.name, rids); + } + } + synchronized (rids) { + Resource existing = rids.get(r.id); + if (existing != null) { + existing.duration += r.duration; + } else { + rids.put(r.id, r); + } + } + } + + public void removeByID(ResourceID id) throws NoSuchElementException { + for (Map<ResourceID, Resource> rids : resources.values()) { + if (rids.containsKey(id)) { + rids.remove(id); + return; + } + } + throw new NoSuchElementException("No such resource "+id); + } +} diff --git a/src/main/java/derms/replica/replica1/Responder.java b/src/main/java/derms/replica/replica1/Responder.java new file mode 100644 index 0000000..5fb7359 --- /dev/null +++ b/src/main/java/derms/replica/replica1/Responder.java @@ -0,0 +1,16 @@ +package derms.replica.replica1; + +import javax.jws.WebService; +import javax.jws.soap.SOAPBinding; + +@WebService +@SOAPBinding(style = SOAPBinding.Style.RPC) +public interface Responder { + void addResource(Resource r); + + void removeResource(ResourceID rid, int duration) + throws NoSuchResourceException; + + Resource[] listResourceAvailability(ResourceName rname) + throws ServerCommunicationError; +} diff --git a/src/main/java/derms/replica/replica1/ResponderClient.java b/src/main/java/derms/replica/replica1/ResponderClient.java new file mode 100644 index 0000000..b21b67e --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResponderClient.java @@ -0,0 +1,34 @@ +package derms.replica.replica1; + +import java.net.MalformedURLException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +public class ResponderClient extends Client<Responder, ResponderServer> { + public ResponderID id; + + public ResponderClient(ResponderID id) { + super(Responder.class, ResponderServer.class); + this.id = id; + } + + public ResponderClient(City city, short idNum) { + this(new ResponderID(city, idNum)); + } + + public void add(ResourceID rid, ResourceName name, int duration) throws UnknownHostException, MalformedURLException { + Responder server = connect(new City(rid.city)); + server.addResource(new Resource(rid, name, duration)); + } + + public void remove(ResourceID rid, int duration) throws UnknownHostException, MalformedURLException, NoSuchResourceException { + Responder server = connect(new City(rid.city)); + server.removeResource(rid, duration); + } + + public Resource[] listResources(ResourceName name) throws UnknownHostException, MalformedURLException, ServerCommunicationError { + Responder server = connect(id.city); + return server.listResourceAvailability(name); + } +} diff --git a/src/main/java/derms/replica/replica1/ResponderClientCLI.java b/src/main/java/derms/replica/replica1/ResponderClientCLI.java new file mode 100644 index 0000000..5bfbb11 --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResponderClientCLI.java @@ -0,0 +1,151 @@ +package derms.replica.replica1; + +import java.net.MalformedURLException; +import java.net.UnknownHostException; + +public class ResponderClientCLI extends CLI { + public static final String usage = "Usage: java ResponderClientCLI <city> <id number>"; + + private final ResponderClient client; + + private ResponderClientCLI(City city, short idNum) { + client = new ResponderClient(city, idNum); + System.out.println("ID: "+client.id); + + commands.put("add", new Add()); + cmdDescriptions.add(new Description( + "add <resource ID> <resource name> <duration>", + "Add a resource to the server")); + + commands.put("remove", new Remove()); + cmdDescriptions.add(new Description( + "remove <resource ID> <duration>", + "Decrease the duration of a resource. If duration is negative, the tresource is removed entirely.")); + + commands.put("list", new List()); + cmdDescriptions.add(new Description( + "list <resource name>", + "List available resources.")); + + argDescriptions.add(new Description( + "<resource ID>", + "3-letter city code followed by 4-digit number.")); + argDescriptions.add(new Description( + "<resource name>", + "E.g., AMBULANCE.")); + argDescriptions.add(new Description( + "<duration>", + "A number representing a time period.")); + } + + public static void main(String[] cmdlineArgs) { + Args args = null; + try { + args = new Args(cmdlineArgs); + } catch (IllegalArgumentException e) { + System.err.println(e); + System.err.println(usage); + System.exit(1); + } + + (new ResponderClientCLI(args.city, args.idNum)).run(); + } + + private class Add implements Command { + public void exec(String[] args) { + if (args.length < 3) { + System.out.println("invalid arguments for 'add'"); + } else { + add(args[0], args[1], args[2]); + } + } + } + + private void add(String resourceIDStr, String resourceNameStr, String durationStr) { + ResourceID rid = ResourceID.parse(resourceIDStr); + ResourceName name = ResourceName.parse(resourceNameStr); + int duration = Integer.parseInt(durationStr); + if (duration < 0) { + throw new NumberFormatException("duration less than 0"); + } + try { + client.add(rid, name, duration); + System.out.println("Successfully added resource to server."); + } catch (Exception e) { + System.out.println("Failed to add resource: "+e.getMessage()); + } + } + + private class Remove implements Command { + public void exec(String[] args) { + if (args.length < 2) { + System.out.println("invalid arguments for 'remove'"); + } else { + remove(args[0], args[1]); + } + } + } + + private void remove(String resourceIDStr, String durationStr) { + try { + ResourceID resourceID = ResourceID.parse(resourceIDStr); + int duration = Integer.parseInt(durationStr); + client.remove(resourceID, duration); + System.out.println("Successfully removed resource from server."); + } catch (NumberFormatException e) { + System.out.println("invalid duration: "+durationStr); + } catch (IllegalArgumentException e) { + System.out.println(e.getMessage()); + } catch (Exception e) { + System.out.println("Failed to remove resource: "+e.getMessage()); + } + } + + private class List implements Command { + public void exec(String[] args) { + if (args.length < 1) { + System.out.println("invalid arguments for 'list'"); + } else { + list(args[0]); + } + } + } + + private void list(String resourceNameStr) { + try { + ResourceName name = ResourceName.parse(resourceNameStr); + Resource[] resources = client.listResources(name); + System.out.println("Available resources:"); + for (Resource resource : resources) { + System.out.println(resource.toString()); + } + } catch (IllegalArgumentException e) { + System.out.println("invalid resource name: " + resourceNameStr); + } catch (UnknownHostException | MalformedURLException e) { + System.err.println(e.getMessage()); + } catch (ServerCommunicationError e) { + System.err.println("Failed to retrieve resources from server: "+e.getMessage()); + } + } + + private static class Args { + private final City city; + private final short idNum; + + private Args(String[] args) throws IllegalArgumentException { + if (args.length < 1) { + throw new IllegalArgumentException("Missing argument 'city'"); + } + city = new City(args[0]); + + if (args.length < 2) { + throw new IllegalArgumentException("Missing argument 'id number'"); + } + try { + idNum = Short.parseShort(args[1]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Bad value of 'id number'"); + } + } + } +} diff --git a/src/main/java/derms/replica/replica1/ResponderID.java b/src/main/java/derms/replica/replica1/ResponderID.java new file mode 100644 index 0000000..0ccda8a --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResponderID.java @@ -0,0 +1,16 @@ +package derms.replica.replica1; + +public class ResponderID { + public City city; + public short num; + + public ResponderID(City city, short num) { + this.city = city; + this.num = num; + } + + @Override + public String toString() { + return ""+city+"R"+num; + } +} diff --git a/src/main/java/derms/replica/replica1/ResponderServer.java b/src/main/java/derms/replica/replica1/ResponderServer.java new file mode 100644 index 0000000..d2406fb --- /dev/null +++ b/src/main/java/derms/replica/replica1/ResponderServer.java @@ -0,0 +1,99 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import java.util.NoSuchElementException; +import javax.jws.WebService; + +@WebService(endpointInterface = "xyz.samanthony.derms.Responder") +public class ResponderServer implements Responder { + public static final Duration timeout = Duration.ofSeconds(5); + + private City city; + private Resources resources; + private Servers servers; + private Logger log; + + public ResponderServer(City city, Resources resources, Servers servers) throws IOException { + this.city = city; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + public ResponderServer() throws IOException { + this(new City(), new Resources(), new Servers()); + } + + @Override + public void addResource(Resource r) { + resources.add(r); + log.info("Added resource "+r+" - success"); + } + + @Override + public void removeResource(ResourceID rid, int duration) throws NoSuchResourceException { + log.info("Remove duration "+duration+" from "+rid); + try { + Resource resource = resources.getByID(rid); + synchronized (resource) { + if (duration < 0 || duration >= resource.duration) { + resources.removeByID(rid); + log.info("Removed resource "+rid+" - success"); + } else { + resource.duration -= duration; + log.info("Removed duration from resource "+rid+". New duration: "+resource.duration+" - success"); + } + } + } catch (NoSuchElementException e) { + // Wanted to remove duration from resource. + if (duration >= 0) { + String msg = "Cannot remove duration from "+rid+": resource does not exist."; + log.warning(msg); + throw new NoSuchResourceException(msg); + } + + // Duration is negative---wanted to remove resource completely. + // Success because it is already removed. + log.info("Not removing "+rid+": resource does not exist."); + } + } + + @Override + public Resource[] listResourceAvailability(ResourceName rname) throws ServerCommunicationError { + log.info("Request for available "+rname); + Collection<Resource> availableResources = ConcurrentHashMap.newKeySet(); + ExecutorService pool = Executors.newFixedThreadPool(servers.size()); + try { + for (InetAddress serverAddr : servers.all()) { + pool.execute(new ResourceAvailability.Client(serverAddr, rname, availableResources)); + } + } catch (IOException e) { + String msg = "Failed to start ResourceAvailability Client: "+e.getMessage(); + log.severe(msg); + throw new ServerCommunicationError("ResourceAvailability: "+msg); + } + + log.fine("Started worker threads"); + pool.shutdown(); + boolean terminated; + try { + terminated = pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new ServerCommunicationError("listResourceAvailability(): listResourceAvailability() interrupted: "+e.getMessage()); + } + if (!terminated) { + throw new ServerCommunicationError("ResourceAvailability: request timed out: no response after "+timeout.toString()); + } + log.info("Response length "+availableResources.size()); + Resource[] arr = new Resource[0]; + return availableResources.toArray(arr); + } +} diff --git a/src/main/java/derms/replica/replica1/ReturnResource.java b/src/main/java/derms/replica/replica1/ReturnResource.java new file mode 100644 index 0000000..a157631 --- /dev/null +++ b/src/main/java/derms/replica/replica1/ReturnResource.java @@ -0,0 +1,199 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; +import java.util.NoSuchElementException; + +public class ReturnResource { + public static final int port = 5559; + public static final int bufsize = 4096; + + public static class Client { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + + public Client(CoordinatorID cid, ResourceID rid) { + this.coordinatorID = cid; + this.resourceID = rid; + } + + public Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(coordinatorID, resourceID); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Return Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Return Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufsize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Return Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Return Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + public static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString() + +": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.fine("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources)); + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + + private Request(CoordinatorID cid, ResourceID rid) { + this.coordinatorID = cid; + this.resourceID = rid; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + + private RequestHandler(Request request, SocketAddress client, Resources resources) { + this.request = request; + this.client = client; + this.resources = resources; + } + + @Override + public void run() { + Response response = returnResource(); + + DatagramSocket sock; + try { + sock= new DatagramSocket(); + } catch (Exception e) { + System.err.println("Request Resource Server: failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + System.err.println("Request Resource Server: failed to create response packet: "+e.getMessage()); + sock.close(); + return; + } + try { + sock.send(pkt); + } catch (Exception e) { + System.err.println("Request Resource Server: failed to send response: "+e.getMessage()); + } + sock.close(); + } + + private Response returnResource() { + try { + Resource resource = resources.getByID(request.resourceID); + synchronized (resource) { + if (!resource.isBorrowed || !resource.borrower.equals(request.coordinatorID)) { + return new Response(Response.Status.NOT_BORROWED, + request.resourceID+" is not borrowed by "+request.coordinatorID); + } + resource.isBorrowed = false; + resource.borrower = new CoordinatorID(); + resource.borrowDuration = -1; + return new Response(Response.Status.SUCCESS, request.coordinatorID+" successfully returned "+resource.id); + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); + } + } + } + + public static class Response implements Serializable { + public Status status; + public String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + public enum Status { + SUCCESS, NO_SUCH_RESOURCE, NOT_BORROWED + } + } +} diff --git a/src/main/java/derms/replica/replica1/ServerCommunicationError.java b/src/main/java/derms/replica/replica1/ServerCommunicationError.java new file mode 100644 index 0000000..0f0586b --- /dev/null +++ b/src/main/java/derms/replica/replica1/ServerCommunicationError.java @@ -0,0 +1,7 @@ +package derms.replica.replica1; + +public class ServerCommunicationError extends Exception { + public ServerCommunicationError(String message) { + super(message); + } +} diff --git a/src/main/java/derms/replica/replica1/Servers.java b/src/main/java/derms/replica/replica1/Servers.java new file mode 100644 index 0000000..dc084d7 --- /dev/null +++ b/src/main/java/derms/replica/replica1/Servers.java @@ -0,0 +1,34 @@ +package derms.replica.replica1; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; + +public class Servers { + private Map<City, InetAddress> servers = new ConcurrentHashMap<City, InetAddress>(); + + /** Returns the address of the server located in the specified city, or null if there is no server in the city. */ + public InetAddress get(City city) { + return servers.get(city); + } + + /** + * Associates the specified server address with the specified city. + * If there was already a server associated with the city, the old value is replaced. + * @param city the city where the server is located + * @param addr the address of the server + * @return the previous server address, or null if there was no server associated with this city. + */ + public InetAddress put(City city, InetAddress addr) { + return servers.put(city, addr); + } + + public Collection<InetAddress> all() { + return servers.values(); + } + + public int size() { + return servers.size(); + } +} diff --git a/src/main/java/derms/replica/replica1/StationServer.java b/src/main/java/derms/replica/replica1/StationServer.java new file mode 100644 index 0000000..3187ecb --- /dev/null +++ b/src/main/java/derms/replica/replica1/StationServer.java @@ -0,0 +1,146 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; +import javax.xml.ws.Endpoint; + +public class StationServer implements Runnable { + public static final String usage = "Usage: java StationServer <city> <local address>"; + public static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); + public static final int publishPort = 8080; + + public City city; + public InetAddress localAddr; + public Resources resources; + public Servers servers; + private Logger log; + private ResponderServer responderServer; + private CoordinatorServer coordinatorServer; + + public StationServer(City city, InetAddress localAddr) throws IOException { + this.city = city; + this.localAddr = localAddr; + this.resources = new Resources(); + this.servers = new Servers(); + this.log = DermsLogger.getLogger(getClass()); + + try { + this.responderServer = new ResponderServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create ResponderServer: "+e.getMessage()); + } + log.info("Created ResponderServer"); + + try { + this.coordinatorServer = new CoordinatorServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create CoordinatorServer: "+e.getMessage()); + } + log.info("Created CoordinatorServer"); + } + + public static void main(String cmdlineArgs[]) { + Args args = null; + try { + args = new Args(cmdlineArgs); + } catch (IllegalArgumentException e) { + System.err.println(e.getMessage()); + System.out.println(usage); + System.exit(1); + } + + try { + (new StationServer(args.city, args.localAddr)).run(); + } catch (Exception e) { + System.err.println(e.getMessage()); + System.exit(1); + } + } + + @Override + public void run() { + log.info("Running"); + log.config("Local address is "+localAddr.toString()); + + ExecutorService pool = Executors.newCachedThreadPool(); + + try { + pool.execute(new ResourceAvailability.Server(localAddr, resources)); + } catch (IOException e) { + String msg = "Failed to start ResourceAvailability Server: "+e.getMessage(); + log.severe(msg); + return; + } + try { + pool.execute(new RequestResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start RequestResource Server: "+e.getMessage()); + return; + } + try { + pool.execute(new FindResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start FindResource Server: "+e.getMessage()); + return; + } + try { + pool.execute(new ReturnResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start ReturnResource Server: "+e.getMessage()); + return; + } + try { + pool.execute(new SwapResource.Server(localAddr, resources, servers)); + } catch (IOException e) { + log.severe("Failed to start SwapResource Server: "+e.getMessage()); + return; + } + + String url = "http://"+localAddr.getHostAddress()+":"+publishPort+"/"+Responder.class.getSimpleName(); + log.info("Publishing Responder to '"+url+"'..."); + Endpoint.publish(url, responderServer); + + url = "http://"+localAddr.getHostAddress()+":"+publishPort+"/"+Coordinator.class.getSimpleName(); + log.info("Publishing Coordinator to '"+url+"'..."); + Endpoint.publish(url, coordinatorServer); + + try { + pool.execute(new Announcer(announceGroup, localAddr, city)); + } catch (IOException e) { + log.severe("Failed to start Announcer: "+e.getMessage()); + return; + } + try { + pool.execute(new AnnounceListener(announceGroup, localAddr, servers)); + } catch (IOException e) { + log.severe("Failed to start AnnounceListener: "+e.getMessage()); + return; + } + } + + private static class Args { + private City city; + private InetAddress localAddr; + + private Args(String[] args) throws IllegalArgumentException { + if (args.length < 1) { + throw new IllegalArgumentException("Missing argument 'city'"); + } + city = new City(args[0]); + + if (args.length < 2) { + throw new IllegalArgumentException("Missing argument 'local host'"); + } + try { + localAddr = InetAddress.getByName(args[1]); + } catch (UnknownHostException | SecurityException e) { + throw new IllegalArgumentException("Bad value of 'local host': "+e.getMessage()); + } + } + } +}
\ No newline at end of file diff --git a/src/main/java/derms/replica/replica1/SwapResource.java b/src/main/java/derms/replica/replica1/SwapResource.java new file mode 100644 index 0000000..34d5b66 --- /dev/null +++ b/src/main/java/derms/replica/replica1/SwapResource.java @@ -0,0 +1,267 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; +import java.util.NoSuchElementException; + +public class SwapResource { + public static final int port = 5560; + public static final int bufSize = 4096; + + public static class Client { + private CoordinatorID cid; + private ResourceID oldRID; + private ResourceID newRID; + + public Client(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { + this.cid = cid; + this.oldRID = oldRID; + this.newRID = newRID; + } + + public Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(cid, oldRID, newRID); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Swap Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Swap Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufSize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Swap Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Swap Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + public static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private Servers servers; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources, Servers servers) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + this.servers = servers; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr+": "+e.getMessage()); + return; + } + log.info("Listening on "+localAddr+":"+port); + + DatagramPacket requestPkt = new DatagramPacket(new byte[bufSize], bufSize); + + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.fine("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + SocketAddress client = requestPkt.getSocketAddress(); + try { + RequestHandler handler = new RequestHandler(request, client, resources, servers); + pool.execute(handler); + } catch (IOException e) { + log.warning("Failed to create request handler: "+e.getMessage()); + continue; + } + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID cid; + private ResourceID oldRID; + private ResourceID newRID; + + private Request(CoordinatorID cid, ResourceID oldRID, ResourceID newRID) { + this.cid = cid; + this.oldRID = oldRID; + this.newRID = newRID; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Servers servers; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Servers servers) throws IOException { + this.request = request; + this.client = client; + this.resources = resources; + this.servers = servers; + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + Response response = swapResources(); + + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + log.severe("failed to create response: "+e.getMessage()); + sock.close(); + return; + } + + try { + sock.send(pkt); + } catch (Exception e) { + log.severe("failed to send response: "+e.getMessage()); + } finally { + sock.close(); + } + } + + private Response swapResources() { + try { + Resource resource = resources.getByID(request.oldRID); + synchronized (resource) { + if (!resource.borrower.equals(request.cid)) { + return new Response(Response.Status.NOT_BORROWED, "resource "+request.oldRID+" not borrowed by "+request.cid); + } + try { + acquireNewResource(resource.borrowDuration); + returnOldResource(resource); + return new Response(Response.Status.SUCCESS, request.cid+" success fully swapped "+request.oldRID+" for "+request.newRID); + } catch (UnknownHostException e) { + return new Response(Response.Status.UNKNOWN_HOST, e.getMessage()); + } catch (IOException e) { + return new Response(Response.Status.FAILURE, e.getMessage()); + } catch (CannotBorrow e) { + return new Response(Response.Status.CANNOT_BORROW, e.getMessage()); + } + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.oldRID); + } + } + + private void acquireNewResource(int borrowDuration) throws UnknownHostException, IOException, CannotBorrow { + RequestResource.Client requestClient = new RequestResource.Client(request.cid, request.newRID, borrowDuration); + City city = new City(request.newRID.city); + InetAddress requestResourceServer = servers.get(city); + if (requestResourceServer == null) { + throw new UnknownHostException(city.toString()); + } + RequestResource.Response response = requestClient.sendRequest(requestResourceServer); + // TODO: make exception handling more granular---pass through status from Request. + if (response.status != RequestResource.Response.Status.SUCCESS) { + throw new CannotBorrow(request.cid, request.newRID, response.message); + } + } + + private void returnOldResource(Resource r) { + r.isBorrowed = false; + r.borrower = new CoordinatorID(); + r.borrowDuration = -1; + } + } + + public static class Response implements Serializable { + public Status status; + public String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + public enum Status { + SUCCESS, + FAILURE, + NO_SUCH_RESOURCE, + NOT_BORROWED, + CANNOT_BORROW, + UNKNOWN_HOST + } + } + + private static class CannotBorrow extends Exception { + CoordinatorID attemptedBorrower; + ResourceID rid; + String message; + + private CannotBorrow(CoordinatorID attemptedBorrower, ResourceID rid, String message) { + this.attemptedBorrower = attemptedBorrower; + this.rid = rid; + this.message = message; + } + + @Override + public String getMessage() { + return attemptedBorrower+" failed to borrow "+rid+": "+message; + } + } +}
\ No newline at end of file |