Skip to content

Commit

Permalink
Propagate context into async http client CompletableFuture callbacks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Jan 15, 2025
1 parent 44bea8d commit 93dd4c8
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new AsyncHttpClientInstrumentation(),
new AsyncCompletionHandlerInstrumentation(),
new NettyRequestSenderInstrumentation());
new NettyRequestSenderInstrumentation(),
new NettyResponseFutureInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.asynchttpclient.v2_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.CompletableFuture;

public final class CompletableFutureWrapper {

private CompletableFutureWrapper() {}

public static <T> CompletableFuture<T> wrap(CompletableFuture<T> future, Context context) {
CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(T value, Throwable throwable) -> {
try (Scope ignored = context.makeCurrent()) {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(value);
}
}
});

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.asynchttpclient.v2_0;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;

import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class NettyResponseFutureInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.asynchttpclient.netty.NettyResponseFuture");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("toCompletableFuture").and(takesNoArguments()).and(returns(CompletableFuture.class)),
this.getClass().getName() + "$WrapFutureAdvice");
}

@SuppressWarnings("unused")
public static class WrapFutureAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) CompletableFuture<?> result) {
result = CompletableFutureWrapper.wrap(result, Java8BytecodeBridge.currentContext());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.asynchttpclient.v2_0;

import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import java.net.URI;
import java.util.Map;
import org.asynchttpclient.Request;

class AsyncHttpClientCompletableFutureTest extends AsyncHttpClientTest {

@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
super.configure(optionsBuilder);

optionsBuilder.setHasSendRequest(false);
}

@Override
public int sendRequest(Request request, String method, URI uri, Map<String, String> headers) {
throw new IllegalStateException("this test only tests requests with callback");
}

@Override
public void sendRequestWithCallback(
Request request,
String method,
URI uri,
Map<String, String> headers,
HttpClientResult requestResult) {
client
.executeRequest(request)
.toCompletableFuture()
.whenComplete(
(response, throwable) -> {
if (throwable == null) {
requestResult.complete(response.getStatusCode());
} else {
requestResult.complete(throwable);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class AsyncHttpClientTest extends AbstractHttpClientTest<Request> {
private static final int CONNECTION_TIMEOUT_MS = (int) CONNECTION_TIMEOUT.toMillis();

// request timeout is needed in addition to connect timeout on async-http-client versions 2.1.0+
private static final AsyncHttpClient client = Dsl.asyncHttpClient(configureTimeout(Dsl.config()));
static final AsyncHttpClient client = Dsl.asyncHttpClient(configureTimeout(Dsl.config()));

private static DefaultAsyncHttpClientConfig.Builder configureTimeout(
DefaultAsyncHttpClientConfig.Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ void verifyExtension() {
@ParameterizedTest
@ValueSource(strings = {"/success", "/success?with=params"})
void successfulGetRequest(String path) throws Exception {
assumeTrue(options.getHasSendRequest());

URI uri = resolveAddress(path);
String method = "GET";
int responseCode = doRequest(method, uri);
Expand All @@ -130,6 +132,7 @@ void successfulGetRequest(String path) throws Exception {
@Test
void requestWithNonStandardHttpMethod() throws Exception {
assumeTrue(options.getTestNonStandardHttpMethod());
assumeTrue(options.getHasSendRequest());

URI uri = resolveAddress("/success");
String method = "TEST";
Expand All @@ -150,6 +153,8 @@ void requestWithNonStandardHttpMethod() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"PUT", "POST"})
void successfulRequestWithParent(String method) throws Exception {
assumeTrue(options.getHasSendRequest());

URI uri = resolveAddress("/success");
int responseCode = testing.runWithSpan("parent", () -> doRequest(method, uri));

Expand All @@ -168,6 +173,8 @@ void successfulRequestWithParent(String method) throws Exception {

@Test
void successfulRequestWithNotSampledParent() throws Exception {
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = resolveAddress("/success");
int responseCode = testing.runWithNonRecordingSpan(() -> doRequest(method, uri));
Expand All @@ -185,6 +192,7 @@ void successfulRequestWithNotSampledParent() throws Exception {
void shouldSuppressNestedClientSpanIfAlreadyUnderParentClientSpan(String method)
throws Exception {
assumeTrue(options.getTestWithClientParent());
assumeTrue(options.getHasSendRequest());

URI uri = resolveAddress("/success");
int responseCode =
Expand Down Expand Up @@ -441,6 +449,8 @@ void redirectToSecuredCopiesAuthHeader() throws Exception {
@ParameterizedTest
@CsvSource({"/error,500", "/client-error,400"})
void errorSpan(String path, int responseCode) {
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = resolveAddress(path);

Expand Down Expand Up @@ -468,6 +478,7 @@ void errorSpan(String path, int responseCode) {
@Test
void reuseRequest() throws Exception {
assumeTrue(options.getTestReusedRequest());
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = resolveAddress("/success");
Expand Down Expand Up @@ -497,6 +508,8 @@ void reuseRequest() throws Exception {
// and the trace is not broken)
@Test
void requestWithExistingTracingHeaders() throws Exception {
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = resolveAddress("/success");

Expand All @@ -515,6 +528,8 @@ void requestWithExistingTracingHeaders() throws Exception {
@Test
void captureHttpHeaders() throws Exception {
assumeTrue(options.getTestCaptureHttpHeaders());
assumeTrue(options.getHasSendRequest());

URI uri = resolveAddress("/success");
String method = "GET";
int responseCode =
Expand Down Expand Up @@ -544,6 +559,7 @@ void captureHttpHeaders() throws Exception {
@Test
void connectionErrorUnopenedPort() {
assumeTrue(options.getTestConnectionFailure());
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = URI.create("http://localhost:" + PortUtils.UNUSABLE_PORT + '/');
Expand Down Expand Up @@ -615,6 +631,7 @@ void connectionErrorUnopenedPortWithCallback() throws Exception {
@Test
void connectionErrorNonRoutableAddress() {
assumeTrue(options.getTestRemoteConnection());
assumeTrue(options.getHasSendRequest());

String method = "HEAD";
URI uri = URI.create(options.getTestHttps() ? "https://192.0.2.1/" : "http://192.0.2.1/");
Expand Down Expand Up @@ -648,6 +665,7 @@ void connectionErrorNonRoutableAddress() {
@Test
void readTimedOut() {
assumeTrue(options.getTestReadTimeout());
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = resolveAddress("/read-timeout");
Expand Down Expand Up @@ -687,6 +705,7 @@ void readTimedOut() {
void httpsRequest() throws Exception {
assumeTrue(options.getTestRemoteConnection());
assumeTrue(options.getTestHttps());
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = URI.create("https://localhost:" + server.httpsPort() + "/success");
Expand All @@ -705,6 +724,8 @@ void httpsRequest() throws Exception {

@Test
void httpClientMetrics() throws Exception {
assumeTrue(options.getHasSendRequest());

URI uri = resolveAddress("/success");
String method = "GET";
int responseCode = doRequest(method, uri);
Expand Down Expand Up @@ -743,6 +764,8 @@ void httpClientMetrics() throws Exception {
*/
@Test
void highConcurrency() {
assumeTrue(options.getHasSendRequest());

int count = 50;
String method = "GET";
URI uri = resolveAddress("/success");
Expand Down Expand Up @@ -968,6 +991,7 @@ void highConcurrencyOnSingleConnection() {
@Test
void spanEndsAfterBodyReceived() throws Exception {
assumeTrue(options.isSpanEndsAfterBody());
assumeTrue(options.getHasSendRequest());

String method = "GET";
URI uri = resolveAddress("/long-request");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public boolean isLowLevelInstrumentation() {

public abstract boolean getTestCaptureHttpHeaders();

public abstract boolean getHasSendRequest();

public abstract Function<URI, String> getHttpProtocolVersion();

@Nullable
Expand Down Expand Up @@ -134,6 +136,7 @@ default Builder withDefaults() {
.setTestErrorWithCallback(true)
.setTestNonStandardHttpMethod(true)
.setTestCaptureHttpHeaders(true)
.setHasSendRequest(true)
.setHttpProtocolVersion(uri -> "1.1");
}

Expand Down Expand Up @@ -179,6 +182,8 @@ default Builder withDefaults() {

Builder setTestNonStandardHttpMethod(boolean value);

Builder setHasSendRequest(boolean value);

Builder setHttpProtocolVersion(Function<URI, String> value);

@CanIgnoreReturnValue
Expand Down

0 comments on commit 93dd4c8

Please sign in to comment.