fuck your cum assholes
parent
2aea192c1a
commit
4f7be92b03
|
@ -2,16 +2,13 @@ package server;
|
|||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.Tracer;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import net.jpountz.xxhash.StreamingXXHash64;
|
||||
import shared.ArrayData;
|
||||
import shared.FileUtil;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ChunkedCompressedChecksumFileReader {
|
||||
|
||||
|
@ -20,6 +17,14 @@ public class ChunkedCompressedChecksumFileReader {
|
|||
private final DataOutputStream fileOutputWriter;
|
||||
private final long seed;
|
||||
|
||||
private Span currentSpan = null;
|
||||
private Scope currentScope = null;
|
||||
private long count = 0;
|
||||
private static final long MAX_COUNT = 50;
|
||||
|
||||
private long uncompressed_bytes = 0;
|
||||
private long compressed_bytes = 0;
|
||||
|
||||
public ChunkedCompressedChecksumFileReader(DataInputStream networkStreamReader, String fileOutputPath, long seed) throws IOException {
|
||||
this.networkStreamReader = networkStreamReader;
|
||||
this.streamHash = FileUtil.XX_HASH_FACTORY.newStreamingHash64(seed);
|
||||
|
@ -28,26 +33,36 @@ public class ChunkedCompressedChecksumFileReader {
|
|||
}
|
||||
|
||||
public FileHeader readChunk(Tracer trace, Span sp) throws IOException {
|
||||
Span gf = trace.spanBuilder("Chunk Read").startSpan();
|
||||
FileHeader header = readHeader();
|
||||
gf.setAttribute("Read Uncompressed", header.getUncompressed());
|
||||
gf.setAttribute("Read Compressed", header.getCompressed());
|
||||
gf.setAttribute("Read Hash", header.getHash());
|
||||
try (Scope scope = gf.makeCurrent()) {
|
||||
if (header.getUncompressed() == 0)
|
||||
return header;
|
||||
gf.addEvent("Read Data");
|
||||
byte[] data = readSome(header);
|
||||
gf.addEvent("Decompress Data");
|
||||
byte[] decompressed = decompress(header, data);
|
||||
gf.addEvent("Hash");
|
||||
hash(header, decompressed);
|
||||
gf.addEvent("Write");
|
||||
fileOutputWriter.write(decompressed, 0, decompressed.length);
|
||||
gf.addEvent("End");
|
||||
} finally {
|
||||
gf.end();
|
||||
if (++count >= MAX_COUNT) {
|
||||
currentSpan.addEvent("--{End Read}--");
|
||||
currentScope.close();
|
||||
currentSpan.end();
|
||||
currentSpan = null;
|
||||
currentScope = null;
|
||||
}
|
||||
if (currentSpan == null) {
|
||||
count = 0;
|
||||
currentSpan = trace.spanBuilder("Chunk Read").startSpan();
|
||||
currentScope = currentSpan.makeCurrent();
|
||||
}
|
||||
FileHeader header = readHeader();
|
||||
uncompressed_bytes += header.getUncompressed();
|
||||
compressed_bytes += header.getCompressed();
|
||||
currentSpan.addEvent("--{Begin Read}--");
|
||||
currentSpan.addEvent("Attribute: Read Uncompressed = " + header.getUncompressed());
|
||||
currentSpan.addEvent("Attribute: Read Compressed = " + header.getCompressed());
|
||||
currentSpan.addEvent("Attribute: Compression Ratio = " + ((double)header.getUncompressed() / header.getCompressed()));
|
||||
currentSpan.addEvent("Attribute: Read Hash = " + header.getHash());
|
||||
if (header.getUncompressed() == 0)
|
||||
return header;
|
||||
currentSpan.addEvent("Read Data");
|
||||
byte[] data = readSome(header);
|
||||
currentSpan.addEvent("Decompress Data");
|
||||
byte[] decompressed = decompress(header, data);
|
||||
currentSpan.addEvent("Hash");
|
||||
hash(header, decompressed);
|
||||
currentSpan.addEvent("Write");
|
||||
fileOutputWriter.write(decompressed, 0, decompressed.length);
|
||||
return header;
|
||||
}
|
||||
|
||||
|
@ -57,6 +72,21 @@ public class ChunkedCompressedChecksumFileReader {
|
|||
throw new RuntimeException("Stream total hash doesn't match the client's sent hash!");
|
||||
fileOutputWriter.flush();
|
||||
fileOutputWriter.close();
|
||||
currentSpan.addEvent("--{End Read}--");
|
||||
currentScope.close();
|
||||
currentSpan.end();
|
||||
}
|
||||
|
||||
public long getCompressedBytes(){
|
||||
return compressed_bytes;
|
||||
}
|
||||
|
||||
public long getUncompressedBytes(){
|
||||
return uncompressed_bytes;
|
||||
}
|
||||
|
||||
public double getRatio(){
|
||||
return (double) uncompressed_bytes / (double) compressed_bytes;
|
||||
}
|
||||
|
||||
private FileHeader readHeader() throws IOException {
|
||||
|
|
|
@ -51,19 +51,20 @@ public class Connection implements Runnable {
|
|||
}
|
||||
try {
|
||||
if (in.available() > 0) {
|
||||
fileSend.addEvent("File Received");
|
||||
Span fileIn = trace.spanBuilder("File Received").setAttribute("Files Received", filesReceived).startSpan();
|
||||
try (Scope s = fileIn.makeCurrent()){
|
||||
byte command = in.readByte();
|
||||
byte command = in.readByte();
|
||||
|
||||
if (command == FileUtil.COMMAND.CLOSE.type) {
|
||||
System.out.println("Client sent disconnect signal!");
|
||||
break;
|
||||
}
|
||||
if (command == FileUtil.COMMAND.WRITE.type)
|
||||
if (command == FileUtil.COMMAND.CLOSE.type) {
|
||||
System.out.println("Client sent disconnect signal!");
|
||||
break;
|
||||
}
|
||||
if (command == FileUtil.COMMAND.WRITE.type) {
|
||||
fileSend.addEvent("File Received");
|
||||
Span fileIn = trace.spanBuilder("File Received").setAttribute("Files Received", filesReceived).startSpan();
|
||||
try (Scope s = fileIn.makeCurrent()) {
|
||||
FileUtil.receive(in, trace, fileIn);
|
||||
} finally {
|
||||
fileIn.end();
|
||||
} finally {
|
||||
fileIn.end();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -24,7 +24,7 @@ public class Server {
|
|||
|
||||
public static volatile boolean running = true;
|
||||
|
||||
private static final OpenTelemetry ot = OTelUtils.create();
|
||||
private static final OpenTelemetry ot = OTelUtils.create("CumServer");
|
||||
|
||||
public Server() {
|
||||
Tracer main = ot.getTracer("Main Server", "0.69");
|
||||
|
|
|
@ -19,7 +19,7 @@ import java.util.concurrent.TimeUnit;
|
|||
public class FileUtil {
|
||||
|
||||
// do not change it breaks stuff
|
||||
protected static final int READER_SIZE = 131072;
|
||||
protected static final int READER_SIZE = 65000;
|
||||
public static final long SEED = 691;
|
||||
|
||||
private static final LZ4Factory LZ_FACTORY = LZ4Factory.fastestInstance();
|
||||
|
@ -77,6 +77,9 @@ public class FileUtil {
|
|||
break;
|
||||
}
|
||||
}
|
||||
sp.setAttribute("Data Read Uncompressed Bytes", reader.getUncompressedBytes());
|
||||
sp.setAttribute("Data Read Compressed Bytes", reader.getCompressedBytes());
|
||||
sp.setAttribute("Data Compression Ratio", reader.getRatio());
|
||||
reader.close();
|
||||
System.out.println("Writing " + path + " complete");
|
||||
sp.addEvent("File Written");
|
||||
|
|
|
@ -39,8 +39,8 @@ public class OTelUtils {
|
|||
.buildAndRegisterGlobal();
|
||||
}
|
||||
|
||||
public static OpenTelemetry create(){
|
||||
Resource resource = Resource.getDefault().toBuilder().put(ResourceAttributes.SERVICE_NAME.getKey(), "cum").put(ResourceAttributes.SERVICE_VERSION.getKey(), "0.1.0").build();
|
||||
public static OpenTelemetry create(String name){
|
||||
Resource resource = Resource.getDefault().toBuilder().put(ResourceAttributes.SERVICE_NAME.getKey(), name).put(ResourceAttributes.SERVICE_VERSION.getKey(), "1.3.37").build();
|
||||
|
||||
SpanExporter otlpExporter = OtlpGrpcSpanExporter.builder()
|
||||
.setEndpoint("http://sc.on.underlying.skynet.tpgc.me:4317")
|
||||
|
@ -49,7 +49,7 @@ public class OTelUtils {
|
|||
|
||||
BatchSpanProcessor batchSpanProcessor = BatchSpanProcessor.builder(otlpExporter)
|
||||
.setMaxQueueSize(2048)
|
||||
.setMaxExportBatchSize(512) // Example max export batch size
|
||||
.setMaxExportBatchSize(512)
|
||||
.build();
|
||||
|
||||
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue