Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port AsyncSpanEndStrategy to Instrumenter API #3262

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
ElementType.ANNOTATION_TYPE,
ElementType.CONSTRUCTOR,
ElementType.METHOD,
ElementType.TYPE
ElementType.TYPE,
ElementType.PACKAGE
})
@Documented
@UnstableApi
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.asyncannotationsupport;

import org.checkerframework.checker.nullness.qual.Nullable;

/** A global registry of {@link AsyncOperationEndStrategy} implementations. */
public abstract class AsyncOperationEndStrategies {
private static volatile AsyncOperationEndStrategies instance;

/**
* Sets the actual strategies' registry implementation. The javaagent uses weak references to make
* unloading strategy classes possible.
*
* <p>This is supposed to be only called by the javaagent. <b>Instrumentation must not call
* this.</b>
*/
public static void internalSetStrategiesStorage(AsyncOperationEndStrategies strategies) {
instance = strategies;
}

/** Obtain instance of the async strategy registry. */
public static AsyncOperationEndStrategies instance() {
if (instance == null) {
instance = new AsyncOperationEndStrategiesImpl();
}
return instance;
}

/** Add the passed {@code strategy} to the registry. */
public abstract void registerStrategy(AsyncOperationEndStrategy strategy);

/** Remove the passed {@code strategy} from the registry. */
public abstract void unregisterStrategy(AsyncOperationEndStrategy strategy);

/**
* Returns an {@link AsyncOperationEndStrategy} that is able to compose over {@code returnType},
* or {@code null} if passed type is not supported by any of the strategies stored in this
* registry.
*/
@Nullable
public abstract AsyncOperationEndStrategy resolveStrategy(Class<?> returnType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.asyncannotationsupport;

import static java.util.Objects.requireNonNull;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Default strategies' registry implementation that uses strong references. */
final class AsyncOperationEndStrategiesImpl extends AsyncOperationEndStrategies {
private final List<AsyncOperationEndStrategy> strategies = new CopyOnWriteArrayList<>();

AsyncOperationEndStrategiesImpl() {
registerStrategy(Jdk8AsyncOperationEndStrategy.INSTANCE);
}

@Override
public void registerStrategy(AsyncOperationEndStrategy strategy) {
strategies.add(requireNonNull(strategy));
}

@Override
public void unregisterStrategy(AsyncOperationEndStrategy strategy) {
strategies.remove(strategy);
}

@Nullable
@Override
public AsyncOperationEndStrategy resolveStrategy(Class<?> returnType) {
for (AsyncOperationEndStrategy strategy : strategies) {
if (strategy.supports(returnType)) {
return strategy;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.asyncannotationsupport;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;

/**
* Implementations of this interface describe how to compose over {@linkplain #supports(Class)
* supported} asynchronous computation types and delay marking the operation as ended by calling
* {@link Instrumenter#end(Context, Object, Object, Throwable)}.
*/
public interface AsyncOperationEndStrategy {

/**
* Returns true for every asynchronous computation type {@code asyncType} this strategy supports.
*/
boolean supports(Class<?> asyncType);

/**
* Composes over {@code asyncValue} and delays the {@link Instrumenter#end(Context, Object,
* Object, Throwable)} call until after the asynchronous operation represented by {@code
* asyncValue} completes.
*
* @param instrumenter The {@link Instrumenter} to be used to end the operation stored in the
* {@code context}.
* @param asyncValue Return value from the instrumented method. Must be an instance of a {@code
* asyncType} for which {@link #supports(Class)} returned true (in particular it must not be
* {@code null}).
* @param responseType Expected type of the response that should be obtained from the {@code
* asyncValue}. If the result of the async computation is instance of the passed type it will
* be passed when the {@code instrumenter} is called.
* @return Either {@code asyncValue} or a value composing over {@code asyncValue} for notification
* of completion.
*/
<REQUEST, RESPONSE> Object end(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Object asyncValue,
Class<RESPONSE> responseType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.asyncannotationsupport;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;

/**
* A wrapper over {@link Instrumenter} that is able to defer {@link Instrumenter#end(Context,
* Object, Object, Throwable)} until asynchronous computation finishes.
*/
public final class AsyncOperationEndSupport<REQUEST, RESPONSE> {

/**
* Returns a new {@link AsyncOperationEndSupport} that wraps over passed {@code syncInstrumenter},
* configured for usage with asynchronous computations that are instances of {@code asyncType}. If
* the result of the async computation ends up being an instance of {@code responseType} it will
* be passed as the response to the {@code syncInstrumenter} call; otherwise {@code null} value
* will be used as the response.
*/
public static <REQUEST, RESPONSE> AsyncOperationEndSupport<REQUEST, RESPONSE> create(
Instrumenter<REQUEST, RESPONSE> syncInstrumenter,
Class<RESPONSE> responseType,
Class<?> asyncType) {
return new AsyncOperationEndSupport<>(
syncInstrumenter,
responseType,
asyncType,
AsyncOperationEndStrategies.instance().resolveStrategy(asyncType));
}

private final Instrumenter<REQUEST, RESPONSE> instrumenter;
private final Class<RESPONSE> responseType;
private final Class<?> asyncType;
private final AsyncOperationEndStrategy asyncOperationEndStrategy;

private AsyncOperationEndSupport(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Class<RESPONSE> responseType,
Class<?> asyncType,
AsyncOperationEndStrategy asyncOperationEndStrategy) {
this.instrumenter = instrumenter;
this.responseType = responseType;
this.asyncType = asyncType;
this.asyncOperationEndStrategy = asyncOperationEndStrategy;
}

/**
* Attempts to compose over passed {@code asyncValue} and delay the {@link
* Instrumenter#end(Context, Object, Object, Throwable)} call until the async operation completes.
*
* <p>This method will end the operation immediately if {@code throwable} is passed, if there is
* no {@link AsyncOperationEndStrategy} for the {@code asyncType} used, or if there is a type
* mismatch between passed {@code asyncValue} and the {@code asyncType} that was used to create
* this object.
*
* <p>If the passed {@code asyncValue} is recognized as an asynchronous computation, the operation
* won't be {@link Instrumenter#end(Context, Object, Object, Throwable) ended} until {@code
* asyncValue} completes.
*/
public <ASYNC> ASYNC asyncEnd(
Context context, REQUEST request, ASYNC asyncValue, Throwable throwable) {
// we can end early if an exception was thrown
if (throwable != null) {
instrumenter.end(context, request, null, throwable);
return asyncValue;
}

// use the configured strategy to compose over the asyncValue
if (asyncOperationEndStrategy != null && asyncType.isInstance(asyncValue)) {
return (ASYNC)
asyncOperationEndStrategy.end(instrumenter, context, request, asyncValue, responseType);
}

// fall back to sync end() if asyncValue type doesn't match
instrumenter.end(context, request, null, null);
return asyncValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.asyncannotationsupport;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.checkerframework.checker.nullness.qual.Nullable;

public enum Jdk8AsyncOperationEndStrategy implements AsyncOperationEndStrategy {
INSTANCE;

@Override
public boolean supports(Class<?> asyncType) {
return asyncType == CompletionStage.class || asyncType == CompletableFuture.class;
}

@Override
public <REQUEST, RESPONSE> Object end(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
Object asyncValue,
Class<RESPONSE> responseType) {
if (asyncValue instanceof CompletableFuture) {
CompletableFuture<?> future = (CompletableFuture<?>) asyncValue;
if (tryToEndSynchronously(instrumenter, context, request, future, responseType)) {
return future;
}
return endWhenComplete(instrumenter, context, request, future, responseType);
}
CompletionStage<?> stage = (CompletionStage<?>) asyncValue;
return endWhenComplete(instrumenter, context, request, stage, responseType);
}

/**
* Checks to see if the {@link CompletableFuture} has already been completed and if so
* synchronously ends the span to avoid additional allocations and overhead registering for
* notification of completion.
*/
private static <REQUEST, RESPONSE> boolean tryToEndSynchronously(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
CompletableFuture<?> future,
Class<RESPONSE> responseType) {

if (!future.isDone()) {
return false;
}

try {
Object potentialResponse = future.join();
instrumenter.end(context, request, tryToGetResponse(responseType, potentialResponse), null);
} catch (Throwable t) {
instrumenter.end(context, request, null, t);
}
return true;
}

/**
* Registers for notification of the completion of the {@link CompletionStage} at which time the
* span will be ended.
*/
private static <REQUEST, RESPONSE> CompletionStage<?> endWhenComplete(
Instrumenter<REQUEST, RESPONSE> instrumenter,
Context context,
REQUEST request,
CompletionStage<?> stage,
Class<RESPONSE> responseType) {
return stage.whenComplete(
(result, exception) ->
instrumenter.end(context, request, tryToGetResponse(responseType, result), exception));
}

@Nullable
private static <RESPONSE> RESPONSE tryToGetResponse(Class<RESPONSE> responseType, Object result) {
if (responseType.isInstance(result)) {
return responseType.cast(result);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
* Provides implementations of strategies for instrumenting methods that return asynchronous and/or
* reactive values so that the operation can be ended when the asynchronous type completes.
*/
@UnstableApi
package io.opentelemetry.instrumentation.api.asyncannotationsupport;

import io.opentelemetry.instrumentation.api.annotations.UnstableApi;
Loading