Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identifying otel http calls #5918

Merged
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c43769d
Created InstrumentationUtil
LikeTheSalad Oct 16, 2023
908e04f
Suppressing automatic instrumentation for OTel exporters using OkHttp
LikeTheSalad Oct 16, 2023
f00b5b7
Adding InstrumentationUtilTest validations
LikeTheSalad Oct 16, 2023
c10cf0f
Spotless
LikeTheSalad Oct 16, 2023
f7c5ef8
Verifying suppress instrumentation key is present in okhttp calls
LikeTheSalad Oct 25, 2023
2bb40e5
Spotless
LikeTheSalad Oct 25, 2023
52b83a0
Suppressing NonFinalStaticField
LikeTheSalad Oct 25, 2023
2e2bed7
Added javadoc
LikeTheSalad Oct 26, 2023
456f71b
Renaming lock by latch
LikeTheSalad Oct 26, 2023
faa5f65
Reordering functions
LikeTheSalad Oct 26, 2023
adff295
Update exporters/common/src/main/java/io/opentelemetry/exporter/inter…
LikeTheSalad Nov 6, 2023
246ed42
Update exporters/common/src/main/java/io/opentelemetry/exporter/inter…
LikeTheSalad Nov 6, 2023
0b7601b
Update exporters/sender/okhttp/src/test/java/io/opentelemetry/exporte…
LikeTheSalad Nov 6, 2023
33b68cf
Update exporters/sender/okhttp/src/test/java/io/opentelemetry/exporte…
LikeTheSalad Nov 6, 2023
79e616b
Spotless
LikeTheSalad Nov 6, 2023
9a47e53
Renamed ContextKey to suppress_internal_exporter_instrumentation
LikeTheSalad Nov 6, 2023
b5f90ac
Renaming test classes
LikeTheSalad Nov 6, 2023
5fe0c06
Updating InstrumentationUtilTest
LikeTheSalad Nov 6, 2023
5011af8
Adding javadoc to DaemonThreadFactory's constructor
LikeTheSalad Nov 8, 2023
2dcdb09
Adding setter for OkHttpUtil.propagateContextForTestingInDispatcher
LikeTheSalad Nov 8, 2023
7ca697e
Spotless
LikeTheSalad Nov 8, 2023
7c2c165
Suppressing NonFinalStaticField warning
LikeTheSalad Nov 8, 2023
180a523
Adding DaemonThreadFactory constructor's summary javadoc to avoid "Su…
LikeTheSalad Nov 8, 2023
db23665
Fixing DaemonThreadFactory's constructor javadoc
LikeTheSalad Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import java.util.Objects;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class InstrumentationUtil {
private static final ContextKey<Boolean> SUPPRESS_INSTRUMENTATION_KEY =
ContextKey.named("suppress_internal_exporter_instrumentation");

private InstrumentationUtil() {}

/**
* Adds a Context boolean key that will allow to identify HTTP calls coming from OTel exporters.
* The key later be checked by an automatic instrumentation to avoid tracing OTel exporter's
* calls.
*/
public static void suppressInstrumentation(Runnable runnable) {
Context.current().with(SUPPRESS_INSTRUMENTATION_KEY, true).wrap(runnable).run();
}

/**
* Checks if an automatic instrumentation should be suppressed with the provided Context.
*
* @return TRUE to suppress the automatic instrumentation, FALSE to continue with the
* instrumentation.
*/
public static boolean shouldSuppressInstrumentation(Context context) {
return Objects.equals(context.get(SUPPRESS_INSTRUMENTATION_KEY), true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.internal;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.opentelemetry.context.Context;
import org.junit.jupiter.api.Test;

class InstrumentationUtilTest {
@Test
void verifySuppressInstrumentation() {
// Should be false by default.
assertFalse(InstrumentationUtil.shouldSuppressInstrumentation(Context.current()));

// Should be true inside the Runnable passed to InstrumentationUtil.suppressInstrumentation.
InstrumentationUtil.suppressInstrumentation(
() -> assertTrue(InstrumentationUtil.shouldSuppressInstrumentation(Context.current())));

// Should be false after the runnable finishes.
assertFalse(InstrumentationUtil.shouldSuppressInstrumentation(Context.current()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

package io.opentelemetry.exporter.sender.okhttp.internal;

import io.opentelemetry.exporter.internal.InstrumentationUtil;
import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcResponse;
Expand Down Expand Up @@ -112,51 +113,53 @@
RequestBody requestBody = new GrpcRequestBody(request, compressionEnabled);
requestBuilder.post(requestBody);

client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
String description = e.getMessage();
if (description == null) {
description = "";
}
onError.accept(GrpcResponse.create(2 /* UNKNOWN */, description), e);
}

@Override
public void onResponse(Call call, Response response) {
// Response body is empty but must be consumed to access trailers.
try {
response.body().bytes();
} catch (IOException e) {
onError.accept(
GrpcResponse.create(
GrpcExporterUtil.GRPC_STATUS_UNKNOWN,
"Could not consume server response."),
e);
return;
}

String status = grpcStatus(response);
if ("0".equals(status)) {
onSuccess.run();
return;
}

String errorMessage = grpcMessage(response);
int statusCode;
try {
statusCode = Integer.parseInt(status);
} catch (NumberFormatException ex) {
statusCode = GrpcExporterUtil.GRPC_STATUS_UNKNOWN;
}
onError.accept(
GrpcResponse.create(statusCode, errorMessage),
new IllegalStateException(errorMessage));
}
});
InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
String description = e.getMessage();
if (description == null) {
description = "";

Check warning on line 126 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L126

Added line #L126 was not covered by tests
}
onError.accept(GrpcResponse.create(2 /* UNKNOWN */, description), e);
}

@Override
public void onResponse(Call call, Response response) {
// Response body is empty but must be consumed to access trailers.
try {
response.body().bytes();
} catch (IOException e) {
onError.accept(
GrpcResponse.create(

Check warning on line 138 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L136-L138

Added lines #L136 - L138 were not covered by tests
GrpcExporterUtil.GRPC_STATUS_UNKNOWN,
"Could not consume server response."),
e);
return;

Check warning on line 142 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L142

Added line #L142 was not covered by tests
}

String status = grpcStatus(response);
if ("0".equals(status)) {
onSuccess.run();
return;
}

String errorMessage = grpcMessage(response);
int statusCode;
try {
statusCode = Integer.parseInt(status);
} catch (NumberFormatException ex) {
statusCode = GrpcExporterUtil.GRPC_STATUS_UNKNOWN;

Check warning on line 156 in exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java

View check run for this annotation

Codecov / codecov/patch

exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpGrpcSender.java#L155-L156

Added lines #L155 - L156 were not covered by tests
}
onError.accept(
GrpcResponse.create(statusCode, errorMessage),
new IllegalStateException(errorMessage));
}
}));
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.exporter.sender.okhttp.internal;

