diff --git a/src/client/FileChunkingWriter.java b/src/client/FileChunkingWriter.java new file mode 100644 index 0000000..cd9844a --- /dev/null +++ b/src/client/FileChunkingWriter.java @@ -0,0 +1,74 @@ +package client; + +import net.jpountz.xxhash.StreamingXXHash64; +import shared.ArrayData; +import shared.FileUtil; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class FileChunkingWriter { + + private final DataOutputStream writer; + private final StreamingXXHash64 streamHash; + private final DataInputStream fileReader; + private final int bufferSize; + private final long seed; + + public FileChunkingWriter(DataOutputStream writer, DataInputStream fileReader, int bufferSize, long seed){ + this.writer = writer; + this.streamHash = FileUtil.XX_HASH_FACTORY.newStreamingHash64(seed); + this.fileReader = fileReader; + this.bufferSize = bufferSize; + this.seed = seed; + } + + public void processChunk() throws IOException { + // read / write files in chunks + byte[] uncompressed = readSome(); + if (uncompressed.length == 0) + return; + + // create a checksum for this chunk + update the overall checksum + long hash = hash(uncompressed); + + // apply compression + ArrayData compressed = compress(uncompressed); + + // write data + writer.writeInt(uncompressed.length); + writer.writeInt(compressed.getActualLength()); + writer.writeLong(hash); + writer.write(compressed.getData(), 0, compressed.getActualLength()); + writer.flush(); + } + + public void close() throws IOException { + writer.writeInt(0); + writer.writeLong(streamHash.getValue()); + writer.flush(); + } + + private byte[] readSome() throws IOException { + byte[] readBytes = new byte[Integer.min(fileReader.available(), bufferSize)]; + + int totalRead = fileReader.read(readBytes); + assert(readBytes.length == totalRead); + + return readBytes; + } + + private long hash(byte[] input){ + streamHash.update(input, 0, input.length); + return FileUtil.HASH_64.hash(input, 0, input.length, seed); + } + + private ArrayData compress(byte[] input){ + int maxCompressedLength = FileUtil.COMPRESSOR.maxCompressedLength(input.length); + byte[] compressedBytes = new byte[maxCompressedLength]; + int compressedLength = FileUtil.COMPRESSOR.compress(input, 0, input.length, compressedBytes, 0, maxCompressedLength); + return new ArrayData(compressedBytes, compressedLength); + } + +} diff --git a/src/server/Connection.java b/src/server/Connection.java index b5920c8..d9ebe67 100644 --- a/src/server/Connection.java +++ b/src/server/Connection.java @@ -10,7 +10,7 @@ import shared.FileHeader; import java.io.*; import java.net.Socket; -public class Connection extends Thread { +public class Connection implements Runnable { private final Socket clientSocket; private final Server server; diff --git a/src/server/Server.java b/src/server/Server.java index 3f70e0e..144bac6 100644 --- a/src/server/Server.java +++ b/src/server/Server.java @@ -3,25 +3,31 @@ package server; import shared.ExceptionLogger; import java.io.IOException; import java.net.ServerSocket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class Server { + private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + public static final int SERVER_PORT = 42069; private volatile boolean running = true; public Server() { + System.out.println("Starting server"); try { ServerSocket serverSocket = new ServerSocket(SERVER_PORT); + System.out.println("Server Started"); - while (running) { - new Connection(this, serverSocket.accept()).start(); - } + while (running) + executor.execute(new Connection(this, serverSocket.accept())); serverSocket.close(); } catch (IOException e) { ExceptionLogger.log(e); } + executor.shutdown(); } public boolean isRunning(){ diff --git a/src/shared/ArrayData.java b/src/shared/ArrayData.java new file mode 100644 index 0000000..322207e --- /dev/null +++ b/src/shared/ArrayData.java @@ -0,0 +1,20 @@ +package shared; + +public class ArrayData { + + private final byte[] data; + private final int actualLength; + + public ArrayData(byte[] data, int actualLength){ + this.data = data; + this.actualLength = actualLength; + } + + public byte[] getData() { + return data; + } + + public int getActualLength() { + return actualLength; + } +} diff --git a/src/shared/FileHeader.java b/src/shared/FileHeader.java index 56669f6..0422ce3 100644 --- a/src/shared/FileHeader.java +++ b/src/shared/FileHeader.java @@ -1,12 +1,8 @@ package shared; import client.Client; -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4FastDecompressor; +import client.FileChunkingWriter; import net.jpountz.xxhash.StreamingXXHash64; -import net.jpountz.xxhash.XXHash64; -import net.jpountz.xxhash.XXHashFactory; import java.io.*; import java.nio.file.Files; @@ -14,17 +10,6 @@ import java.nio.file.Paths; public class FileHeader { - private static final int READER_SIZE = 8192; - - private static final LZ4Factory LZ_FACTORY = LZ4Factory.fastestInstance(); - private static final LZ4Compressor COMPRESSOR = LZ_FACTORY.highCompressor(); - private static final LZ4FastDecompressor DECOMPRESSOR = LZ_FACTORY.fastDecompressor(); - - private static final XXHashFactory XX_HASH_FACTORY = XXHashFactory.fastestInstance(); - private static final XXHash64 HASH_64 = XX_HASH_FACTORY.hash64(); - - private static final long SEED = 691; - public enum COMMAND { WRITE((byte) 1); public final byte type; @@ -49,41 +34,20 @@ public class FileHeader { System.out.println(relative_path); } - public void write(DataOutputStream writer) { + public void write(DataOutputStream dataOut) { try { DataInputStream fileReader = new DataInputStream(new BufferedInputStream(Files.newInputStream(Paths.get(full_path)))); - writer.writeByte(COMMAND.WRITE.type); - writer.writeUTF(relative_path); + dataOut.writeByte(COMMAND.WRITE.type); + dataOut.writeUTF(relative_path); - StreamingXXHash64 streamHash = XX_HASH_FACTORY.newStreamingHash64(SEED); - while (fileReader.available() > 0) { - // read / write files in chunks - byte[] readBytes = new byte[Integer.min(fileReader.available(), READER_SIZE)]; + FileChunkingWriter writer = new FileChunkingWriter(dataOut, fileReader, FileUtil.READER_SIZE, FileUtil.SEED); - int totalRead = fileReader.read(readBytes); - if (totalRead <= 0) - break; + while (fileReader.available() > 0) + writer.processChunk(); - // create a checksum for this chunk + update the overall checksum - streamHash.update(readBytes, 0, totalRead); - long hash = HASH_64.hash(readBytes, 0, totalRead, SEED); - - // apply compression - int maxCompressedLength = COMPRESSOR.maxCompressedLength(readBytes.length); - byte[] compressedBytes = new byte[maxCompressedLength]; - int compressedLength = COMPRESSOR.compress(readBytes, 0, readBytes.length, compressedBytes, 0, maxCompressedLength); - - writer.writeInt(totalRead); - writer.writeInt(compressedLength); - writer.writeLong(hash); - writer.write(compressedBytes, 0, compressedLength); - writer.flush(); - } + writer.close(); fileReader.close(); - writer.writeInt(0); - writer.writeLong(streamHash.getValue()); - writer.flush(); } catch (Exception e) { ExceptionLogger.log(e); } @@ -96,7 +60,7 @@ public class FileHeader { DataOutputStream writer = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(Paths.get(path)))); - StreamingXXHash64 computedStreamHash = XX_HASH_FACTORY.newStreamingHash64(SEED); + StreamingXXHash64 computedStreamHash = FileUtil.XX_HASH_FACTORY.newStreamingHash64(FileUtil.SEED); while (true) { int uncompressed_size = reader.readInt(); @@ -111,11 +75,11 @@ public class FileHeader { assert(amount == compressed_size); byte[] restored = new byte[uncompressed_size]; - int len = DECOMPRESSOR.decompress(data, 0, restored, 0, uncompressed_size); + int len = FileUtil.DECOMPRESSOR.decompress(data, 0, restored, 0, uncompressed_size); assert(len == uncompressed_size); - long computedHash = HASH_64.hash(restored, 0, uncompressed_size, SEED); + long computedHash = FileUtil.HASH_64.hash(restored, 0, uncompressed_size, FileUtil.SEED); computedStreamHash.update(restored, 0, uncompressed_size); if (hash != computedHash) diff --git a/src/shared/FileUtil.java b/src/shared/FileUtil.java new file mode 100644 index 0000000..0c9ebdb --- /dev/null +++ b/src/shared/FileUtil.java @@ -0,0 +1,22 @@ +package shared; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +public class FileUtil { + + protected static final int READER_SIZE = 8192; + public static final long SEED = 691; + + private static final LZ4Factory LZ_FACTORY = LZ4Factory.fastestInstance(); + public static final LZ4Compressor COMPRESSOR = LZ_FACTORY.highCompressor(); + public static final LZ4FastDecompressor DECOMPRESSOR = LZ_FACTORY.fastDecompressor(); + + public static final XXHashFactory XX_HASH_FACTORY = XXHashFactory.fastestInstance(); + public static final XXHash64 HASH_64 = XX_HASH_FACTORY.hash64(); + +} +