From 09a052853ec2fd1975bf51cca6da9400153ed748 Mon Sep 17 00:00:00 2001 From: Jeremy Michael <60355474+jmsusanto@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:21:56 -0500 Subject: [PATCH] otel trace source and otel metrics source test coverage (#5242) Added security/authentication tests for OtelTraceSource and OTelMetricsSource authentication Signed-off-by: Jeremy Michael --- .../otelmetrics/OTelMetricsSourceTest.java | 115 ++++++++++++++++++ .../source/oteltrace/OTelTraceSourceTest.java | 105 ++++++++++++++++ 2 files changed, 220 insertions(+) diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index 9972a81de8..f3342a612e 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -13,6 +13,7 @@ import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.ClosedSessionException; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; @@ -35,6 +36,7 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.common.v1.InstrumentationLibrary; @@ -96,6 +98,7 @@ import java.util.Map; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -103,6 +106,7 @@ import java.util.stream.Stream; import java.util.zip.GZIPOutputStream; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -347,6 +351,7 @@ void testHttpFullJsonWithCustomPathAndUnframedRequests() throws InvalidProtocolB .join(); } + @Test void testHttpFullJsonWithCustomPathAndAuthHeader_with_successful_response() throws InvalidProtocolBufferException { when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME); @@ -411,6 +416,116 @@ void testHttpFullJsonWithCustomPathAndAuthHeader_with_unsuccessful_response() th .join(); } + @Test + void testHttpRequestWithInvalidCredentials_with_unsuccessful_response() throws InvalidProtocolBufferException { + when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME); + when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD); + final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig); + + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(grpcAuthenticationProvider); + when(oTelMetricsSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic", + Map.of( + "username", USERNAME, + "password", PASSWORD + ))); + when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true); + when(oTelMetricsSourceConfig.getPath()).thenReturn(TEST_PATH); + + configureObjectUnderTest(); + SOURCE.start(buffer); + + final String invalidUsername = "wrong_user"; + final String invalidPassword = "wrong_password"; + final String invalidCredentials = Base64.getEncoder() + .encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8)); + + final String transformedPath = "/" + TEST_PIPELINE_NAME + "/v1/metrics"; + + WebClient.of().prepare() + .post("http://127.0.0.1:21891" + transformedPath) + .content(MediaType.JSON_UTF_8, JsonFormat.printer().print(createExportMetricsRequest()).getBytes()) + .header("Authorization", "Basic " + invalidCredentials) + .execute() + .aggregate() + .whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.UNAUTHORIZED, throwable)) + .join(); + } + + @Test + void testGrpcRequestWithInvalidCredentials_with_unsuccessful_response() throws Exception { + when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME); + when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD); + final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig); + + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(grpcAuthenticationProvider); + when(oTelMetricsSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic", + Map.of( + "username", USERNAME, + "password", PASSWORD + ))); + configureObjectUnderTest(); + SOURCE.start(buffer); + + final String invalidUsername = "wrong_user"; + final String invalidPassword = "wrong_password"; + final String invalidCredentials = Base64.getEncoder() + .encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8)); + + final MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .addHeader("Authorization", "Basic " + invalidCredentials) + .build(MetricsServiceGrpc.MetricsServiceBlockingStub.class); + + final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest())); + + assertThat(actualException.getStatus(), notNullValue()); + assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNAUTHENTICATED)); + } + + @Test + void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferException { + when(oTelMetricsSourceConfig.isSsl()).thenReturn(true); + when(oTelMetricsSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt"); + when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key"); + configureObjectUnderTest(); + SOURCE.start(buffer); + + WebClient client = WebClient.builder("http://127.0.0.1:21891") + .build(); + + CompletionException exception = assertThrows(CompletionException.class, () -> client.execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:21891") + .method(HttpMethod.POST) + .path("/opentelemetry.proto.collector.metrics.v1.MetricsService/Export") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.copyOf(JsonFormat.printer().print(createExportMetricsRequest()).getBytes())) + .aggregate() + .join()); + + assertThat(exception.getCause(), instanceOf(ClosedSessionException.class)); + } + + @Test + void testGrpcFailsIfSslIsEnabledAndNoTls() { + when(oTelMetricsSourceConfig.isSsl()).thenReturn(true); + when(oTelMetricsSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt"); + when(oTelMetricsSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key"); + configureObjectUnderTest(); + SOURCE.start(buffer); + + MetricsServiceGrpc.MetricsServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(MetricsServiceGrpc.MetricsServiceBlockingStub.class); + + StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportMetricsRequest())); + + assertThat(actualException.getStatus(), notNullValue()); + assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNKNOWN)); + } + + @Test void testServerStartCertFileSuccess() throws IOException { try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) { diff --git a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java index f52f2379dd..03418a9735 100644 --- a/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java +++ b/data-prepper-plugins/otel-trace-source/src/test/java/org/opensearch/dataprepper/plugins/source/oteltrace/OTelTraceSourceTest.java @@ -14,6 +14,7 @@ import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.ClosedSessionException; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; @@ -93,6 +94,7 @@ import java.util.StringJoiner; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -100,6 +102,7 @@ import java.util.stream.Stream; import java.util.zip.GZIPOutputStream; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -462,6 +465,108 @@ void testHttpFullJsonWithCustomPathAndAuthHeader_with_unsuccessful_response() th .join(); } + @Test + void testHttpRequestWithInvalidCredentialsShouldReturnUnauthorized() throws InvalidProtocolBufferException { + when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME); + when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD); + final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig); + + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(grpcAuthenticationProvider); + when(oTelTraceSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic", + Map.of( + "username", USERNAME, + "password", PASSWORD + ))); + when(oTelTraceSourceConfig.enableUnframedRequests()).thenReturn(true); + when(oTelTraceSourceConfig.getPath()).thenReturn(TEST_PATH); + configureObjectUnderTest(); + SOURCE.start(buffer); + + final String transformedPath = "/" + TEST_PIPELINE_NAME + "/v1/traces"; + + final String invalidUsername = "wrong_user"; + final String invalidPassword = "wrong_password"; + final String invalidCredentials = Base64.getEncoder() + .encodeToString(String.format("%s:%s", invalidUsername, invalidPassword).getBytes(StandardCharsets.UTF_8)); + + WebClient.of().prepare() + .post("http://127.0.0.1:21890" + transformedPath) + .content(MediaType.JSON_UTF_8, JsonFormat.printer().print(createExportTraceRequest()).getBytes()) + .header("Authorization", "Basic " + invalidCredentials) + .execute() + .aggregate() + .whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.UNAUTHORIZED, throwable)) + .join(); + } + + @Test + void testGrpcRequestWithoutAuthentication_with_unsuccessful_response() throws Exception { + when(httpBasicAuthenticationConfig.getUsername()).thenReturn(USERNAME); + when(httpBasicAuthenticationConfig.getPassword()).thenReturn(PASSWORD); + final GrpcAuthenticationProvider grpcAuthenticationProvider = new GrpcBasicAuthenticationProvider(httpBasicAuthenticationConfig); + + when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))) + .thenReturn(grpcAuthenticationProvider); + when(oTelTraceSourceConfig.getAuthentication()).thenReturn(new PluginModel("http_basic", + Map.of( + "username", USERNAME, + "password", PASSWORD + ))); + configureObjectUnderTest(); + SOURCE.start(buffer); + + final TraceServiceGrpc.TraceServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(TraceServiceGrpc.TraceServiceBlockingStub.class); + + final StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportTraceRequest())); + + assertThat(actualException.getStatus(), notNullValue()); + assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNAUTHENTICATED)); + } + + @Test + void testHttpWithoutSslFailsWhenSslIsEnabled() throws InvalidProtocolBufferException { + when(oTelTraceSourceConfig.isSsl()).thenReturn(true); + when(oTelTraceSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt"); + when(oTelTraceSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key"); + configureObjectUnderTest(); + SOURCE.start(buffer); + + WebClient client = WebClient.builder("http://127.0.0.1:21890") + .build(); + + CompletionException exception = assertThrows(CompletionException.class, () -> client.execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTP) + .authority("127.0.0.1:21890") + .method(HttpMethod.POST) + .path("/opentelemetry.proto.collector.trace.v1.TraceService/Export") + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.copyOf(JsonFormat.printer().print(createExportTraceRequest()).getBytes())) + .aggregate() + .join()); + + assertThat(exception.getCause(), instanceOf(ClosedSessionException.class)); + } + + @Test + void testGrpcFailsIfSslIsEnabledAndNoTls() { + when(oTelTraceSourceConfig.isSsl()).thenReturn(true); + when(oTelTraceSourceConfig.getSslKeyCertChainFile()).thenReturn("data/certificate/test_cert.crt"); + when(oTelTraceSourceConfig.getSslKeyFile()).thenReturn("data/certificate/test_decrypted_key.key"); + configureObjectUnderTest(); + SOURCE.start(buffer); + + TraceServiceGrpc.TraceServiceBlockingStub client = Clients.builder(GRPC_ENDPOINT) + .build(TraceServiceGrpc.TraceServiceBlockingStub.class); + + StatusRuntimeException actualException = assertThrows(StatusRuntimeException.class, () -> client.export(createExportTraceRequest())); + + assertThat(actualException.getStatus(), notNullValue()); + assertThat(actualException.getStatus().getCode(), equalTo(Status.Code.UNKNOWN)); + } + @Test void testServerStartCertFileSuccess() throws IOException { try (MockedStatic armeriaServerMock = Mockito.mockStatic(Server.class)) {