diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java index 07177d470..4020098ee 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java @@ -2,20 +2,13 @@ import com.google.protobuf.CodedOutputStream; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.util.ResourceLeakDetector; -import io.netty.util.ResourceLeakDetectorFactory; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.opensearch.migrations.testutils.CountingNettyResourceLeakDetector; import org.opensearch.migrations.testutils.TestUtilities; -import org.opensearch.migrations.testutils.TestWithNettyLeakDetection; +import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; @@ -111,7 +104,7 @@ public void testThatAPostInTinyPacketsBlocksFutureActivity(boolean usePool) thro @ParameterizedTest @ValueSource(booleans = {true, false}) - @TestWithNettyLeakDetection(repetitions = 16) + @WrapWithNettyLeakDetection(repetitions = 16) public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInTinyPacketsBlocksFutureActivity(usePool); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); @@ -119,7 +112,7 @@ public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boo @ParameterizedTest @ValueSource(booleans = {true, false}) - @TestWithNettyLeakDetection(repetitions = 32) + @WrapWithNettyLeakDetection(repetitions = 32) public void testThatAPostInASinglePacketBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInASinglePacketBlocksFutureActivity(usePool); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java b/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java index a4fb11f9b..7a1a2af47 100644 --- a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java +++ b/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java @@ -6,7 +6,6 @@ import org.junit.jupiter.api.extension.ReflectiveInvocationContext; import java.lang.reflect.Method; -import java.util.Optional; import java.util.concurrent.Callable; public class NettyLeakCheckTestExtension implements InvocationInterceptor { @@ -15,7 +14,7 @@ public NettyLeakCheckTestExtension() {} private void wrapWithLeakChecks(ExtensionContext extensionContext, Callable repeatCall, Callable finalCall) throws Throwable { int repetitions = extensionContext.getTestMethod() - .map(ec->ec.getAnnotation(TestWithNettyLeakDetection.class).repetitions()) + .map(ec->ec.getAnnotation(WrapWithNettyLeakDetection.class).repetitions()) .orElseThrow(() -> new IllegalStateException("No test method present")); CountingNettyResourceLeakDetector.activate(); diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/TestWithNettyLeakDetection.java b/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/WrapWithNettyLeakDetection.java similarity index 94% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/TestWithNettyLeakDetection.java rename to TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/WrapWithNettyLeakDetection.java index 0879f0053..94d6504a3 100644 --- a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/TestWithNettyLeakDetection.java +++ b/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/WrapWithNettyLeakDetection.java @@ -10,7 +10,7 @@ @Target({ ElementType.TYPE, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @ExtendWith(NettyLeakCheckTestExtension.class) -public @interface TestWithNettyLeakDetection { +public @interface WrapWithNettyLeakDetection { /** * Some leaks might need to put a bit more stress on the GC for objects to get cleared out * and trigger potential checks within any resource finalizers to determine if there have diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonEmitter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonEmitter.java index 6dc9ee53c..4a935ed19 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonEmitter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonEmitter.java @@ -28,7 +28,7 @@ * the memory load rather than expanding the working set for a large textual stream. */ @Slf4j -public class JsonEmitter { +public class JsonEmitter implements AutoCloseable { public final static int NUM_SEGMENT_THRESHOLD = 256; @@ -61,12 +61,12 @@ public LevelContext(Iterator iterator, Runnable onPopContinuation) { } } - static class ImmediateByteBufOutputStream extends OutputStream { + static class ChunkingByteBufOutputStream extends OutputStream { private final ByteBufAllocator byteBufAllocator; @Getter CompositeByteBuf compositeByteBuf; - public ImmediateByteBufOutputStream(ByteBufAllocator byteBufAllocator) { + public ChunkingByteBufOutputStream(ByteBufAllocator byteBufAllocator) { this.byteBufAllocator = byteBufAllocator; compositeByteBuf = byteBufAllocator.compositeBuffer(); } @@ -88,9 +88,13 @@ private void write(ByteBuf byteBuf) { } @Override - public void close() throws IOException { + public void close() { compositeByteBuf.release(); - super.close(); + try { + super.close(); + } catch (IOException e) { + throw new RuntimeException("Expected OutputStream::close() to be empty as per docs in Java 11"); + } } /** @@ -106,18 +110,23 @@ public ByteBuf recycleByteBufRetained() { } private JsonGenerator jsonGenerator; - private ImmediateByteBufOutputStream outputStream; + private ChunkingByteBufOutputStream outputStream; private ObjectMapper objectMapper; private Stack cursorStack; @SneakyThrows public JsonEmitter(ByteBufAllocator byteBufAllocator) { - outputStream = new ImmediateByteBufOutputStream(byteBufAllocator); + outputStream = new ChunkingByteBufOutputStream(byteBufAllocator); jsonGenerator = new JsonFactory().createGenerator(outputStream, JsonEncoding.UTF8); objectMapper = new ObjectMapper(); cursorStack = new Stack<>(); } + @Override + public void close() { + outputStream.close(); + } + /** * This returns a ByteBuf block of serialized data plus a recursive generation function (Supplier) * for the next block and continuation. The size of the ByteBuf that will be returned can be diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java index 9f700e265..7c33fdacf 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java @@ -4,18 +4,18 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.LastHttpContent; +import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.replay.datahandlers.JsonEmitter; import org.opensearch.migrations.replay.datahandlers.PayloadAccessFaultingMap; import java.io.IOException; import java.util.Map; +@Slf4j public class NettyJsonBodySerializeHandler extends ChannelInboundHandlerAdapter { public static final int NUM_BYTES_TO_ACCUMULATE_BEFORE_FIRING = 1024; - final JsonEmitter jsonEmitter; - public NettyJsonBodySerializeHandler(ChannelHandlerContext ctx) { - this.jsonEmitter = new JsonEmitter(ctx.alloc()); + public NettyJsonBodySerializeHandler() { } @Override @@ -34,16 +34,17 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } - private void serializePayload(ChannelHandlerContext ctx, Map payload) throws IOException { - var pac = jsonEmitter.getChunkAndContinuations(payload, NUM_BYTES_TO_ACCUMULATE_BEFORE_FIRING); - while (true) { - ctx.fireChannelRead(new DefaultHttpContent(pac.partialSerializedContents)); - pac.partialSerializedContents.release(); - if (pac.nextSupplier == null) { - ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); - break; + private void serializePayload(ChannelHandlerContext ctx, Map payload) throws IOException { + try (var jsonEmitter = new JsonEmitter(ctx.alloc())) { + var pac = jsonEmitter.getChunkAndContinuations(payload, NUM_BYTES_TO_ACCUMULATE_BEFORE_FIRING); + while (true) { + ctx.fireChannelRead(new DefaultHttpContent(pac.partialSerializedContents)); + if (pac.nextSupplier == null) { + ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); + break; + } + pac = pac.nextSupplier.get(); } - pac = pac.nextSupplier.get(); } } -} +} \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/RequestPipelineOrchestrator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/RequestPipelineOrchestrator.java index f3ad67f00..fbc32a138 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/RequestPipelineOrchestrator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/RequestPipelineOrchestrator.java @@ -123,7 +123,7 @@ void addContentParsingHandlers(ChannelHandlerContext ctx, pipeline.addLast(new NettyJsonBodyConvertHandler(transformer)); // IN: Netty HttpRequest(2) + HttpJsonMessage(3) with headers AND payload // OUT: Netty HttpRequest(2) + HttpJsonMessage(3) with headers only + HttpContent(3) blocks - pipeline.addLast(new NettyJsonBodySerializeHandler(ctx)); + pipeline.addLast(new NettyJsonBodySerializeHandler()); addLoggingHandler(pipeline, "F"); } if (authTransfomer != null) { diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PrettyPrinterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PrettyPrinterTest.java index 072ca536b..586642559 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PrettyPrinterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PrettyPrinterTest.java @@ -7,7 +7,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.opensearch.migrations.testutils.CountingNettyResourceLeakDetector; import org.opensearch.migrations.testutils.TestUtilities; -import org.opensearch.migrations.testutils.TestWithNettyLeakDetection; +import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -16,7 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -@TestWithNettyLeakDetection +@WrapWithNettyLeakDetection public class PrettyPrinterTest { @BeforeAll @@ -64,7 +64,7 @@ private static Stream makeCombos() { @ParameterizedTest @MethodSource("makeCombos") - @TestWithNettyLeakDetection(repetitions = 4) + @WrapWithNettyLeakDetection(repetitions = 4) public void httpPacketBufsToString(PrettyPrinter.PacketPrintFormat format, BufferType bufferType) { byte[] fullTrafficBytes = SAMPLE_REQUEST_STRING.getBytes(StandardCharsets.UTF_8); var byteArrays = new ArrayList(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java index b81e6dc2b..44c5482e7 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java @@ -18,7 +18,7 @@ public class JsonAccumulatorTest { public static final String LARGE_PACKED = "largeAndPacked"; GenerateRandomNestedJsonObject randomJsonGenerator = new GenerateRandomNestedJsonObject(); - private static Object readJson(byte[] testFileBytes, int chunkBound) throws IOException { + static Object readJson(byte[] testFileBytes, int chunkBound) throws IOException { var jsonParser = new JsonAccumulator(); Random r = new Random(2); var entireByteBuffer = ByteBuffer.wrap(testFileBytes); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonEmitterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonEmitterTest.java index 8475c8f30..a1f477dde 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonEmitterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonEmitterTest.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import java.io.IOException; import java.io.StringReader; @@ -12,31 +13,31 @@ import java.nio.charset.StandardCharsets; @Slf4j -class JsonEmitterTest { +public class JsonEmitterTest { @Test + @WrapWithNettyLeakDetection(repetitions = 2) public void testEmitterWorksRoundTrip() throws IOException { - JsonEmitter jse = new JsonEmitter(ByteBufAllocator.DEFAULT); - var mapper = new ObjectMapper(); + try (JsonEmitter jse = new JsonEmitter(ByteBufAllocator.DEFAULT)) { + var mapper = new ObjectMapper(); - //var originalTree = mapper.readTree(new File("bigfile.json")); - var originalTree = mapper.readTree(new StringReader("{\"index\":{\"_id\":\"1\"}}")); - var writer = new StringWriter(); - var pac = jse.getChunkAndContinuations(originalTree, 10*1024); - while (true) { - var asBytes = new byte[pac.partialSerializedContents.readableBytes()]; - log.info("Got "+asBytes.length+" bytes back"); - pac.partialSerializedContents.readBytes(asBytes); - var nextChunkStr = new String(asBytes, StandardCharsets.UTF_8); - writer.append(nextChunkStr); - if (pac.nextSupplier == null) { - break; + var originalTree = mapper.readTree(new StringReader("{\"index\":{\"_id\":\"1\"}}")); + var writer = new StringWriter(); + var pac = jse.getChunkAndContinuations(originalTree, 10 * 1024); + while (true) { + var chunk = pac.partialSerializedContents.toString(StandardCharsets.UTF_8); + pac.partialSerializedContents.release(); + log.info("Got: " + chunk); + writer.append(chunk); + if (pac.nextSupplier == null) { + break; + } + pac = pac.nextSupplier.get(); } - pac = pac.nextSupplier.get(); + writer.flush(); + var streamedToStringRoundTripped = + mapper.writeValueAsString(mapper.readTree(new StringReader(writer.toString()))); + var originalRoundTripped = mapper.writeValueAsString(originalTree); + Assertions.assertEquals(originalRoundTripped, streamedToStringRoundTripped); } - writer.flush(); - var streamedToStringRoundTripped = - mapper.writeValueAsString(mapper.readTree(new StringReader(writer.toString()))); - var originalRoundTripped = mapper.writeValueAsString(originalTree); - Assertions.assertEquals(originalRoundTripped, streamedToStringRoundTripped); } } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandlerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandlerTest.java new file mode 100644 index 000000000..261ccdae1 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandlerTest.java @@ -0,0 +1,58 @@ +package org.opensearch.migrations.replay.datahandlers.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.HttpContent; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.migrations.replay.GenerateRandomNestedJsonObject; +import org.opensearch.migrations.replay.ReplayUtils; +import org.opensearch.migrations.replay.datahandlers.PayloadAccessFaultingMap; +import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Random; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; + +@Slf4j +public class NettyJsonBodySerializeHandlerTest { + @Test + @WrapWithNettyLeakDetection + public void testJsonSerializerHandler() throws Exception { + var randomJsonGenerator = new GenerateRandomNestedJsonObject(); + var randomJson = randomJsonGenerator.makeRandomJsonObject(new Random(2), 2, 1); + var headers = new StrictCaseInsensitiveHttpHeadersMap(); + headers.put(IHttpMessage.CONTENT_TYPE, List.of(IHttpMessage.APPLICATION_JSON)); + var fullHttpMessageWithJsonBody = new HttpJsonMessageWithFaultingPayload(headers); + fullHttpMessageWithJsonBody.setPayloadFaultMap(new PayloadAccessFaultingMap(headers)); + fullHttpMessageWithJsonBody.payload().put(PayloadAccessFaultingMap.INLINED_JSON_BODY_DOCUMENT_KEY, randomJson); + + var channel = new EmbeddedChannel(new NettyJsonBodySerializeHandler()); + channel.writeInbound(fullHttpMessageWithJsonBody); + + var handlerAccumulatedStream = + ReplayUtils.byteBufsToInputStream(getByteBufStreamFromChannel(channel)); + + String originalTreeStr = new ObjectMapper().writeValueAsString(randomJson); + var reconstitutedTreeStr = new String(handlerAccumulatedStream.readAllBytes(),StandardCharsets.UTF_8); + Assertions.assertEquals(originalTreeStr, reconstitutedTreeStr); + + getByteBufStreamFromChannel(channel).forEach(bb->bb.release()); + } + + private static Stream getByteBufStreamFromChannel(EmbeddedChannel channel) { + return channel.inboundMessages().stream() + .filter(x -> x instanceof HttpContent) + .map(x -> { + var rval = ((HttpContent) x).content(); + log.info("refCnt=" + rval.refCnt() + " for " + x); + return rval; + }); + } +} \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java index 6765a7dc0..dc3f9921d 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonToByteBufHandlerTest.java @@ -28,6 +28,7 @@ class NettyJsonToByteBufHandlerTest { public static final int ORIGINAL_NUMBER_OF_CHUNKS = 4; public static final int ADDITIONAL_CHUNKS = 2; + // TODO - replace this with just an operation on the EmbeddedChannel.inboundMessage static class ValidationHandler extends ChannelInboundHandlerAdapter { final List chunkSizesReceivedList; int totalBytesRead;