From 6a559bc5ab1ec851ecd57f627147b49f98b6c8b7 Mon Sep 17 00:00:00 2001 From: Brett Date: Sat, 15 Apr 2023 15:29:19 -0400 Subject: [PATCH] working on client side --- .idea/runConfigurations.xml | 10 -- Assignment 4/src/ca/cosc3p91/a4/Main.java | 2 +- .../ca/cosc3p91/a4/util/network/Client.java | 42 ++++++- .../ca/cosc3p91/a4/util/network/Message.java | 56 ++++++++- .../ca/cosc3p91/a4/util/network/Server.java | 114 ++++++++---------- 5 files changed, 141 insertions(+), 83 deletions(-) delete mode 100644 .idea/runConfigurations.xml diff --git a/.idea/runConfigurations.xml b/.idea/runConfigurations.xml deleted file mode 100644 index 797acea..0000000 --- a/.idea/runConfigurations.xml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/Assignment 4/src/ca/cosc3p91/a4/Main.java b/Assignment 4/src/ca/cosc3p91/a4/Main.java index 1242464..47c2b32 100644 --- a/Assignment 4/src/ca/cosc3p91/a4/Main.java +++ b/Assignment 4/src/ca/cosc3p91/a4/Main.java @@ -6,7 +6,7 @@ import java.io.*; public class Main { public static void main(String[] args) throws IOException { - Client gameClient = new Client(42069); + Client gameClient = new Client("localhost"); } } diff --git a/Assignment 4/src/ca/cosc3p91/a4/util/network/Client.java b/Assignment 4/src/ca/cosc3p91/a4/util/network/Client.java index 3b65efb..5df9280 100644 --- a/Assignment 4/src/ca/cosc3p91/a4/util/network/Client.java +++ b/Assignment 4/src/ca/cosc3p91/a4/util/network/Client.java @@ -2,21 +2,30 @@ package ca.cosc3p91.a4.util.network; import ca.cosc3p91.a4.userinterface.GameDisplay; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; -public class Client { - GameDisplay view = new GameDisplay(); +public class Client implements Runnable { + private GameDisplay view = new GameDisplay(); + + private DatagramSocket clientSocket; + private boolean running = true; + private Thread receiveThread; + + public Client(String address) throws IOException { + InetAddress serverAddress = InetAddress.getByName(address); + clientSocket = new DatagramSocket(); + receiveThread = new Thread(this); + receiveThread.start(); - public Client(int port) throws IOException { - DatagramSocket clientSocket = new DatagramSocket(); - InetAddress IPAddress = InetAddress.getByName("localhost"); String prompt; byte[] sendData = new byte[1024]; byte[] receiveData = new byte[1024]; - while (true) { + while (running) { if ((prompt = view.nextInput()) != null) { if (!prompt.isEmpty() && prompt.charAt(0) == '6') break; sendData = prompt.getBytes(); @@ -31,4 +40,25 @@ public class Client { } clientSocket.close(); } + + public void run(){ + while (running){ + try { + byte[] receiveData = new byte[Server.PACKET_SIZE]; + DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); + clientSocket.receive(receivePacket); + + DataInputStream stream = new DataInputStream(new ByteArrayInputStream(receivePacket.getData())); + + byte packetID = stream.readByte(); + long clientID = stream.readLong(); + long messageID = stream.readLong(); + + + + } catch (Exception e){ + e.printStackTrace(); + } + } + } } diff --git a/Assignment 4/src/ca/cosc3p91/a4/util/network/Message.java b/Assignment 4/src/ca/cosc3p91/a4/util/network/Message.java index d5dc741..319ff9d 100644 --- a/Assignment 4/src/ca/cosc3p91/a4/util/network/Message.java +++ b/Assignment 4/src/ca/cosc3p91/a4/util/network/Message.java @@ -1,5 +1,8 @@ package ca.cosc3p91.a4.util.network; +import ca.cosc3p91.a4.util.Time; + +import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -26,32 +29,77 @@ public class Message { return messageID; } - public static class ReceivedMessage extends Message { + public static class Received extends Message { private final DataInputStream reader; + private final byte[] data; - public ReceivedMessage(byte packetID, long clientID, long messageID, DataInputStream reader) { + public Received(byte packetID, long clientID, long messageID, DataInputStream reader, byte[] data) { super(packetID, clientID, messageID); this.reader = reader; + this.data = data; } public DataInputStream getReader(){ return reader; } + + public byte[] getData(){ + return data; + } } - public static class SentMessage extends Message { + public static class Sent extends Message { private final DataOutputStream writer; + private final ByteArrayOutputStream data; + private boolean ack = false; + private final Time timeSent; - public SentMessage(byte packetID, long clientID, long messageID, DataOutputStream writer) { + /** + * A message packet which will be sent to a client or the server, contains the standard message header and + * writes the header to the stream, make sure you don't write into the stream before constructing this! + * + * @param packetID type of this message + * @param clientID the client id, if this is going to the client it is unlikely to be used but should always be correct! + * @param messageID client specific message id, used to reference/acknowledge messages + * @param writer stream to write to + * @param data byte array stream which contains the byte[] used in packet construction + */ + public Sent(byte packetID, long clientID, long messageID, DataOutputStream writer, ByteArrayOutputStream data) { super(packetID, clientID, messageID); this.writer = writer; + this.data = data; + timeSent = Time.getTime(); + // write the header to the stream, make sure you don't write into the stream before constructing this! + try { + writer.writeByte(packetID); + writer.writeLong(clientID); + writer.writeLong(messageID); + } catch (Exception e){ + e.printStackTrace(); + } + } + + public void acknowledged(){ + this.ack = true; + } + + public boolean isAcknowledged(){ + return ack; } public DataOutputStream getWriter(){ return writer; } + + public Time getTimeSinceSent(){ + return Time.getTime().difference(timeSent); + } + + public ByteArrayOutputStream getData(){ + return data; + } } } diff --git a/Assignment 4/src/ca/cosc3p91/a4/util/network/Server.java b/Assignment 4/src/ca/cosc3p91/a4/util/network/Server.java index ea4945b..1cc7e98 100644 --- a/Assignment 4/src/ca/cosc3p91/a4/util/network/Server.java +++ b/Assignment 4/src/ca/cosc3p91/a4/util/network/Server.java @@ -1,7 +1,6 @@ package ca.cosc3p91.a4.util.network; import ca.cosc3p91.a4.game.GameEngine; -import ca.cosc3p91.a4.util.Time; import java.io.*; import java.net.*; @@ -17,9 +16,9 @@ public class Server implements Runnable { private final HashMap clients = new HashMap<>(); private long clientAssignmentID = 0; - private long lastMessageID = 0; private final DatagramSocket socket; private final Thread ioThread; + private long lastSentMessageID = 0; private GameEngine mainEngine; @@ -33,22 +32,25 @@ public class Server implements Runnable { public void run(){ while (running) { - byte[] receiveData = new byte[PACKET_SIZE]; - DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); try { + byte[] receiveData = new byte[PACKET_SIZE]; + DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); // BLOCKING! socket.receive(receivePacket); + // read in the message header that is associated with every message. DataInputStream stream = new DataInputStream(new ByteArrayInputStream(receivePacket.getData())); byte packetID = stream.readByte(); long clientID = stream.readLong(); + long messageID = stream.readLong(); ConnectedClient client = clients.get(clientID); + // the server must handle connection requests while the client's processing thread will handle all other messages if (packetID == PacketTable.CONNECT){ - clients.put(++clientAssignmentID, new ConnectedClient(socket, clientID, receivePacket.getAddress(), receivePacket.getPort())); - } else if (packetID == PacketTable.DISCONNECT){ + clients.put(++clientAssignmentID, new ConnectedClient(socket, clientID, messageID, receivePacket.getAddress(), receivePacket.getPort())); + } else if (packetID == PacketTable.DISCONNECT) { if (client == null) throw new ServerException("Client disconnected with invalid client id! (" + clientID + ")"); client.halt(); @@ -56,7 +58,7 @@ public class Server implements Runnable { } else { if (client == null) throw new ServerException("Client message with invalid client id! (" + clientID + ")"); - client.handleRequest(new ConnectedClient.ServerRequest(packetID, stream)); + client.handleRequest(new Message.Received(packetID, clientID, messageID, stream, receivePacket.getData())); } } catch (IOException | InterruptedException e) { throw new RuntimeException(e); @@ -76,100 +78,88 @@ public class Server implements Runnable { private static class ConnectedClient implements Runnable { private final InetAddress address; private final int port; - private final ArrayList requests = new ArrayList<>(); - private final Queue pendingRequests = new PriorityQueue<>(); - // could use read/write lock for some of this, as certain operations, mostly timeout check, won't modify data. + private final Queue pendingRequests = new PriorityQueue<>(); private final ReentrantLock requestLock = new ReentrantLock(); - private final DatagramSocket socket; + private final HashMap sentMessages = new HashMap<>(); + private final DatagramSocket serverSocket; private final long clientID; private volatile boolean running = true; private final Thread processingThread; - public ConnectedClient(DatagramSocket socket, long clientID, InetAddress address, int port){ - this.socket = socket; + public ConnectedClient(DatagramSocket serverSocket, long clientID, long messageID, InetAddress address, int port){ + this.serverSocket = serverSocket; this.address = address; this.port = port; this.clientID = clientID; processingThread = new Thread(this); processingThread.start(); + + ByteArrayOutputStream bstream = new ByteArrayOutputStream(); + DataOutputStream stream = new DataOutputStream(bstream); + sendMessage(new Message.Sent(PacketTable.ACK, clientID, messageID, stream, bstream)); } - public void handleRequest(ServerRequest request){ + public void handleRequest(Message.Received request){ + if (request.getClientID() != this.clientID) + throw new RuntimeException("Server sent us a message, yet we are not the intended recipient!"); requestLock.lock(); pendingRequests.add(request); requestLock.unlock(); } - private void processRequest(ServerRequest request){ + private void processRequest(Message.Received request){ try { - switch (request.getID()){ + switch (request.getPacketID()){ case PacketTable.ACK: - long messageID = request.getDataStream().readLong(); + Message.Sent message = sentMessages.get(request.getMessageID()); + if (message == null) + throw new RuntimeException("A message was acknowledged but does not exist!"); + message.acknowledged(); + break; + case PacketTable.MESSAGE: + System.out.println(request.getReader().readUTF()); break; } - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } } public void run(){ while (running){ + // handle request processing without blocking the I/O thread requestLock.lock(); - while (pendingRequests.size() > 0) { - ServerRequest request = pendingRequests.remove(); - processRequest(request); - requests.add(request); - } + Message.Received request = pendingRequests.remove(); + processRequest(request); requestLock.unlock(); - requests.removeIf(ServerRequest::isAck); - for (ServerRequest request : requests){ - // TODO: - if (request.getTimeSinceReceived().get() > MAX_PACKET_ACK_TIME_SECONDS) - System.out.println("A packet hasn't received a ack, it might have been lost!"); + for (Map.Entry message : sentMessages.entrySet()){ + if (message.getValue().getTimeSinceSent().get() > MAX_PACKET_ACK_TIME_SECONDS) { + System.out.println("The server did not process our message, did they receive it?"); + // todo: resend message + } } } } + public void sendMessage(Message.Sent message){ + this.sentMessages.put(message.getMessageID(), message); + byte[] data = message.getData().toByteArray(); + if (data.length > PACKET_SIZE) + throw new RuntimeException("Unable to send packet as it exceeds maximum packet size!"); + DatagramPacket request = new DatagramPacket(data, data.length, address, port); + try { + serverSocket.send(request); + } catch (IOException e) { + e.printStackTrace(); + } + } + public void halt() throws InterruptedException { running = false; processingThread.join(); } - private static class ServerRequest { - private final byte id; - private final Time receiveTime; - private final DataInputStream receive; - // ack should be on messages send to the client, which the client acks! - private boolean ack = false; - - public ServerRequest(byte id, DataInputStream receive){ - this.id = id; - this.receive = receive; - receiveTime = Time.getTime(); - } - - public byte getID(){ - return id; - } - - public void acknowledged(){ - this.ack = true; - } - - public boolean isAck(){ - return this.ack; - } - - public DataInputStream getDataStream(){ - return receive; - } - - public Time getTimeSinceReceived(){ - return Time.getTime().difference(receiveTime); - } - } - } }