Skip to content

Commit

Permalink
Use timer for pulsar consumer spans (#8050)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Mar 14, 2023
1 parent 41c2dfd commit 9ebfe03
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -87,22 +88,22 @@ public static class ConsumerInternalReceiveAdviser {
private ConsumerInternalReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
public static Timer before() {
return Timer.start();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.Enter Timer timer,
@Advice.This Consumer<?> consumer,
@Advice.Return Message<?> message,
@Advice.Thrown Throwable t,
@Advice.Local(value = "startTime") long startTime) {
@Advice.Thrown Throwable t) {
if (t != null) {
return;
}

Context parent = Context.current();
Context current = startAndEndConsumerReceive(parent, message, startTime, consumer);
Context current = startAndEndConsumerReceive(parent, message, timer, consumer);
if (current != null) {
// ConsumerBase#internalReceive(long,TimeUnit) will be called before
// ConsumerListener#receive(Consumer,Message), so, need to inject Context into Message.
Expand All @@ -116,22 +117,22 @@ public static class ConsumerSyncReceiveAdviser {
private ConsumerSyncReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
public static Timer before() {
return Timer.start();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.Enter Timer timer,
@Advice.This Consumer<?> consumer,
@Advice.Return Message<?> message,
@Advice.Thrown Throwable t,
@Advice.Local(value = "startTime") long startTime) {
@Advice.Thrown Throwable t) {
if (t != null) {
return;
}

Context parent = Context.current();
startAndEndConsumerReceive(parent, message, startTime, consumer);
startAndEndConsumerReceive(parent, message, timer, consumer);
// No need to inject context to message.
}
}
Expand All @@ -141,23 +142,23 @@ public static class ConsumerAsyncReceiveAdviser {
private ConsumerAsyncReceiveAdviser() {}

@Advice.OnMethodEnter
public static void before(@Advice.Local(value = "startTime") long startTime) {
startTime = System.currentTimeMillis();
public static Timer before() {
return Timer.start();
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.Enter Timer timer,
@Advice.This Consumer<?> consumer,
@Advice.Return CompletableFuture<Message<?>> future,
@Advice.Local(value = "startTime") long startTime) {
@Advice.Return CompletableFuture<Message<?>> future) {
future.whenComplete(
(message, t) -> {
if (t != null) {
return;
}

Context parent = Context.current();
startAndEndConsumerReceive(parent, message, startTime, consumer);
startAndEndConsumerReceive(parent, message, timer, consumer);
// No need to inject context to message.
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore;
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -109,7 +109,7 @@ private static AttributesExtractor<PulsarRequest, Void> createMessagingAttribute
}

public static Context startAndEndConsumerReceive(
Context parent, Message<?> message, long start, Consumer<?> consumer) {
Context parent, Message<?> message, Timer timer, Consumer<?> consumer) {
if (message == null) {
return null;
}
Expand All @@ -127,8 +127,8 @@ public static Context startAndEndConsumerReceive(
request,
null,
null,
Instant.ofEpochMilli(start),
Instant.now());
timer.startTime(),
timer.now());
}

private PulsarSingletons() {}
Expand Down

0 comments on commit 9ebfe03

Please sign in to comment.