From 9ebfe03e7260ef33d99d0d3438fa0fbdb0a9df9e Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 14 Mar 2023 13:35:29 +0200 Subject: [PATCH] Use timer for pulsar consumer spans (#8050) --- .../v2_8/ConsumerImplInstrumentation.java | 31 ++++++++++--------- .../v2_8/telemetry/PulsarSingletons.java | 8 ++--- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java index 1c7395c4de73..19dffdb07003 100644 --- a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/ConsumerImplInstrumentation.java @@ -16,6 +16,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import java.util.concurrent.CompletableFuture; @@ -87,22 +88,22 @@ public static class ConsumerInternalReceiveAdviser { private ConsumerInternalReceiveAdviser() {} @Advice.OnMethodEnter - public static void before(@Advice.Local(value = "startTime") long startTime) { - startTime = System.currentTimeMillis(); + public static Timer before() { + return Timer.start(); } @Advice.OnMethodExit(onThrowable = Throwable.class) public static void after( + @Advice.Enter Timer timer, @Advice.This Consumer consumer, @Advice.Return Message message, - @Advice.Thrown Throwable t, - @Advice.Local(value = "startTime") long startTime) { + @Advice.Thrown Throwable t) { if (t != null) { return; } Context parent = Context.current(); - Context current = startAndEndConsumerReceive(parent, message, startTime, consumer); + Context current = startAndEndConsumerReceive(parent, message, timer, consumer); if (current != null) { // ConsumerBase#internalReceive(long,TimeUnit) will be called before // ConsumerListener#receive(Consumer,Message), so, need to inject Context into Message. @@ -116,22 +117,22 @@ public static class ConsumerSyncReceiveAdviser { private ConsumerSyncReceiveAdviser() {} @Advice.OnMethodEnter - public static void before(@Advice.Local(value = "startTime") long startTime) { - startTime = System.currentTimeMillis(); + public static Timer before() { + return Timer.start(); } @Advice.OnMethodExit(onThrowable = Throwable.class) public static void after( + @Advice.Enter Timer timer, @Advice.This Consumer consumer, @Advice.Return Message message, - @Advice.Thrown Throwable t, - @Advice.Local(value = "startTime") long startTime) { + @Advice.Thrown Throwable t) { if (t != null) { return; } Context parent = Context.current(); - startAndEndConsumerReceive(parent, message, startTime, consumer); + startAndEndConsumerReceive(parent, message, timer, consumer); // No need to inject context to message. } } @@ -141,15 +142,15 @@ public static class ConsumerAsyncReceiveAdviser { private ConsumerAsyncReceiveAdviser() {} @Advice.OnMethodEnter - public static void before(@Advice.Local(value = "startTime") long startTime) { - startTime = System.currentTimeMillis(); + public static Timer before() { + return Timer.start(); } @Advice.OnMethodExit(onThrowable = Throwable.class) public static void after( + @Advice.Enter Timer timer, @Advice.This Consumer consumer, - @Advice.Return CompletableFuture> future, - @Advice.Local(value = "startTime") long startTime) { + @Advice.Return CompletableFuture> future) { future.whenComplete( (message, t) -> { if (t != null) { @@ -157,7 +158,7 @@ public static void after( } Context parent = Context.current(); - startAndEndConsumerReceive(parent, message, startTime, consumer); + startAndEndConsumerReceive(parent, message, timer, consumer); // No need to inject context to message. }); } diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 7c3520a860d9..9bd589427c8b 100644 --- a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -18,10 +18,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore; -import java.time.Instant; import java.util.List; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -109,7 +109,7 @@ private static AttributesExtractor createMessagingAttribute } public static Context startAndEndConsumerReceive( - Context parent, Message message, long start, Consumer consumer) { + Context parent, Message message, Timer timer, Consumer consumer) { if (message == null) { return null; } @@ -127,8 +127,8 @@ public static Context startAndEndConsumerReceive( request, null, null, - Instant.ofEpochMilli(start), - Instant.now()); + timer.startTime(), + timer.now()); } private PulsarSingletons() {}