diff options
| author | Sam Anthony <sam@samanthony.xyz> | 2024-11-23 13:51:12 -0500 |
|---|---|---|
| committer | Sam Anthony <sam@samanthony.xyz> | 2024-11-23 13:51:12 -0500 |
| commit | 3c62b863509131e78c18ed13c6b83e4fc508848f (patch) | |
| tree | cf246e91da283edf704af936b9cef7eb8e09a6f8 /src/main/java/derms/replica/replica1/RequestResource.java | |
| parent | e3b72053e8b04f2df013da0d7d49fe33927461a9 (diff) | |
| download | soen423-3c62b863509131e78c18ed13c6b83e4fc508848f.zip | |
import replica code from assignment
Diffstat (limited to 'src/main/java/derms/replica/replica1/RequestResource.java')
| -rw-r--r-- | src/main/java/derms/replica/replica1/RequestResource.java | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/src/main/java/derms/replica/replica1/RequestResource.java b/src/main/java/derms/replica/replica1/RequestResource.java new file mode 100644 index 0000000..f7fa4ab --- /dev/null +++ b/src/main/java/derms/replica/replica1/RequestResource.java @@ -0,0 +1,227 @@ +package derms.replica.replica1; + +import java.io.IOException; +import java.io.Serializable; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; +import java.util.NoSuchElementException; + +public class RequestResource { + public static final int port = 5557; + public static final int bufsize = 4096; + + public static class Client { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + public Client(CoordinatorID coordinatorID, ResourceID resourceID, int duration) { + this.coordinatorID = coordinatorID; + this.resourceID = resourceID; + this.duration = duration; + } + + public Response sendRequest(InetAddress serverAddr) throws IOException { + Request request = new Request(coordinatorID, resourceID, duration); + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + throw new IOException("Request Resource Client: failed to open socket: "+e.getMessage()); + } + + DatagramPacket requestPkt; + try { + requestPkt = ObjectPacket.create(request, new InetSocketAddress(serverAddr, port)); + } catch (IOException e) { + sock.close(); + throw new IOException("Request Resource Client: failed to create request: "+e.getMessage()); + } + + sock.send(requestPkt); + + byte[] buf = new byte[bufsize]; + DatagramPacket responsePkt = new DatagramPacket(buf, buf.length); + try { + sock.receive(responsePkt); + } catch (Exception e) { + sock.close(); + throw new IOException("Request Resource Client: error receiving from server: "+e.getMessage()); + } + + try { + return ObjectPacket.deserialize(responsePkt, Response.class); + } catch (IOException e) { + throw new IOException("Request Resource Client: failed to deserialize response: "+e.getMessage()); + } finally { + sock.close(); + } + } + } + + public static class Server implements Runnable { + private InetAddress localAddr; + private Resources resources; + private ExecutorService pool; + private Logger log; + + public Server(InetAddress localAddr, Resources resources) throws IOException { + this.localAddr = localAddr; + this.resources = resources; + pool = Executors.newWorkStealingPool(); + this.log = DermsLogger.getLogger(this.getClass()); + } + + @Override + public void run() { + DatagramSocket sock = null; + try { + sock = new DatagramSocket(port, localAddr); + } catch (Exception e) { + log.severe("Failed to bind socket to "+localAddr.toString() + +": "+e.getMessage()); + return; + } + + log.info("Listening on "+localAddr.toString()+":"+port); + DatagramPacket requestPkt = new DatagramPacket(new byte[bufsize], bufsize); + try { + for (;;) { + try { + sock.receive(requestPkt); + } catch (Exception e) { + log.warning("Error receiving from socket: "+e.getMessage()); + continue; + } + log.info("Got request"); + + Request request = null; + try { + request = ObjectPacket.deserialize(requestPkt, Request.class); + } catch (IOException e) { + log.warning("Failed to deserialize request: "+e.getMessage()); + continue; + } + + pool.execute(new RequestHandler(request, requestPkt.getSocketAddress(), resources, log)); + } + } finally { + sock.close(); + } + } + } + + private static class Request implements Serializable { + private CoordinatorID coordinatorID; + private ResourceID resourceID; + private int duration; + + private Request(CoordinatorID cid, ResourceID rid, int duration) { + this.coordinatorID = cid; + this.resourceID = rid; + this.duration = duration; + } + } + + private static class RequestHandler implements Runnable { + private Request request; + private SocketAddress client; + private Resources resources; + private Logger log; + + private RequestHandler(Request request, SocketAddress client, Resources resources, Logger log) { + this.request = request; + this.client = client; + this.resources = resources; + this.log = log; + } + + @Override + public void run() { + Response response = borrow(); + + DatagramSocket sock; + try { + sock = new DatagramSocket(); + } catch (Exception e) { + log.severe("Failed to open socket: "+e.getMessage()); + return; + } + + DatagramPacket pkt; + try { + pkt = ObjectPacket.create(response, client); + } catch (IOException e) { + log.severe("Failed to create response packet: "+e.getMessage()); + sock.close(); + return; + } + try { + sock.send(pkt); + } catch (Exception e) { + log.severe("Failed to send response: "+e.getMessage()); + } + sock.close(); + } + + private Response borrow() { + try { + Resource resource = resources.getByID(request.resourceID); + synchronized (resource) { + if (resource.isBorrowed && !request.coordinatorID.equals(resource.borrower)) { + return new Response(Response.Status.ALREADY_BORROWED, + request.coordinatorID+" cannot borrow "+request.resourceID + +"; already borrowed by "+resource.borrower); + } else if (request.duration <= 0) { + return new Response(Response.Status.INVALID_DURATION, + "duration "+request.duration+" less than 1"); + } else if (request.duration > resource.duration) { + return new Response(Response.Status.INVALID_DURATION, + "cannot borrow "+resource.id+" for duration of "+request.duration + +"; only "+resource.duration+" remaining"); + } + + if (resource.borrower.equals(request.coordinatorID)) { + // Resource is already borrowed. Add to existing duration. + resource.borrowDuration += request.duration; + } else { + resource.borrowDuration = request.duration; + } + resource.borrower = request.coordinatorID; + resource.isBorrowed = true; + resource.duration -= request.duration; + + return new Response(Response.Status.SUCCESS, request.coordinatorID + +" successfully borrowed "+request.resourceID); + } + } catch (NoSuchElementException e) { + return new Response(Response.Status.NO_SUCH_RESOURCE, "no such resource "+request.resourceID); + } + } + } + + public static class Response implements Serializable { + public Status status; + public String message; + + private Response(Status status, String message) { + this.status = status; + this.message = message; + } + + private Response() { + this(Status.SUCCESS, ""); + } + + public enum Status { + SUCCESS, NO_SUCH_RESOURCE, ALREADY_BORROWED, INVALID_DURATION + } + } +} + |