diff --git a/src/main/java/server/ChunkedCompressedChecksumFileReader.java b/src/main/java/server/ChunkedCompressedChecksumFileReader.java index b27c77b..58e0d99 100644 --- a/src/main/java/server/ChunkedCompressedChecksumFileReader.java +++ b/src/main/java/server/ChunkedCompressedChecksumFileReader.java @@ -2,16 +2,13 @@ package server; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import net.jpountz.xxhash.StreamingXXHash64; -import shared.ArrayData; import shared.FileUtil; import java.io.*; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.concurrent.TimeUnit; public class ChunkedCompressedChecksumFileReader { @@ -20,6 +17,14 @@ public class ChunkedCompressedChecksumFileReader { private final DataOutputStream fileOutputWriter; private final long seed; + private Span currentSpan = null; + private Scope currentScope = null; + private long count = 0; + private static final long MAX_COUNT = 50; + + private long uncompressed_bytes = 0; + private long compressed_bytes = 0; + public ChunkedCompressedChecksumFileReader(DataInputStream networkStreamReader, String fileOutputPath, long seed) throws IOException { this.networkStreamReader = networkStreamReader; this.streamHash = FileUtil.XX_HASH_FACTORY.newStreamingHash64(seed); @@ -28,26 +33,36 @@ public class ChunkedCompressedChecksumFileReader { } public FileHeader readChunk(Tracer trace, Span sp) throws IOException { - Span gf = trace.spanBuilder("Chunk Read").startSpan(); - FileHeader header = readHeader(); - gf.setAttribute("Read Uncompressed", header.getUncompressed()); - gf.setAttribute("Read Compressed", header.getCompressed()); - gf.setAttribute("Read Hash", header.getHash()); - try (Scope scope = gf.makeCurrent()) { - if (header.getUncompressed() == 0) - return header; - gf.addEvent("Read Data"); - byte[] data = readSome(header); - gf.addEvent("Decompress Data"); - byte[] decompressed = decompress(header, data); - gf.addEvent("Hash"); - hash(header, decompressed); - gf.addEvent("Write"); - fileOutputWriter.write(decompressed, 0, decompressed.length); - gf.addEvent("End"); - } finally { - gf.end(); + if (++count >= MAX_COUNT) { + currentSpan.addEvent("--{End Read}--"); + currentScope.close(); + currentSpan.end(); + currentSpan = null; + currentScope = null; } + if (currentSpan == null) { + count = 0; + currentSpan = trace.spanBuilder("Chunk Read").startSpan(); + currentScope = currentSpan.makeCurrent(); + } + FileHeader header = readHeader(); + uncompressed_bytes += header.getUncompressed(); + compressed_bytes += header.getCompressed(); + currentSpan.addEvent("--{Begin Read}--"); + currentSpan.addEvent("Attribute: Read Uncompressed = " + header.getUncompressed()); + currentSpan.addEvent("Attribute: Read Compressed = " + header.getCompressed()); + currentSpan.addEvent("Attribute: Compression Ratio = " + ((double)header.getUncompressed() / header.getCompressed())); + currentSpan.addEvent("Attribute: Read Hash = " + header.getHash()); + if (header.getUncompressed() == 0) + return header; + currentSpan.addEvent("Read Data"); + byte[] data = readSome(header); + currentSpan.addEvent("Decompress Data"); + byte[] decompressed = decompress(header, data); + currentSpan.addEvent("Hash"); + hash(header, decompressed); + currentSpan.addEvent("Write"); + fileOutputWriter.write(decompressed, 0, decompressed.length); return header; } @@ -57,6 +72,21 @@ public class ChunkedCompressedChecksumFileReader { throw new RuntimeException("Stream total hash doesn't match the client's sent hash!"); fileOutputWriter.flush(); fileOutputWriter.close(); + currentSpan.addEvent("--{End Read}--"); + currentScope.close(); + currentSpan.end(); + } + + public long getCompressedBytes(){ + return compressed_bytes; + } + + public long getUncompressedBytes(){ + return uncompressed_bytes; + } + + public double getRatio(){ + return (double) uncompressed_bytes / (double) compressed_bytes; } private FileHeader readHeader() throws IOException { diff --git a/src/main/java/server/Connection.java b/src/main/java/server/Connection.java index 9ece3cc..06d4172 100644 --- a/src/main/java/server/Connection.java +++ b/src/main/java/server/Connection.java @@ -51,19 +51,20 @@ public class Connection implements Runnable { } try { if (in.available() > 0) { - fileSend.addEvent("File Received"); - Span fileIn = trace.spanBuilder("File Received").setAttribute("Files Received", filesReceived).startSpan(); - try (Scope s = fileIn.makeCurrent()){ - byte command = in.readByte(); + byte command = in.readByte(); - if (command == FileUtil.COMMAND.CLOSE.type) { - System.out.println("Client sent disconnect signal!"); - break; - } - if (command == FileUtil.COMMAND.WRITE.type) + if (command == FileUtil.COMMAND.CLOSE.type) { + System.out.println("Client sent disconnect signal!"); + break; + } + if (command == FileUtil.COMMAND.WRITE.type) { + fileSend.addEvent("File Received"); + Span fileIn = trace.spanBuilder("File Received").setAttribute("Files Received", filesReceived).startSpan(); + try (Scope s = fileIn.makeCurrent()) { FileUtil.receive(in, trace, fileIn); - } finally { - fileIn.end(); + } finally { + fileIn.end(); + } } } } catch (IOException e) { diff --git a/src/main/java/server/Server.java b/src/main/java/server/Server.java index 4a04e58..1a85bdf 100644 --- a/src/main/java/server/Server.java +++ b/src/main/java/server/Server.java @@ -24,7 +24,7 @@ public class Server { public static volatile boolean running = true; - private static final OpenTelemetry ot = OTelUtils.create(); + private static final OpenTelemetry ot = OTelUtils.create("CumServer"); public Server() { Tracer main = ot.getTracer("Main Server", "0.69"); diff --git a/src/main/java/shared/FileUtil.java b/src/main/java/shared/FileUtil.java index b2a1c2d..e0523c0 100644 --- a/src/main/java/shared/FileUtil.java +++ b/src/main/java/shared/FileUtil.java @@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit; public class FileUtil { // do not change it breaks stuff - protected static final int READER_SIZE = 131072; + protected static final int READER_SIZE = 65000; public static final long SEED = 691; private static final LZ4Factory LZ_FACTORY = LZ4Factory.fastestInstance(); @@ -77,6 +77,9 @@ public class FileUtil { break; } } + sp.setAttribute("Data Read Uncompressed Bytes", reader.getUncompressedBytes()); + sp.setAttribute("Data Read Compressed Bytes", reader.getCompressedBytes()); + sp.setAttribute("Data Compression Ratio", reader.getRatio()); reader.close(); System.out.println("Writing " + path + " complete"); sp.addEvent("File Written"); diff --git a/src/main/java/shared/OTelUtils.java b/src/main/java/shared/OTelUtils.java index f4af919..f7cfcb2 100644 --- a/src/main/java/shared/OTelUtils.java +++ b/src/main/java/shared/OTelUtils.java @@ -39,8 +39,8 @@ public class OTelUtils { .buildAndRegisterGlobal(); } - public static OpenTelemetry create(){ - Resource resource = Resource.getDefault().toBuilder().put(ResourceAttributes.SERVICE_NAME.getKey(), "cum").put(ResourceAttributes.SERVICE_VERSION.getKey(), "0.1.0").build(); + public static OpenTelemetry create(String name){ + Resource resource = Resource.getDefault().toBuilder().put(ResourceAttributes.SERVICE_NAME.getKey(), name).put(ResourceAttributes.SERVICE_VERSION.getKey(), "1.3.37").build(); SpanExporter otlpExporter = OtlpGrpcSpanExporter.builder() .setEndpoint("http://sc.on.underlying.skynet.tpgc.me:4317") @@ -49,7 +49,7 @@ public class OTelUtils { BatchSpanProcessor batchSpanProcessor = BatchSpanProcessor.builder(otlpExporter) .setMaxQueueSize(2048) - .setMaxExportBatchSize(512) // Example max export batch size + .setMaxExportBatchSize(512) .build(); SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() diff --git a/target/classes/client/ChunkedCompressedChecksumFileWriter.class b/target/classes/client/ChunkedCompressedChecksumFileWriter.class index 18420d6..2cb0de6 100644 Binary files a/target/classes/client/ChunkedCompressedChecksumFileWriter.class and b/target/classes/client/ChunkedCompressedChecksumFileWriter.class differ diff --git a/target/classes/server/ChunkedCompressedChecksumFileReader.class b/target/classes/server/ChunkedCompressedChecksumFileReader.class index 6acedb5..2ef7130 100644 Binary files a/target/classes/server/ChunkedCompressedChecksumFileReader.class and b/target/classes/server/ChunkedCompressedChecksumFileReader.class differ diff --git a/target/classes/server/Connection.class b/target/classes/server/Connection.class index 09208c4..f493d39 100644 Binary files a/target/classes/server/Connection.class and b/target/classes/server/Connection.class differ diff --git a/target/classes/shared/FileUtil$InvalidUsageException.class b/target/classes/shared/FileUtil$InvalidUsageException.class index f5f0dc4..30486b1 100644 Binary files a/target/classes/shared/FileUtil$InvalidUsageException.class and b/target/classes/shared/FileUtil$InvalidUsageException.class differ diff --git a/target/classes/shared/FileUtil.class b/target/classes/shared/FileUtil.class index 4a1e8e0..2532607 100644 Binary files a/target/classes/shared/FileUtil.class and b/target/classes/shared/FileUtil.class differ