diff --git a/RFS/src/main/java/com/rfs/common/RestClient.java b/RFS/src/main/java/com/rfs/common/RestClient.java index 24c3a4023..dbc476a45 100644 --- a/RFS/src/main/java/com/rfs/common/RestClient.java +++ b/RFS/src/main/java/com/rfs/common/RestClient.java @@ -46,7 +46,7 @@ public RestClient(ConnectionDetails connectionDetails) { public Mono getAsync(String path, IRfsContexts.IRequestContext context) { return client - .doOnRequest(sizeMetricsHandler(context)) + .doOnRequest(addSizeMetricsHandlers(context)) .get() .uri("/" + path) .responseSingle((response, bytes) -> bytes.asString() @@ -61,7 +61,7 @@ public Response get(String path, IRfsContexts.IRequestContext context) { public Mono postAsync(String path, String body, IRfsContexts.IRequestContext context) { return client - .doOnRequest(sizeMetricsHandler(context)) + .doOnRequest(addSizeMetricsHandlers(context)) .post() .uri("/" + path) .send(ByteBufMono.fromString(Mono.just(body))) @@ -73,7 +73,7 @@ public Mono postAsync(String path, String body, IRfsContexts.IRequestC public Mono putAsync(String path, String body, IRfsContexts.IRequestContext context) { return client - .doOnRequest(sizeMetricsHandler(context)) + .doOnRequest(addSizeMetricsHandlers(context)) .put() .uri("/" + path) .send(ByteBufMono.fromString(Mono.just(body))) @@ -87,10 +87,10 @@ public Response put(String path, String body, IRfsContexts.IRequestContext conte return putAsync(path, body, context).block(); } - private BiConsumer sizeMetricsHandler(final IRfsContexts.IRequestContext context) { + private BiConsumer addSizeMetricsHandlers(final IRfsContexts.IRequestContext ctx) { return (r, conn) -> { - conn.channel().pipeline().addFirst(new WriteMeteringHandler(context::addBytesSent)); - conn.channel().pipeline().addFirst(new ReadMeteringHandler(context::addBytesRead)); + conn.channel().pipeline().addFirst(new WriteMeteringHandler(ctx::addBytesSent)); + conn.channel().pipeline().addFirst(new ReadMeteringHandler(ctx::addBytesRead)); }; } } \ No newline at end of file diff --git a/RFS/src/test/java/com/rfs/common/RestClientTest.java b/RFS/src/test/java/com/rfs/common/RestClientTest.java index c55211dfd..2f4873434 100644 --- a/RFS/src/test/java/com/rfs/common/RestClientTest.java +++ b/RFS/src/test/java/com/rfs/common/RestClientTest.java @@ -3,28 +3,20 @@ import com.rfs.tracing.RfsContexts; import com.rfs.tracing.TestContext; -import io.opentelemetry.api.trace.Span; import io.opentelemetry.sdk.trace.data.SpanData; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Assertions; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; import org.opensearch.migrations.testutils.HttpRequestFirstLine; import org.opensearch.migrations.testutils.SimpleHttpResponse; import org.opensearch.migrations.testutils.SimpleNettyHttpServer; import java.nio.charset.StandardCharsets; -import java.time.Instant; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static java.util.stream.Collectors.toList; - class RestClientTest { @Test public void testGetEmitsInstrumentation() throws Exception{ @@ -57,16 +49,18 @@ public void testGetEmitsInstrumentation() throws Exception{ .filter(pd -> pd.getAttributes().asMap().values().stream().map(o -> (String) o).collect(Collectors.joining()) .equals(kvp.getKey())) .reduce((a, b) -> b).get().getValue(); - assertThat("Checking bytes {send, read} for context '" + kvp.getKey() + "'", new long[]{bytesSent, bytesRead}, equalTo(kvp.getValue())); + MatcherAssert.assertThat("Checking bytes {send, read} for context '" + kvp.getKey() + "'", + new long[]{bytesSent, bytesRead}, Matchers.equalTo(kvp.getValue())); } final var finishedSpans = rootContext.instrumentationBundle.getFinishedSpans(); - final var finishedSpanNames = finishedSpans.stream().map(SpanData::getName).collect(toList()); - assertThat(finishedSpanNames, containsInAnyOrder("httpRequest", "httpRequest", "createSnapshot")); + final var finishedSpanNames = finishedSpans.stream().map(SpanData::getName).collect(Collectors.toList()); + MatcherAssert.assertThat(finishedSpanNames, + Matchers.containsInAnyOrder("httpRequest", "httpRequest", "createSnapshot")); final var httpRequestSpansByTime = finishedSpans.stream() .filter(sd -> sd.getName().equals("httpRequest")) - .sorted(Comparator.comparing(SpanData::getEndEpochNanos)).collect(toList()); + .sorted(Comparator.comparing(SpanData::getEndEpochNanos)).collect(Collectors.toList()); int i = 0; for (var expectedBytes : List.of( new long[]{139,66}, @@ -74,7 +68,8 @@ public void testGetEmitsInstrumentation() throws Exception{ var span = httpRequestSpansByTime.get(i++); long bytesSent = span.getAttributes().get(RfsContexts.GenericRequestContext.BYTES_SENT_ATTR); long bytesRead = span.getAttributes().get(RfsContexts.GenericRequestContext.BYTES_READ_ATTR); - assertThat("Checking bytes {send, read} for httpRequest " + i, new long[]{bytesSent, bytesRead}, equalTo(expectedBytes)); + MatcherAssert.assertThat("Checking bytes {send, read} for httpRequest " + i, + new long[]{bytesSent, bytesRead}, Matchers.equalTo(expectedBytes)); } }