diff --git a/src/main/java/client/ChunkedCompressedChecksumFileWriter.java b/src/main/java/client/ChunkedCompressedChecksumFileWriter.java index c16eecc..02a5da8 100644 --- a/src/main/java/client/ChunkedCompressedChecksumFileWriter.java +++ b/src/main/java/client/ChunkedCompressedChecksumFileWriter.java @@ -1,5 +1,9 @@ package client; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import net.jpountz.xxhash.StreamingXXHash64; import shared.ArrayData; import shared.FileUtil; @@ -16,7 +20,14 @@ public class ChunkedCompressedChecksumFileWriter { private final int bufferSize; private final long seed; - public ChunkedCompressedChecksumFileWriter(DataOutputStream networkStreamWriter, DataInputStream fileInputReader, int bufferSize, long seed){ + private Span currentSpan = null; + private Scope currentScope = null; + + private long count = 0; + private long uncompressed_bytes = 0; + private long compressed_bytes = 0; + + public ChunkedCompressedChecksumFileWriter(DataOutputStream networkStreamWriter, DataInputStream fileInputReader, int bufferSize, long seed) { this.networkStreamWriter = networkStreamWriter; this.streamHash = FileUtil.XX_HASH_FACTORY.newStreamingHash64(seed); this.fileInputReader = fileInputReader; @@ -24,19 +35,45 @@ public class ChunkedCompressedChecksumFileWriter { this.seed = seed; } - public void processChunk() throws IOException { + public void processChunk(Tracer trace) throws IOException { + if (++count >= FileUtil.MAX_COUNT) { + currentSpan.addEvent("--{End Write}--"); + currentSpan.setStatus(StatusCode.OK); + currentScope.close(); + currentSpan.end(); + currentSpan = null; + currentScope = null; + count = 0; + } + if (currentSpan == null) { + currentSpan = trace.spanBuilder("Chunk Write").startSpan(); + currentScope = currentSpan.makeCurrent(); + } // read / write files in chunks + currentSpan.addEvent("--{Begin Write}--"); byte[] uncompressed = readSome(); if (uncompressed.length == 0) return; // create a checksum for this chunk + update the overall checksum + currentSpan.addEvent("Hash"); long hash = hash(uncompressed); // apply compression + currentSpan.addEvent("Compress"); ArrayData compressed = compress(uncompressed); + // track data + currentSpan.addEvent("Attribute: Write Uncompressed = " + uncompressed.length); + currentSpan.addEvent("Attribute: Write Compressed = " + compressed.getActualLength()); + currentSpan.addEvent("Attribute: Compression Ratio = " + ((double)uncompressed.length / (double)compressed.getActualLength())); + currentSpan.addEvent("Attribute: Write Hash = " + hash); + uncompressed_bytes += uncompressed.length; + compressed_bytes += compressed.getActualLength(); + count++; + // write data + currentSpan.addEvent("Write"); writeHeader(uncompressed.length, compressed.getActualLength(), hash); networkStreamWriter.write(compressed.getData(), 0, compressed.getActualLength()); networkStreamWriter.flush(); @@ -46,6 +83,11 @@ public class ChunkedCompressedChecksumFileWriter { networkStreamWriter.writeInt(0); networkStreamWriter.writeLong(streamHash.getValue()); networkStreamWriter.flush(); + if (currentSpan != null) { + currentSpan.addEvent("--{End Read}--"); + currentScope.close(); + currentSpan.end(); + } } private void writeHeader(int uncompressed, int compressed, long hash) throws IOException { @@ -58,21 +100,35 @@ public class ChunkedCompressedChecksumFileWriter { byte[] readBytes = new byte[Integer.min(fileInputReader.available(), bufferSize)]; int totalRead = fileInputReader.read(readBytes); - assert(readBytes.length == totalRead); + assert (readBytes.length == totalRead); return readBytes; } - private long hash(byte[] input){ + private long hash(byte[] input) { streamHash.update(input, 0, input.length); return FileUtil.HASH_64.hash(input, 0, input.length, seed); } - private ArrayData compress(byte[] input){ + private ArrayData compress(byte[] input) { int maxCompressedLength = FileUtil.COMPRESSOR.maxCompressedLength(input.length); byte[] compressedBytes = new byte[maxCompressedLength]; int compressedLength = FileUtil.COMPRESSOR.compress(input, 0, input.length, compressedBytes, 0, maxCompressedLength); return new ArrayData(compressedBytes, compressedLength); } + public long getCompressedBytes() { + return compressed_bytes; + } + + public long getUncompressedBytes() { + return uncompressed_bytes; + } + + public double getRatio() { + if (compressed_bytes == 0) + return 0; + return (double) uncompressed_bytes / (double) compressed_bytes; + } + } diff --git a/src/main/java/client/Client.java b/src/main/java/client/Client.java index 6b66fe5..2c7fa5d 100644 --- a/src/main/java/client/Client.java +++ b/src/main/java/client/Client.java @@ -1,8 +1,14 @@ package client; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import server.Server; import shared.ExceptionLogger; import shared.FileUtil; +import shared.OTelUtils; import java.io.*; import java.net.Socket; @@ -13,50 +19,84 @@ public class Client { private final Socket serverConnection; private final DataOutputStream out; private final DataInputStream in; + private OTelUtils.SexyContainer ot = OTelUtils.create("CumClient"); // Computational Unit Machine Client + private final Tracer tracer; + private final Scope sc; + private final Span s; public Client(String address, int port) throws IOException { serverConnection = new Socket(address, port); out = new DataOutputStream(new BufferedOutputStream(serverConnection.getOutputStream())); in = new DataInputStream(new BufferedInputStream(serverConnection.getInputStream())); + tracer = ot.ot.getTracer("A Client", "1.33.7"); + s = tracer.spanBuilder("Client Connection").setAttribute("Server Address", address).setAttribute("Server Port", port).startSpan(); + sc = s.makeCurrent(); } - public Client sendFile(String path){ - System.out.println("Sending path " + path); - FileUtil.write(path, out); - return this; - } - - public Client sendDir(String path){ - File p = new File(path); - ArrayDeque filesToCheck = new ArrayDeque<>(Arrays.asList(Objects.requireNonNull(p.listFiles()))); - while (!filesToCheck.isEmpty()) { - File f = filesToCheck.remove(); - if (f.isDirectory()){ - filesToCheck.add(f); - } else - sendFile(f.getPath()); + public Client sendFile(String path) { + Span sp = tracer.spanBuilder("Send File").setAttribute("File", path).startSpan(); + try (Scope seethe = sp.makeCurrent()) { + System.out.println("Sending path " + path); + FileUtil.write(path, out, tracer, sp); + System.out.println("Sent path " + path); + } finally { + sp.end(); } + ot.tp.forceFlush(); + ot.bp.forceFlush(); + ot.ox.flush(); return this; } - public void close(){ + public Client sendDir(String path) { + Span sd = tracer.spanBuilder("Send Directory").setAttribute("Directory", path).startSpan(); + try (Scope cope = sd.makeCurrent()) { + File p = new File(path); + ArrayDeque filesToCheck = new ArrayDeque<>(Arrays.asList(Objects.requireNonNull(p.listFiles()))); + while (!filesToCheck.isEmpty()) { + File f = filesToCheck.remove(); + System.out.println("Processing file " + f.getPath()); + if (f.isDirectory()) + filesToCheck.addAll(Arrays.asList(Objects.requireNonNull(f.listFiles()))); + else + sendFile(f.getPath()); + } + } finally { + sd.setStatus(StatusCode.OK); + sd.end(); + } + System.out.println("Sent directory " + path); + return this; + } + + public Client close() { try { + sc.close(); + s.setStatus(StatusCode.OK); + s.end(); out.writeByte(FileUtil.COMMAND.CLOSE.type); out.flush(); in.close(); out.close(); serverConnection.close(); - } catch (Exception e){ + } catch (Exception e) { ExceptionLogger.log(e); } System.out.println("Disconnected!"); + return this; + } + + public OTelUtils.SexyContainer getContainer() { + return ot; } public static void main(String[] args) { try { - new Client("localhost", Server.SERVER_PORT).sendDir("in/").close(); + OTelUtils.SexyContainer con = new Client("localhost", Server.SERVER_PORT).sendDir("in/").close().getContainer(); + con.tp.forceFlush(); + con.tp.shutdown(); //new Client("localhost", Server.SERVER_PORT).sendFile("in/ihaveafile.txt").close(); - } catch (Exception e){ + } catch (Exception e) { ExceptionLogger.log(e); } } diff --git a/src/main/java/server/ChunkedCompressedChecksumFileReader.java b/src/main/java/server/ChunkedCompressedChecksumFileReader.java index 58e0d99..d9eb0c4 100644 --- a/src/main/java/server/ChunkedCompressedChecksumFileReader.java +++ b/src/main/java/server/ChunkedCompressedChecksumFileReader.java @@ -1,6 +1,7 @@ package server; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; import net.jpountz.xxhash.StreamingXXHash64; @@ -19,9 +20,8 @@ public class ChunkedCompressedChecksumFileReader { private Span currentSpan = null; private Scope currentScope = null; - private long count = 0; - private static final long MAX_COUNT = 50; + private long count = 0; private long uncompressed_bytes = 0; private long compressed_bytes = 0; @@ -32,16 +32,17 @@ public class ChunkedCompressedChecksumFileReader { this.seed = seed; } - public FileHeader readChunk(Tracer trace, Span sp) throws IOException { - if (++count >= MAX_COUNT) { + public FileHeader readChunk(Tracer trace) throws IOException { + if (++count >= FileUtil.MAX_COUNT) { currentSpan.addEvent("--{End Read}--"); + currentSpan.setStatus(StatusCode.OK); currentScope.close(); currentSpan.end(); currentSpan = null; currentScope = null; + count = 0; } if (currentSpan == null) { - count = 0; currentSpan = trace.spanBuilder("Chunk Read").startSpan(); currentScope = currentSpan.makeCurrent(); } @@ -72,9 +73,11 @@ public class ChunkedCompressedChecksumFileReader { throw new RuntimeException("Stream total hash doesn't match the client's sent hash!"); fileOutputWriter.flush(); fileOutputWriter.close(); - currentSpan.addEvent("--{End Read}--"); - currentScope.close(); - currentSpan.end(); + if (currentSpan != null) { + currentSpan.addEvent("--{End Read}--"); + currentScope.close(); + currentSpan.end(); + } } public long getCompressedBytes(){ @@ -86,6 +89,8 @@ public class ChunkedCompressedChecksumFileReader { } public double getRatio(){ + if (compressed_bytes == 0) + return 0; return (double) uncompressed_bytes / (double) compressed_bytes; } diff --git a/src/main/java/server/Connection.java b/src/main/java/server/Connection.java index 06d4172..6d8d029 100644 --- a/src/main/java/server/Connection.java +++ b/src/main/java/server/Connection.java @@ -3,6 +3,7 @@ package server; import client.Client; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -73,6 +74,7 @@ public class Connection implements Runnable { } } } finally { + fileSend.setStatus(StatusCode.OK); fileSend.end(); } try { diff --git a/src/main/java/server/Server.java b/src/main/java/server/Server.java index 1a85bdf..5c965aa 100644 --- a/src/main/java/server/Server.java +++ b/src/main/java/server/Server.java @@ -24,10 +24,10 @@ public class Server { public static volatile boolean running = true; - private static final OpenTelemetry ot = OTelUtils.create("CumServer"); + private static final OTelUtils.SexyContainer ot = OTelUtils.create("CumServer"); public Server() { - Tracer main = ot.getTracer("Main Server", "0.69"); + Tracer main = ot.ot.getTracer("Main Server", "0.69"); try { System.out.println("Starting server"); ServerSocket serverSocket = new ServerSocket(SERVER_PORT); diff --git a/src/main/java/shared/FileUtil.java b/src/main/java/shared/FileUtil.java index e0523c0..49412ae 100644 --- a/src/main/java/shared/FileUtil.java +++ b/src/main/java/shared/FileUtil.java @@ -2,6 +2,7 @@ package shared; import client.ChunkedCompressedChecksumFileWriter; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; import net.jpountz.lz4.LZ4Compressor; @@ -21,6 +22,7 @@ public class FileUtil { // do not change it breaks stuff protected static final int READER_SIZE = 65000; public static final long SEED = 691; + public static final long MAX_COUNT = 128; private static final LZ4Factory LZ_FACTORY = LZ4Factory.fastestInstance(); public static final LZ4Compressor COMPRESSOR = LZ_FACTORY.highCompressor(); @@ -39,7 +41,7 @@ public class FileUtil { } } - public static void write(String path, DataOutputStream dataOut) { + public static void write(String path, DataOutputStream dataOut, Tracer trace, Span sp) { validatePath(path); String relative_path = path.replace(System.getProperty("user.dir"), ""); try { @@ -51,7 +53,12 @@ public class FileUtil { ChunkedCompressedChecksumFileWriter writer = new ChunkedCompressedChecksumFileWriter(dataOut, fileReader, FileUtil.READER_SIZE, FileUtil.SEED); while (fileReader.available() > 0) - writer.processChunk(); + writer.processChunk(trace); + + sp.setAttribute("Data Read Uncompressed Bytes", writer.getUncompressedBytes()); + sp.setAttribute("Data Read Compressed Bytes", writer.getCompressedBytes()); + sp.setAttribute("Data Compression Ratio", writer.getRatio()); + sp.setStatus(StatusCode.OK); writer.close(); fileReader.close(); @@ -72,7 +79,7 @@ public class FileUtil { // ugh I want while(reader.readChunk().getUncompressed()); but it makes warnings!!! while (true) { - if (reader.readChunk(trace, sp).getUncompressed() == 0) { + if (reader.readChunk(trace).getUncompressed() == 0) { sp.addEvent("Chunk Read"); break; } @@ -80,6 +87,7 @@ public class FileUtil { sp.setAttribute("Data Read Uncompressed Bytes", reader.getUncompressedBytes()); sp.setAttribute("Data Read Compressed Bytes", reader.getCompressedBytes()); sp.setAttribute("Data Compression Ratio", reader.getRatio()); + sp.setStatus(StatusCode.OK); reader.close(); System.out.println("Writing " + path + " complete"); sp.addEvent("File Written"); diff --git a/src/main/java/shared/OTelUtils.java b/src/main/java/shared/OTelUtils.java index f7cfcb2..2e7c35e 100644 --- a/src/main/java/shared/OTelUtils.java +++ b/src/main/java/shared/OTelUtils.java @@ -23,6 +23,19 @@ import io.opentelemetry.semconv.ResourceAttributes; public class OTelUtils { + public static class SexyContainer { + public OpenTelemetry ot; + public SdkTracerProvider tp; + public BatchSpanProcessor bp; + public SpanExporter ox; + public SexyContainer(OpenTelemetry ot, SdkTracerProvider tp, BatchSpanProcessor bp, SpanExporter ox){ + this.ot = ot; + this.tp = tp; + this.bp = bp; + this.ox = ox; + } + } + public static OpenTelemetry createLogger(){ Resource resource = Resource.getDefault().toBuilder().put(ResourceAttributes.SERVICE_NAME, "cum").put(ResourceAttributes.SERVICE_VERSION, "0.1.0").build(); @@ -39,7 +52,7 @@ public class OTelUtils { .buildAndRegisterGlobal(); } - public static OpenTelemetry create(String name){ + public static SexyContainer create(String name){ Resource resource = Resource.getDefault().toBuilder().put(ResourceAttributes.SERVICE_NAME.getKey(), name).put(ResourceAttributes.SERVICE_VERSION.getKey(), "1.3.37").build(); SpanExporter otlpExporter = OtlpGrpcSpanExporter.builder() @@ -57,12 +70,12 @@ public class OTelUtils { .setResource(resource) .build(); - return OpenTelemetrySdk.builder() + return new SexyContainer(OpenTelemetrySdk.builder() .setTracerProvider(sdkTracerProvider) .setMeterProvider(createLoggingMeter(resource)) .setLoggerProvider(createLoggerProvider(resource)) .setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), W3CBaggagePropagator.getInstance()))) - .buildAndRegisterGlobal(); + .buildAndRegisterGlobal(), sdkTracerProvider, batchSpanProcessor, otlpExporter); } private static SdkMeterProvider createLoggingMeter(Resource resource){ diff --git a/target/classes/client/ChunkedCompressedChecksumFileWriter.class b/target/classes/client/ChunkedCompressedChecksumFileWriter.class index 2cb0de6..12bf04b 100644 Binary files a/target/classes/client/ChunkedCompressedChecksumFileWriter.class and b/target/classes/client/ChunkedCompressedChecksumFileWriter.class differ diff --git a/target/classes/client/Client.class b/target/classes/client/Client.class index f52f057..3dc6628 100644 Binary files a/target/classes/client/Client.class and b/target/classes/client/Client.class differ diff --git a/target/classes/server/ChunkedCompressedChecksumFileReader.class b/target/classes/server/ChunkedCompressedChecksumFileReader.class index 2ef7130..70c0d61 100644 Binary files a/target/classes/server/ChunkedCompressedChecksumFileReader.class and b/target/classes/server/ChunkedCompressedChecksumFileReader.class differ diff --git a/target/classes/server/Connection.class b/target/classes/server/Connection.class index f493d39..f9a1f18 100644 Binary files a/target/classes/server/Connection.class and b/target/classes/server/Connection.class differ diff --git a/target/classes/server/Server.class b/target/classes/server/Server.class index 0ad00d6..ac33088 100644 Binary files a/target/classes/server/Server.class and b/target/classes/server/Server.class differ diff --git a/target/classes/shared/FileUtil$COMMAND.class b/target/classes/shared/FileUtil$COMMAND.class index c128413..d487116 100644 Binary files a/target/classes/shared/FileUtil$COMMAND.class and b/target/classes/shared/FileUtil$COMMAND.class differ diff --git a/target/classes/shared/FileUtil$InvalidUsageException.class b/target/classes/shared/FileUtil$InvalidUsageException.class index 30486b1..341c4a9 100644 Binary files a/target/classes/shared/FileUtil$InvalidUsageException.class and b/target/classes/shared/FileUtil$InvalidUsageException.class differ diff --git a/target/classes/shared/FileUtil.class b/target/classes/shared/FileUtil.class index 2532607..c3bc57c 100644 Binary files a/target/classes/shared/FileUtil.class and b/target/classes/shared/FileUtil.class differ diff --git a/target/classes/shared/OTelUtils$SexyContainer.class b/target/classes/shared/OTelUtils$SexyContainer.class new file mode 100644 index 0000000..f5c625e Binary files /dev/null and b/target/classes/shared/OTelUtils$SexyContainer.class differ diff --git a/target/classes/shared/OTelUtils.class b/target/classes/shared/OTelUtils.class index b9b2bab..ff5a11f 100644 Binary files a/target/classes/shared/OTelUtils.class and b/target/classes/shared/OTelUtils.class differ diff --git a/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst index c49df92..eccd196 100644 --- a/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst +++ b/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -1,12 +1,13 @@ shared/ExceptionLogger.class -shared/FileUtil$InvalidUsageException.class +shared/OTelUtils$SexyContainer.class shared/FileUtil.class -shared/OTelUtils.class -client/Client.class server/Server.class server/Connection.class -server/FileHeader.class -shared/FileUtil$COMMAND.class shared/ArrayData.class server/ChunkedCompressedChecksumFileReader.class client/ChunkedCompressedChecksumFileWriter.class +shared/FileUtil$InvalidUsageException.class +shared/OTelUtils.class +client/Client.class +server/FileHeader.class +shared/FileUtil$COMMAND.class