diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-28 12:40:14 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-28 12:40:14 -0500 |
| commit | f3d0bfd50bdde4f96dc8ee603a7093d68201f114 (patch) | |
| tree | 7f1c503cf25cb5aad0fb3d5738aac3604c8cbf99 /src/main | |
| parent | 3feb480540b33f4e40fd4d41d7ea88b56f91f133 (diff) | |
| download | soen423-f3d0bfd50bdde4f96dc8ee603a7093d68201f114.zip | |
Replica2: implement Replica interface
Diffstat (limited to 'src/main')
12 files changed, 286 insertions, 180 deletions
diff --git a/src/main/java/derms/ReplicaManager.java b/src/main/java/derms/ReplicaManager.java index 9725bcb..2bab635 100644 --- a/src/main/java/derms/ReplicaManager.java +++ b/src/main/java/derms/ReplicaManager.java @@ -45,7 +45,7 @@ public class ReplicaManager { replica = new Replica1(frontEndAddress); break; case 2: - replica = new Replica2( frontEndAddress); + replica = new Replica2(); break; case 3: replica = new Replica3( frontEndAddress); diff --git a/src/main/java/derms/replica/replica2/CoordinatorID.java b/src/main/java/derms/replica/replica2/CoordinatorID.java index 00aa99b..701ded7 100644 --- a/src/main/java/derms/replica/replica2/CoordinatorID.java +++ b/src/main/java/derms/replica/replica2/CoordinatorID.java @@ -11,12 +11,17 @@ public class CoordinatorID implements Serializable { this.num = num; } - public CoordinatorID(String city, int num) { - this(city, (short) num); - } + public static CoordinatorID parse(String str) throws IllegalArgumentException { + if (str.length() != City.codeLen+ID.nDigits) + throw new IllegalArgumentException("illegal coordinator ID: " + str); - public CoordinatorID() { - this("XXX", 0); + try { + String city = str.substring(0, City.codeLen); + short num = Short.parseShort(str.substring(City.codeLen)); + return new CoordinatorID(city, num); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("illegal coordinator ID '" + str + "': " + e.getMessage()); + } } @Override diff --git a/src/main/java/derms/replica/replica2/CoordinatorServer.java b/src/main/java/derms/replica/replica2/CoordinatorServer.java index a4f7547..ced9091 100644 --- a/src/main/java/derms/replica/replica2/CoordinatorServer.java +++ b/src/main/java/derms/replica/replica2/CoordinatorServer.java @@ -67,7 +67,7 @@ public class CoordinatorServer { } } - public Resource[] findResource(CoordinatorID cid, ResourceName rname) throws ServerCommunicationError { + public Resource[] findResource(CoordinatorID cid, ResourceType rname) throws ServerCommunicationError { log.info("Find Resource "+rname+" from "+cid); FindResource.Request request = new FindResource.Request(cid, rname); Collection<Resource> response = ConcurrentHashMap.newKeySet(); diff --git a/src/main/java/derms/replica/replica2/FindResource.java b/src/main/java/derms/replica/replica2/FindResource.java index be2eed9..ac5f29d 100644 --- a/src/main/java/derms/replica/replica2/FindResource.java +++ b/src/main/java/derms/replica/replica2/FindResource.java @@ -124,9 +124,9 @@ public class FindResource { public static class Request implements Serializable { private CoordinatorID cid; - private ResourceName rname; + private ResourceType rname; - public Request(CoordinatorID cid, ResourceName rname) { + public Request(CoordinatorID cid, ResourceType rname) { this.cid = cid; this.rname = rname; } diff --git a/src/main/java/derms/replica/replica2/Replica2.java b/src/main/java/derms/replica/replica2/Replica2.java new file mode 100644 index 0000000..f0c6bbf --- /dev/null +++ b/src/main/java/derms/replica/replica2/Replica2.java @@ -0,0 +1,237 @@ +package derms.replica.replica2; + +import derms.Replica; +import derms.Request; +import derms.Response; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Logger; + +public class Replica2 implements Replica { + public static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); + + public final City city; + public final InetAddress localAddr; + public final Resources resources; + public final Servers servers; + private final Logger log; + private final ResponderServer responderServer; + private final CoordinatorServer coordinatorServer; + private boolean alive; + + public Replica2(City city) throws IOException { + this.city = city; + this.localAddr = InetAddress.getLocalHost(); + this.resources = new Resources(); + this.servers = new Servers(); + this.log = DermsLogger.getLogger(getClass()); + + try { + this.responderServer = new ResponderServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create ResponderServer: "+e.getMessage()); + } + log.info("Created ResponderServer"); + + try { + this.coordinatorServer = new CoordinatorServer(city, resources, servers); + } catch (IOException e) { + throw new IOException("Failed to create CoordinatorServer: "+e.getMessage()); + } + log.info("Created CoordinatorServer"); + + log.info("Running"); + log.config("Local address is "+localAddr.toString()); + + ExecutorService pool = Executors.newCachedThreadPool(); + + try { + pool.execute(new ResourceAvailability.Server(localAddr, resources)); + } catch (IOException e) { + String msg = "Failed to start ResourceAvailability Server: "+e.getMessage(); + log.severe(msg); + throw e; + } + try { + pool.execute(new RequestResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start RequestResource Server: "+e.getMessage()); + throw e; + } + try { + pool.execute(new FindResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start FindResource Server: "+e.getMessage()); + throw e; + } + try { + pool.execute(new ReturnResource.Server(localAddr, resources)); + } catch (IOException e) { + log.severe("Failed to start ReturnResource Server: "+e.getMessage()); + throw e; + } + try { + pool.execute(new SwapResource.Server(localAddr, resources, servers)); + } catch (IOException e) { + log.severe("Failed to start SwapResource Server: "+e.getMessage()); + throw e; + } + + try { + pool.execute(new Announcer(announceGroup, localAddr, city)); + } catch (IOException e) { + log.severe("Failed to start Announcer: "+e.getMessage()); + throw e; + } + try { + pool.execute(new AnnounceListener(announceGroup, localAddr, servers)); + } catch (IOException e) { + log.severe("Failed to start AnnounceListener: "+e.getMessage()); + throw e; + } + + this.alive = true; + } + + @Override + public boolean isAlive() { return alive; } + + @Override + public void startProcess() { + // TODO + log.info(getClass().getSimpleName() + " started."); + } + + @Override + public void processRequest(Request request) { + log.info(request.toString()); + + String status = ""; + try { + switch (request.getFunction()) { + case "addResource": + status = addResource(request); + break; + case "removeResource": + status = removeResource(request); + break; + case "listResourceAvailability": + status = listResourceAvailability(request); + break; + case "requestResource": + status = requestResource(request); + break; + case "findResource": + status = findResource(request); + break; + case "returnResource": + status = returnResource(request); + break; + case "swapResource": + status = swapResource(request); + break; + default: + status = "Failure: unknown function '" + request.getFunction() + "'"; + } + } catch (Exception e) { + log.warning(e.getMessage()); + status = "Failure: " + request.getFunction() + ": " + e.getMessage(); + } + + Response response = new Response(request.getSequenceNumber(), status); + log.info("Processed request " + request + "; response: " + response); + replicaManager.sendResponseToFE(response); + } + + @Override + public void restart() { + // TODO + shutdown(); + startProcess(); + } + + @Override + public int getId() { return 2; } + + private void shutdown() { + // TODO + } + + private String addResource(Request request) { + Resource resource = new Resource( + ResourceID.parse(request.getResourceID()), + ResourceType.parse(request.getResourceType()), + request.getDuration()); + responderServer.addResource(resource); + return "Successfully added resource " + resource; + } + + private String removeResource(Request request) { + try { + responderServer.removeResource( + ResourceID.parse(request.getResourceID()), + request.getDuration()); + return "Successfully removed resource " + request.getResourceID(); + } catch (NoSuchResourceException e) { + String msg = "Error removing " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } + + private String listResourceAvailability(Request request) { + // TODO + throw new NotImplementedException(); + } + + private String requestResource(Request request) { + try { + coordinatorServer.requestResource( + CoordinatorID.parse(request.getClientID()), + ResourceID.parse(request.getResourceID()), + request.getDuration()); + return "Successfully borrowed " + request.getResourceID(); + } catch (NoSuchResourceException | AlreadyBorrowedException | InvalidDurationException |ServerCommunicationError e) { + String msg = "Failed to borrow resource " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } + + private String findResource(Request request) { + // TODO + throw new NotImplementedException(); + } + + private String returnResource(Request request) { + try { + coordinatorServer.returnResource( + CoordinatorID.parse(request.getClientID()), + ResourceID.parse(request.getResourceID())); + return "Successfully returned resource " + request.getResourceID(); + } catch (NoSuchResourceException | NotBorrowedException | ServerCommunicationError e) { + String msg = "Failed to borrow resource " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } + + private String swapResource(Request request) { + try { + coordinatorServer.swapResource( + CoordinatorID.parse(request.getClientID()), + ResourceID.parse(request.getOldResourceID()), + ResourceID.parse(request.getResourceID())); + return "Successfully swapped " + request.getOldResourceID() + " for " + request.getResourceID(); + } catch (NoSuchResourceException | ServerCommunicationError e) { + String msg = "Failed to swap " + request.getOldResourceID() + " for " + request.getResourceID() + ": " + e.getMessage(); + log.warning(msg); + return msg; + } + } +} diff --git a/src/main/java/derms/replica/replica2/Resource.java b/src/main/java/derms/replica/replica2/Resource.java index 18aa847..e4b54a2 100644 --- a/src/main/java/derms/replica/replica2/Resource.java +++ b/src/main/java/derms/replica/replica2/Resource.java @@ -4,27 +4,27 @@ import java.io.Serializable; public class Resource implements Serializable { public ResourceID id; - public ResourceName name; + public ResourceType type; public int duration; public boolean isBorrowed; public CoordinatorID borrower; public int borrowDuration; - public Resource(ResourceID id, ResourceName name, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) { + public Resource(ResourceID id, ResourceType type, int duration, boolean isBorrowed, CoordinatorID borrower, int borrowDuration) { this.id = id; - this.name = name; + this.type = type; this.duration = duration; this.isBorrowed = isBorrowed; this.borrower = borrower; this.borrowDuration = borrowDuration; } - public Resource(ResourceID id, ResourceName name, int duration) { - this(id, name, duration, false, new CoordinatorID(), -1); + public Resource(ResourceID id, ResourceType type, int duration) { + this(id, type, duration, false, new CoordinatorID(), -1); } public Resource() { - this(new ResourceID(), ResourceName.AMBULANCE, 0); + this(new ResourceID(), ResourceType.AMBULANCE, 0); } @Override diff --git a/src/main/java/derms/replica/replica2/ResourceAvailability.java b/src/main/java/derms/replica/replica2/ResourceAvailability.java index 8eacbf2..c3edbd8 100644 --- a/src/main/java/derms/replica/replica2/ResourceAvailability.java +++ b/src/main/java/derms/replica/replica2/ResourceAvailability.java @@ -13,11 +13,11 @@ public class ResourceAvailability { public static class Client implements Runnable { private InetAddress serverAddr; - private ResourceName request; + private ResourceType request; private Collection<Resource> resources; private Logger log; - public Client(InetAddress serverAddr, ResourceName request, Collection<Resource> response) throws IOException { + public Client(InetAddress serverAddr, ResourceType request, Collection<Resource> response) throws IOException { this.serverAddr = serverAddr; this.request = request; this.resources = response; @@ -104,9 +104,9 @@ public class ResourceAvailability { continue; } - ResourceName requestedName = null; + ResourceType requestedName = null; try { - requestedName = ObjectPacket.deserialize(request, ResourceName.class); + requestedName = ObjectPacket.deserialize(request, ResourceType.class); } catch (IOException e) { log.warning("Failed to deserialize request: "+e.getMessage()); continue; diff --git a/src/main/java/derms/replica/replica2/ResourceName.java b/src/main/java/derms/replica/replica2/ResourceName.java deleted file mode 100644 index f4c315e..0000000 --- a/src/main/java/derms/replica/replica2/ResourceName.java +++ /dev/null @@ -1,18 +0,0 @@ -package derms.replica.replica2; - -import java.io.Serializable; - -public enum ResourceName implements Serializable { - AMBULANCE, - FIRETRUCK, - PERSONNEL; - - public static ResourceName parse(String s) { - switch (s) { - case "AMBULANCE": return ResourceName.AMBULANCE; - case "FIRETRUCK": return ResourceName.FIRETRUCK; - case "PERSONNEL": return ResourceName.PERSONNEL; - } - throw new IllegalArgumentException("invalid resource name: "+s); - } -} diff --git a/src/main/java/derms/replica/replica2/ResourceType.java b/src/main/java/derms/replica/replica2/ResourceType.java new file mode 100644 index 0000000..19b6c8a --- /dev/null +++ b/src/main/java/derms/replica/replica2/ResourceType.java @@ -0,0 +1,18 @@ +package derms.replica.replica2; + +import java.io.Serializable; + +public enum ResourceType implements Serializable { + AMBULANCE, + FIRETRUCK, + PERSONNEL; + + public static ResourceType parse(String s) throws IllegalArgumentException { + switch (s) { + case "AMBULANCE": return ResourceType.AMBULANCE; + case "FIRETRUCK": return ResourceType.FIRETRUCK; + case "PERSONNEL": return ResourceType.PERSONNEL; + } + throw new IllegalArgumentException("invalid resource name: "+s); + } +} diff --git a/src/main/java/derms/replica/replica2/Resources.java b/src/main/java/derms/replica/replica2/Resources.java index 4341def..71d78ca 100644 --- a/src/main/java/derms/replica/replica2/Resources.java +++ b/src/main/java/derms/replica/replica2/Resources.java @@ -7,13 +7,13 @@ import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; public class Resources { - private Map<ResourceName, Map<ResourceID, Resource>> resources; + private Map<ResourceType, Map<ResourceID, Resource>> resources; public Resources() { - this.resources = new ConcurrentHashMap<ResourceName, Map<ResourceID, Resource>>(); + this.resources = new ConcurrentHashMap<ResourceType, Map<ResourceID, Resource>>(); } - public List<Resource> borrowed(CoordinatorID borrower, ResourceName name) { + public List<Resource> borrowed(CoordinatorID borrower, ResourceType name) { List<Resource> borrowed = new ArrayList<Resource>(); Resource[] namedResources = getByName(name); for (Resource r : namedResources) { @@ -34,7 +34,7 @@ public class Resources { throw new NoSuchElementException("No such resource "+id); } - public Resource[] getByName(ResourceName name) { + public Resource[] getByName(ResourceType name) { Map<ResourceID, Resource> rids = resources.get(name); if (rids == null) { return new Resource[0]; @@ -46,10 +46,10 @@ public class Resources { public void add(Resource r) { Map<ResourceID, Resource> rids; synchronized (resources) { - rids = resources.get(r.name); + rids = resources.get(r.type); if (rids == null) { rids = new ConcurrentHashMap<ResourceID, Resource>(); - resources.put(r.name, rids); + resources.put(r.type, rids); } } synchronized (rids) { diff --git a/src/main/java/derms/replica/replica2/ResponderServer.java b/src/main/java/derms/replica/replica2/ResponderServer.java index 0eb4544..d294924 100644 --- a/src/main/java/derms/replica/replica2/ResponderServer.java +++ b/src/main/java/derms/replica/replica2/ResponderServer.java @@ -62,7 +62,7 @@ public class ResponderServer { } } - public Resource[] listResourceAvailability(ResourceName rname) throws ServerCommunicationError { + public Resource[] listResourceAvailability(ResourceType rname) throws ServerCommunicationError { log.info("Request for available "+rname); Collection<Resource> availableResources = ConcurrentHashMap.newKeySet(); ExecutorService pool = Executors.newFixedThreadPool(servers.size()); diff --git a/src/main/java/derms/replica/replica2/StationServer.java b/src/main/java/derms/replica/replica2/StationServer.java deleted file mode 100644 index 12525f7..0000000 --- a/src/main/java/derms/replica/replica2/StationServer.java +++ /dev/null @@ -1,136 +0,0 @@ -package derms.replica.replica2; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class StationServer implements Runnable { - public static final String usage = "Usage: java StationServer <city> <local address>"; - public static final InetSocketAddress announceGroup = new InetSocketAddress("225.5.5.5", 5555); - - public City city; - public InetAddress localAddr; - public Resources resources; - public Servers servers; - private Logger log; - private ResponderServer responderServer; - private CoordinatorServer coordinatorServer; - - public StationServer(City city, InetAddress localAddr) throws IOException { - this.city = city; - this.localAddr = localAddr; - this.resources = new Resources(); - this.servers = new Servers(); - this.log = DermsLogger.getLogger(getClass()); - - try { - this.responderServer = new ResponderServer(city, resources, servers); - } catch (IOException e) { - throw new IOException("Failed to create ResponderServer: "+e.getMessage()); - } - log.info("Created ResponderServer"); - - try { - this.coordinatorServer = new CoordinatorServer(city, resources, servers); - } catch (IOException e) { - throw new IOException("Failed to create CoordinatorServer: "+e.getMessage()); - } - log.info("Created CoordinatorServer"); - } - - public static void main(String cmdlineArgs[]) { - Args args = null; - try { - args = new Args(cmdlineArgs); - } catch (IllegalArgumentException e) { - System.err.println(e.getMessage()); - System.out.println(usage); - System.exit(1); - } - - try { - (new StationServer(args.city, args.localAddr)).run(); - } catch (Exception e) { - System.err.println(e.getMessage()); - System.exit(1); - } - } - - @Override - public void run() { - log.info("Running"); - log.config("Local address is "+localAddr.toString()); - - ExecutorService pool = Executors.newCachedThreadPool(); - - try { - pool.execute(new ResourceAvailability.Server(localAddr, resources)); - } catch (IOException e) { - String msg = "Failed to start ResourceAvailability Server: "+e.getMessage(); - log.severe(msg); - return; - } - try { - pool.execute(new RequestResource.Server(localAddr, resources)); - } catch (IOException e) { - log.severe("Failed to start RequestResource Server: "+e.getMessage()); - return; - } - try { - pool.execute(new FindResource.Server(localAddr, resources)); - } catch (IOException e) { - log.severe("Failed to start FindResource Server: "+e.getMessage()); - return; - } - try { - pool.execute(new ReturnResource.Server(localAddr, resources)); - } catch (IOException e) { - log.severe("Failed to start ReturnResource Server: "+e.getMessage()); - return; - } - try { - pool.execute(new SwapResource.Server(localAddr, resources, servers)); - } catch (IOException e) { - log.severe("Failed to start SwapResource Server: "+e.getMessage()); - return; - } - - try { - pool.execute(new Announcer(announceGroup, localAddr, city)); - } catch (IOException e) { - log.severe("Failed to start Announcer: "+e.getMessage()); - return; - } - try { - pool.execute(new AnnounceListener(announceGroup, localAddr, servers)); - } catch (IOException e) { - log.severe("Failed to start AnnounceListener: "+e.getMessage()); - return; - } - } - - private static class Args { - private City city; - private InetAddress localAddr; - - private Args(String[] args) throws IllegalArgumentException { - if (args.length < 1) { - throw new IllegalArgumentException("Missing argument 'city'"); - } - city = new City(args[0]); - - if (args.length < 2) { - throw new IllegalArgumentException("Missing argument 'local host'"); - } - try { - localAddr = InetAddress.getByName(args[1]); - } catch (UnknownHostException | SecurityException e) { - throw new IllegalArgumentException("Bad value of 'local host': "+e.getMessage()); - } - } - } -}
\ No newline at end of file |