diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/InstrumentationUtil.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/InstrumentationUtil.java new file mode 100644 index 00000000000..4ce9622fe4c --- /dev/null +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/InstrumentationUtil.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import java.util.Objects; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class InstrumentationUtil { + private static final ContextKey SUPPRESS_INSTRUMENTATION_KEY = + ContextKey.named("suppress_internal_exporter_instrumentation"); + + private InstrumentationUtil() {} + + /** + * Adds a Context boolean key that will allow to identify HTTP calls coming from OTel exporters. + * The key later be checked by an automatic instrumentation to avoid tracing OTel exporter's + * calls. + */ + public static void suppressInstrumentation(Runnable runnable) { + Context.current().with(SUPPRESS_INSTRUMENTATION_KEY, true).wrap(runnable).run(); + } + + /** + * Checks if an automatic instrumentation should be suppressed with the provided Context. + * + * @return TRUE to suppress the automatic instrumentation, FALSE to continue with the + * instrumentation. + */ + public static boolean shouldSuppressInstrumentation(Context context) { + return Objects.equals(context.get(SUPPRESS_INSTRUMENTATION_KEY), true); + } +} diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/InstrumentationUtilTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/InstrumentationUtilTest.java new file mode 100644 index 00000000000..2c7e9f540c7 --- /dev/null +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/InstrumentationUtilTest.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.internal; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.context.Context; +import org.junit.jupiter.api.Test; + +class InstrumentationUtilTest { + @Test + void verifySuppressInstrumentation() { + // Should be false by default. + assertFalse(InstrumentationUtil.shouldSuppressInstrumentation(Context.current())); + + // Should be true inside the Runnable passed to InstrumentationUtil.suppressInstrumentation. + InstrumentationUtil.suppressInstrumentation( + () -> assertTrue(InstrumentationUtil.shouldSuppressInstrumentation(Context.current()))); + + // Should be false after the runnable finishes. + assertFalse(InstrumentationUtil.shouldSuppressInstrumentation(Context.current())); + } +} diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java index 5da750edead..9ed05b003b5 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java @@ -23,6 +23,7 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import io.opentelemetry.exporter.internal.InstrumentationUtil; import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil; import io.opentelemetry.exporter.internal.grpc.GrpcResponse; @@ -112,51 +113,53 @@ public void send(T request, Runnable onSuccess, BiConsumer + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + String description = e.getMessage(); + if (description == null) { + description = ""; + } + onError.accept(GrpcResponse.create(2 /* UNKNOWN */, description), e); + } + + @Override + public void onResponse(Call call, Response response) { + // Response body is empty but must be consumed to access trailers. + try { + response.body().bytes(); + } catch (IOException e) { + onError.accept( + GrpcResponse.create( + GrpcExporterUtil.GRPC_STATUS_UNKNOWN, + "Could not consume server response."), + e); + return; + } + + String status = grpcStatus(response); + if ("0".equals(status)) { + onSuccess.run(); + return; + } + + String errorMessage = grpcMessage(response); + int statusCode; + try { + statusCode = Integer.parseInt(status); + } catch (NumberFormatException ex) { + statusCode = GrpcExporterUtil.GRPC_STATUS_UNKNOWN; + } + onError.accept( + GrpcResponse.create(statusCode, errorMessage), + new IllegalStateException(errorMessage)); + } + })); } @Nullable diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 306c7b97163..2355a94ba60 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -5,6 +5,7 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import io.opentelemetry.exporter.internal.InstrumentationUtil; import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.http.HttpSender; @@ -101,38 +102,40 @@ public void send( requestBuilder.post(body); } - client - .newCall(requestBuilder.build()) - .enqueue( - new Callback() { - @Override - public void onFailure(Call call, IOException e) { - onError.accept(e); - } - - @Override - public void onResponse(Call call, okhttp3.Response response) { - try (ResponseBody body = response.body()) { - onResponse.accept( - new Response() { - @Override - public int statusCode() { - return response.code(); + InstrumentationUtil.suppressInstrumentation( + () -> + client + .newCall(requestBuilder.build()) + .enqueue( + new Callback() { + @Override + public void onFailure(Call call, IOException e) { + onError.accept(e); + } + + @Override + public void onResponse(Call call, okhttp3.Response response) { + try (ResponseBody body = response.body()) { + onResponse.accept( + new Response() { + @Override + public int statusCode() { + return response.code(); + } + + @Override + public String statusMessage() { + return response.message(); + } + + @Override + public byte[] responseBody() throws IOException { + return body.bytes(); + } + }); } - - @Override - public String statusMessage() { - return response.message(); - } - - @Override - public byte[] responseBody() throws IOException { - return body.bytes(); - } - }); - } - } - }); + } + })); } @Override diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java index 8aef6b7c21a..b641d1bad05 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpUtil.java @@ -18,6 +18,13 @@ * at any time. */ public final class OkHttpUtil { + @SuppressWarnings("NonFinalStaticField") + private static boolean propagateContextForTestingInDispatcher = false; + + public static void setPropagateContextForTestingInDispatcher( + boolean propagateContextForTestingInDispatcher) { + OkHttpUtil.propagateContextForTestingInDispatcher = propagateContextForTestingInDispatcher; + } /** Returns a {@link Dispatcher} using daemon threads, otherwise matching the OkHttp default. */ public static Dispatcher newDispatcher() { @@ -28,7 +35,7 @@ public static Dispatcher newDispatcher() { 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - new DaemonThreadFactory("okhttp-dispatch"))); + new DaemonThreadFactory("okhttp-dispatch", propagateContextForTestingInDispatcher))); } private OkHttpUtil() {} diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/AbstractOkHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/AbstractOkHttpSuppressionTest.java new file mode 100644 index 00000000000..d1a84a7491d --- /dev/null +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/AbstractOkHttpSuppressionTest.java @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.context.Context; +import io.opentelemetry.exporter.internal.InstrumentationUtil; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +abstract class AbstractOkHttpSuppressionTest { + + @BeforeEach + void setUp() { + OkHttpUtil.setPropagateContextForTestingInDispatcher(true); + } + + @AfterEach + void tearDown() { + OkHttpUtil.setPropagateContextForTestingInDispatcher(false); + } + + @Test + void testSuppressInstrumentation() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean suppressInstrumentation = new AtomicBoolean(false); + + Runnable onSuccess = Assertions::fail; + Runnable onFailure = + () -> { + suppressInstrumentation.set( + InstrumentationUtil.shouldSuppressInstrumentation(Context.current())); + latch.countDown(); + }; + + send(getSender(), onSuccess, onFailure); + + latch.await(); + + assertTrue(suppressInstrumentation.get()); + } + + abstract void send(T sender, Runnable onSuccess, Runnable onFailure); + + private T getSender() { + return createSender("https://none"); + } + + abstract T createSender(String endpoint); +} diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java new file mode 100644 index 00000000000..df52ff0f891 --- /dev/null +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSuppressionTest.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize; +import io.opentelemetry.exporter.internal.marshal.Serializer; +import java.util.Collections; + +class OkHttpGrpcSuppressionTest + extends AbstractOkHttpSuppressionTest< + OkHttpGrpcSender> { + + @Override + void send(OkHttpGrpcSender sender, Runnable onSuccess, Runnable onFailure) { + sender.send(new DummyMarshaler(), onSuccess, (grpcResponse, throwable) -> onFailure.run()); + } + + @Override + OkHttpGrpcSender createSender(String endpoint) { + return new OkHttpGrpcSender<>( + "https://localhost", false, 10L, Collections.emptyMap(), null, null, null); + } + + protected static class DummyMarshaler extends MarshalerWithSize { + + protected DummyMarshaler() { + super(0); + } + + @Override + protected void writeTo(Serializer output) {} + } +} diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java new file mode 100644 index 00000000000..22c2c4a0dd0 --- /dev/null +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.okhttp.internal; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.function.Consumer; + +class OkHttpHttpSuppressionTest extends AbstractOkHttpSuppressionTest { + + @Override + void send(OkHttpHttpSender sender, Runnable onSuccess, Runnable onFailure) { + byte[] content = "A".getBytes(StandardCharsets.UTF_8); + Consumer outputStreamConsumer = + outputStream -> { + try { + outputStream.write(content); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + sender.send( + outputStreamConsumer, + content.length, + (response) -> onSuccess.run(), + (error) -> onFailure.run()); + } + + @Override + OkHttpHttpSender createSender(String endpoint) { + return new OkHttpHttpSender( + endpoint, false, "text/plain", 10L, Collections::emptyMap, null, null, null, null); + } +} diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java index 9e319d8f735..e8f75abe40f 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/internal/DaemonThreadFactory.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.internal; +import io.opentelemetry.context.Context; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @@ -20,14 +21,30 @@ public final class DaemonThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger counter = new AtomicInteger(); private final ThreadFactory delegate = Executors.defaultThreadFactory(); + private final boolean propagateContextForTesting; public DaemonThreadFactory(String namePrefix) { + this(namePrefix, /* propagateContextForTesting= */ false); + } + + /** + * {@link DaemonThreadFactory}'s constructor. + * + * @param namePrefix Used when setting the new thread's name. + * @param propagateContextForTesting For tests only. When enabled, the current thread's {@link + * Context} will be passed over to the new threads, this is useful for validating scenarios + * where context propagation is available through bytecode instrumentation. + */ + public DaemonThreadFactory(String namePrefix, boolean propagateContextForTesting) { this.namePrefix = namePrefix; + this.propagateContextForTesting = propagateContextForTesting; } @Override public Thread newThread(Runnable runnable) { - Thread t = delegate.newThread(runnable); + Thread t = + delegate.newThread( + propagateContextForTesting ? Context.current().wrap(runnable) : runnable); try { t.setDaemon(true); t.setName(namePrefix + "-" + counter.incrementAndGet());