Skip to content

Commit

Permalink
Fix native-image generation of reactive applications (#8012)
Browse files Browse the repository at this point in the history
* Refactor registration of AsyncResult extensions so it doesn't pull in AsyncResultDecorator
* Rebuild AsyncResultExtensions at native-image runtime
  • Loading branch information
mcculls authored Nov 27, 2024
1 parent f7a6771 commit 207f770
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -1,47 +1,27 @@
package datadog.trace.bootstrap.instrumentation.decorator;

import static java.util.Collections.singletonList;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;

/**
* This decorator handles asynchronous result types, finishing spans only when the async calls are
* complete. The different async types are supported using {@link AsyncResultSupportExtension} that
* should be registered using {@link #registerExtension(AsyncResultSupportExtension)} first.
* complete. The different async types are supported using {@link AsyncResultExtension} that should
* be registered using {@link AsyncResultExtensions#register(AsyncResultExtension)} first.
*/
public abstract class AsyncResultDecorator extends BaseDecorator {
private static final CopyOnWriteArrayList<AsyncResultSupportExtension> EXTENSIONS =
new CopyOnWriteArrayList<>(
singletonList(new JavaUtilConcurrentAsyncResultSupportExtension()));

private static final ClassValue<AsyncResultSupportExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultSupportExtension>() {
private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultExtension>() {
@Override
protected AsyncResultSupportExtension computeValue(Class<?> type) {
return EXTENSIONS.stream()
protected AsyncResultExtension computeValue(Class<?> type) {
return AsyncResultExtensions.registered().stream()
.filter(extension -> extension.supports(type))
.findFirst()
.orElse(null);
}
};

/**
* Registers an extension to add supported async types.
*
* @param extension The extension to register.
*/
public static void registerExtension(AsyncResultSupportExtension extension) {
if (extension != null) {
EXTENSIONS.add(extension);
}
}

/**
* Look for asynchronous result and decorate it with span finisher. If the result is not
* asynchronous, it will be return unmodified and span will be finished.
Expand All @@ -53,7 +33,7 @@ public static void registerExtension(AsyncResultSupportExtension extension) {
*/
public Object wrapAsyncResultOrFinishSpan(
final Object result, final Class<?> methodReturnType, final AgentSpan span) {
AsyncResultSupportExtension extension;
AsyncResultExtension extension;
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(methodReturnType)) != null) {
Object applied = extension.apply(result, span);
if (applied != null) {
Expand All @@ -64,63 +44,4 @@ public Object wrapAsyncResultOrFinishSpan(
span.finish();
return result;
}

/**
* This interface defines asynchronous result type support extension. It allows deferring the
* support implementations where types are available on classpath.
*/
public interface AsyncResultSupportExtension {
/**
* Checks whether this extensions support a result type.
*
* @param result The result type to check.
* @return {@code true} if the type is supported by this extension, {@code false} otherwise.
*/
boolean supports(Class<?> result);

/**
* Applies the extension to the async result.
*
* @param result The async result.
* @param span The related span.
* @return The result object to return (can be the original result if not modified), or {@code
* null} if the extension could not be applied.
*/
Object apply(Object result, AgentSpan span);
}

private static class JavaUtilConcurrentAsyncResultSupportExtension
implements AsyncResultSupportExtension {
@Override
public boolean supports(Class<?> result) {
return CompletableFuture.class.isAssignableFrom(result)
|| CompletionStage.class.isAssignableFrom(result);
}

@Override
public Object apply(Object result, AgentSpan span) {
if (result instanceof CompletableFuture<?>) {
CompletableFuture<?> completableFuture = (CompletableFuture<?>) result;
if (!completableFuture.isDone() && !completableFuture.isCancelled()) {
return completableFuture.whenComplete(finishSpan(span));
}
} else if (result instanceof CompletionStage<?>) {
CompletionStage<?> completionStage = (CompletionStage<?>) result;
return completionStage.whenComplete(finishSpan(span));
}
return null;
}

private <T> BiConsumer<T, Throwable> finishSpan(AgentSpan span) {
return (o, throwable) -> {
if (throwable != null) {
span.addThrowable(
throwable instanceof ExecutionException || throwable instanceof CompletionException
? throwable.getCause()
: throwable);
}
span.finish();
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;

/**
* This interface defines asynchronous result type support extension. It allows deferring the
* support implementations where types are available on classpath.
*/
public interface AsyncResultExtension {
/**
* Checks whether this extensions support a result type.
*
* @param result The result type to check.
* @return {@code true} if the type is supported by this extension, {@code false} otherwise.
*/
boolean supports(Class<?> result);

/**
* Applies the extension to the async result.
*
* @param result The async result.
* @param span The related span.
* @return The result object to return (can be the original result if not modified), or {@code
* null} if the extension could not be applied.
*/
Object apply(Object result, AgentSpan span);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import static java.util.Collections.singletonList;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

public final class AsyncResultExtensions {
private static final List<AsyncResultExtension> EXTENSIONS =
new CopyOnWriteArrayList<>(singletonList(new CompletableAsyncResultExtension()));

/**
* Registers an extension to add supported async types.
*
* @param extension The extension to register.
*/
public static void register(AsyncResultExtension extension) {
if (extension != null) {
EXTENSIONS.add(extension);
}
}

/** Returns the list of currently registered extensions. */
public static List<AsyncResultExtension> registered() {
return EXTENSIONS;
}

static final class CompletableAsyncResultExtension implements AsyncResultExtension {
@Override
public boolean supports(Class<?> result) {
return CompletableFuture.class.isAssignableFrom(result)
|| CompletionStage.class.isAssignableFrom(result);
}

@Override
public Object apply(Object result, AgentSpan span) {
if (result instanceof CompletableFuture<?>) {
CompletableFuture<?> completableFuture = (CompletableFuture<?>) result;
if (!completableFuture.isDone() && !completableFuture.isCancelled()) {
return completableFuture.whenComplete(finishSpan(span));
}
} else if (result instanceof CompletionStage<?>) {
CompletionStage<?> completionStage = (CompletionStage<?>) result;
return completionStage.whenComplete(finishSpan(span));
}
return null;
}

private <T> BiConsumer<T, Throwable> finishSpan(AgentSpan span) {
return (o, throwable) -> {
if (throwable != null) {
span.addThrowable(
throwable instanceof ExecutionException || throwable instanceof CompletionException
? throwable.getCause()
: throwable);
}
span.finish();
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) String[
+ "datadog.trace.bootstrap.benchmark.StaticEventLogger:build_time,"
+ "datadog.trace.bootstrap.blocking.BlockingExceptionHandler:build_time,"
+ "datadog.trace.bootstrap.InstrumentationErrors:build_time,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions:rerun,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState:build_time,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter:build_time,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimeHelper:build_time,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@

import com.google.common.util.concurrent.ListenableFuture;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public class GuavaAsyncResultSupportExtension
implements AsyncResultDecorator.AsyncResultSupportExtension {
public class GuavaAsyncResultExtension implements AsyncResultExtension {
static {
AsyncResultDecorator.registerExtension(new GuavaAsyncResultSupportExtension());
AsyncResultExtensions.register(new GuavaAsyncResultExtension());
}

/**
* Register the extension as an {@link AsyncResultDecorator.AsyncResultSupportExtension} using
* static class initialization.<br>
* Register the extension as an {@link AsyncResultExtension} using static class initialization.
* <br>
* It uses an empty static method call to ensure the class loading and the one-time-only static
* class initialization. This will ensure this extension will only be registered once to the
* {@link AsyncResultDecorator}.
* class initialization. This will ensure this extension will only be registered once under {@link
* AsyncResultExtensions}.
*/
public static void initialize() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public String instrumentedType() {
@Override
public String[] helperClassNames() {
return new String[] {
this.packageName + ".GuavaAsyncResultSupportExtension",
this.packageName + ".GuavaAsyncResultExtension",
};
}

Expand All @@ -57,7 +57,7 @@ public void methodAdvice(MethodTransformer transformer) {
public static class AbstractFutureAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void init() {
GuavaAsyncResultSupportExtension.initialize();
GuavaAsyncResultExtension.initialize();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class GuavaAsyncResultSupportExtensionTest extends AgentTestRunner {
class GuavaAsyncResultExtensionTest extends AgentTestRunner {
@Override
void configurePreAgent() {
super.configurePreAgent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ public Map<String, String> contextStore() {
@Override
public String[] helperClassNames() {
return new String[] {
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension",
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension$WrappedPublisher",
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension$WrappedSubscriber",
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension$WrappedSubscription",
this.packageName + ".ReactiveStreamsAsyncResultExtension",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscription",
};
}

Expand All @@ -82,7 +82,7 @@ public void methodAdvice(MethodTransformer transformer) {
public static class PublisherAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void init() {
ReactiveStreamsAsyncResultSupportExtension.initialize();
ReactiveStreamsAsyncResultExtension.initialize();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package datadog.trace.instrumentation.reactivestreams;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator;
import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator.AsyncResultSupportExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ReactiveStreamsAsyncResultSupportExtension implements AsyncResultSupportExtension {
public class ReactiveStreamsAsyncResultExtension implements AsyncResultExtension {
static {
AsyncResultDecorator.registerExtension(new ReactiveStreamsAsyncResultSupportExtension());
AsyncResultExtensions.register(new ReactiveStreamsAsyncResultExtension());
}

/**
* Register the extension as an {@link AsyncResultSupportExtension} using static class
* initialization.<br>
* Register the extension as an {@link AsyncResultExtension} using static class initialization.
* <br>
* It uses an empty static method call to ensure the class loading and the one-time-only static
* class initialization. This will ensure this extension will only be registered once to the
* {@link AsyncResultDecorator}.
* class initialization. This will ensure this extension will only be registered once under {@link
* AsyncResultExtensions}.
*/
public static void initialize() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import datadog.trace.bootstrap.instrumentation.api.Tags

import java.util.concurrent.CountDownLatch

class ReactiveStreamsAsyncResultSupportExtensionTest extends AgentTestRunner {
class ReactiveStreamsAsyncResultExtensionTest extends AgentTestRunner {
@Override
void configurePreAgent() {
super.configurePreAgent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import spock.lang.Shared
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

class ReactorAsyncResultSupportExtensionTest extends AgentTestRunner {
class ReactorAsyncResultExtensionTest extends AgentTestRunner {
@Override
void configurePreAgent() {
super.configurePreAgent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public BlockingPublisherInstrumentation() {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ReactorAsyncResultSupportExtension",
packageName + ".ReactorAsyncResultExtension",
};
}

Expand Down Expand Up @@ -83,7 +83,7 @@ public static void after(@Advice.Enter final AgentScope scope) {
public static class AsyncExtensionInstallAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void init() {
ReactorAsyncResultSupportExtension.initialize();
ReactorAsyncResultExtension.initialize();
}
}
}
Loading

0 comments on commit 207f770

Please sign in to comment.