import io.opentelemetry.exporter.internal.InstrumentationUtil;
import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.http.HttpSender;
Expand Down Expand Up @@ -101,38 +102,40 @@ public void send(
requestBuilder.post(body);
}

client
.newCall(requestBuilder.build())
.enqueue(
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, okhttp3.Response response) {
try (ResponseBody body = response.body()) {
onResponse.accept(
new Response() {
@Override
public int statusCode() {
return response.code();
InstrumentationUtil.suppressInstrumentation(
() ->
client
.newCall(requestBuilder.build())
.enqueue(
LikeTheSalad marked this conversation as resolved.
Show resolved Hide resolved
new Callback() {
@Override
public void onFailure(Call call, IOException e) {
onError.accept(e);
}

@Override
public void onResponse(Call call, okhttp3.Response response) {
try (ResponseBody body = response.body()) {
onResponse.accept(
new Response() {
@Override
public int statusCode() {
return response.code();
}

@Override
public String statusMessage() {
return response.message();
}

@Override
public byte[] responseBody() throws IOException {
return body.bytes();
}
});
}

@Override
public String statusMessage() {
return response.message();
}

@Override
public byte[] responseBody() throws IOException {
return body.bytes();
}
});
}
}
});
}
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
* at any time.
*/
public final class OkHttpUtil {
// For testing purposes
@SuppressWarnings("NonFinalStaticField")
static boolean propagateContextInDispatcher = false;
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

/** Returns a {@link Dispatcher} using daemon threads, otherwise matching the OkHttp default. */
public static Dispatcher newDispatcher() {
Expand All @@ -28,7 +31,7 @@ public static Dispatcher newDispatcher() {
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new DaemonThreadFactory("okhttp-dispatch")));
new DaemonThreadFactory("okhttp-dispatch", propagateContextInDispatcher)));
}

private OkHttpUtil() {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.junit.jupiter.api.Assertions.assertTrue;

import io.opentelemetry.context.Context;
import io.opentelemetry.exporter.internal.InstrumentationUtil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

abstract class AbstractOkHttpSuppressionTest<T> {

@BeforeEach
void setUp() {
OkHttpUtil.propagateContextInDispatcher = true;
}

@AfterEach
void tearDown() {
OkHttpUtil.propagateContextInDispatcher = false;
}

@Test
void testSuppressInstrumentation() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean suppressInstrumentation = new AtomicBoolean(false);

Runnable onSuccess = Assertions::fail;
Runnable onFailure =
() -> {
suppressInstrumentation.set(
InstrumentationUtil.shouldSuppressInstrumentation(Context.current()));
latch.countDown();
};

send(getSender(), onSuccess, onFailure);

latch.await();

assertTrue(suppressInstrumentation.get());
}

abstract void send(T sender, Runnable onSuccess, Runnable onFailure);

private T getSender() {
return createSender("https://none");
}

abstract T createSender(String endpoint);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.sender.okhttp.internal;

import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import java.util.Collections;

class OkHttpGrpcSuppressionTest
extends AbstractOkHttpSuppressionTest<
OkHttpGrpcSender<OkHttpGrpcSuppressionTest.DummyMarshaler>> {

@Override
void send(OkHttpGrpcSender<DummyMarshaler> sender, Runnable onSuccess, Runnable onFailure) {
sender.send(new DummyMarshaler(), onSuccess, (grpcResponse, throwable) -> onFailure.run());
}

@Override
OkHttpGrpcSender<DummyMarshaler> createSender(String endpoint) {
return new OkHttpGrpcSender<>(
"https://localhost", false, 10L, Collections.emptyMap(), null, null, null);
}

protected static class DummyMarshaler extends MarshalerWithSize {

protected DummyMarshaler() {
super(0);
}

@Override
protected void writeTo(Serializer output) {}
}
}
Loading