From 0f001bad30e00865c54c630976b42650b5a18e86 Mon Sep 17 00:00:00 2001 From: Nikita Salnikov-Tarnovski Date: Thu, 21 Oct 2021 21:31:27 +0300 Subject: [PATCH 1/2] Convert RabbitMQ to Instrumenter --- .../rabbitmq-2.7/javaagent/build.gradle.kts | 3 + .../rabbitmq/ChannelAndMethod.java | 21 ++ .../rabbitmq/DeliveryRequest.java | 39 ++++ .../RabbitChannelAttributesExtractor.java | 82 ++++++++ .../RabbitChannelInstrumentation.java | 46 +++-- .../RabbitChannelNetAttributesExtractor.java | 36 ++++ .../RabbitCommandInstrumentation.java | 2 +- .../RabbitDeliveryAttributesExtractor.java | 93 +++++++++ ...liveryExperimentalAttributesExtractor.java | 43 ++++ ...abbitDeliveryExtraAttributesExtractor.java | 31 +++ .../rabbitmq/RabbitInstrumenterHelper.java | 68 +++++++ .../RabbitReceiveAttributesExtractor.java | 91 +++++++++ ...eceiveExperimentalAttributesExtractor.java | 33 +++ .../RabbitReceiveNetAttributesExtractor.java | 37 ++++ .../rabbitmq/RabbitSingletons.java | 83 ++++++++ .../rabbitmq/RabbitTracer.java | 188 ------------------ .../rabbitmq/ReceiveRequest.java | 34 ++++ .../rabbitmq/TextMapExtractAdapter.java | 13 +- .../rabbitmq/TracedDelegatingConsumer.java | 10 +- 19 files changed, 736 insertions(+), 217 deletions(-) create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelNetAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExtraAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesExtractor.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java delete mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitTracer.java create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java diff --git a/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts b/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts index c7e676fe412d..4c7138f8c545 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts +++ b/instrumentation/rabbitmq-2.7/javaagent/build.gradle.kts @@ -14,6 +14,9 @@ muzzle { dependencies { library("com.rabbitmq:amqp-client:2.7.0") + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + testLibrary("org.springframework.amqp:spring-rabbit:1.1.0.RELEASE") { exclude("com.rabbitmq", "amqp-client") } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java new file mode 100644 index 000000000000..bdc7e40b3065 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.google.auto.value.AutoValue; +import com.rabbitmq.client.Channel; + +@AutoValue +public abstract class ChannelAndMethod { + + public static ChannelAndMethod create(Channel channel, String method) { + return new AutoValue_ChannelAndMethod(channel, method); + } + + abstract Channel getChannel(); + + abstract String getMethod(); +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java new file mode 100644 index 000000000000..d8e8a7351787 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/DeliveryRequest.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.google.auto.value.AutoValue; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +@AutoValue +abstract class DeliveryRequest { + + static DeliveryRequest create( + String queue, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + return new AutoValue_DeliveryRequest(queue, envelope, properties, body); + } + + abstract String getQueue(); + + abstract Envelope getEnvelope(); + + abstract AMQP.BasicProperties getProperties(); + + @SuppressWarnings("mutable") + abstract byte[] getBody(); + + String spanName() { + String queue = getQueue(); + if (queue == null || queue.isEmpty()) { + return " process"; + } else if (queue.startsWith("amq.gen-")) { + return " process"; + } else { + return queue + " process"; + } + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesExtractor.java new file mode 100644 index 000000000000..b9d7f1a4cc8d --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesExtractor.java @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; + +public class RabbitChannelAttributesExtractor + extends MessagingAttributesExtractor { + @Override + public MessageOperation operation() { + return MessageOperation.SEND; + } + + @Override + protected String system(ChannelAndMethod channelAndMethod) { + return "rabbitmq"; + } + + @Override + protected String destinationKind(ChannelAndMethod channelAndMethod) { + return SemanticAttributes.MessagingDestinationKindValues.QUEUE; + } + + @Nullable + @Override + protected String destination(ChannelAndMethod channelAndMethod) { + return null; + } + + @Override + protected boolean temporaryDestination(ChannelAndMethod channelAndMethod) { + return false; + } + + @Nullable + @Override + protected String protocol(ChannelAndMethod channelAndMethod) { + return null; + } + + @Nullable + @Override + protected String protocolVersion(ChannelAndMethod channelAndMethod) { + return null; + } + + @Nullable + @Override + protected String url(ChannelAndMethod channelAndMethod) { + return null; + } + + @Nullable + @Override + protected String conversationId(ChannelAndMethod channelAndMethod) { + return null; + } + + @Nullable + @Override + protected Long messagePayloadSize(ChannelAndMethod channelAndMethod) { + return null; + } + + @Nullable + @Override + protected Long messagePayloadCompressedSize(ChannelAndMethod channelAndMethod) { + return null; + } + + @Nullable + @Override + protected String messageId(ChannelAndMethod channelAndMethod, @Nullable Void unused) { + return null; + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java index 9ccf1a577245..d132a5b53e8d 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java @@ -8,7 +8,9 @@ import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitCommandInstrumentation.SpanHolder.CURRENT_RABBIT_CONTEXT; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitTracer.tracer; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitInstrumenterHelper.helper; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitSingletons.channelInstrumenter; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitSingletons.receiveInstrumenter; import static net.bytebuddy.matcher.ElementMatchers.canThrow; import static net.bytebuddy.matcher.ElementMatchers.isGetter; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -94,13 +96,21 @@ public static void onEnter( @Advice.Origin("Channel.#m") String method, @Advice.Local("otelCallDepth") CallDepth callDepth, @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { + @Advice.Local("otelScope") Scope scope, + @Advice.Local("otelRequest") ChannelAndMethod request) { callDepth = CallDepth.forClass(Channel.class); if (callDepth.getAndIncrement() > 0) { return; } - context = tracer().startSpan(method, channel.getConnection()); + Context parentContext = Java8BytecodeBridge.currentContext(); + request = ChannelAndMethod.create(channel, method); + + if (!channelInstrumenter().shouldStart(parentContext, request)) { + return; + } + + context = channelInstrumenter().start(parentContext, request); CURRENT_RABBIT_CONTEXT.set(context); scope = context.makeCurrent(); } @@ -110,7 +120,8 @@ public static void stopSpan( @Advice.Thrown Throwable throwable, @Advice.Local("otelCallDepth") CallDepth callDepth, @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { + @Advice.Local("otelScope") Scope scope, + @Advice.Local("otelRequest") ChannelAndMethod request) { if (callDepth.decrementAndGet() > 0) { return; } @@ -118,11 +129,7 @@ public static void stopSpan( scope.close(); CURRENT_RABBIT_CONTEXT.remove(); - if (throwable != null) { - tracer().endExceptionally(context, throwable); - } else { - tracer().end(context); - } + channelInstrumenter().end(context, request, null, throwable); } } @@ -139,7 +146,7 @@ public static void setSpanNameAddHeaders( Span span = Java8BytecodeBridge.spanFromContext(context); if (span.getSpanContext().isValid()) { - tracer().onPublish(span, exchange, routingKey); + helper().onPublish(span, exchange, routingKey); if (body != null) { span.setAttribute( SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length); @@ -149,13 +156,13 @@ public static void setSpanNameAddHeaders( if (props == null) { props = MessageProperties.MINIMAL_BASIC; } - tracer().onProps(span, props); + helper().onProps(span, props); // We need to copy the BasicProperties and provide a header map we can modify Map headers = props.getHeaders(); headers = (headers == null) ? new HashMap<>() : new HashMap<>(headers); - tracer().inject(context, headers, TextMapInjectAdapter.SETTER); + helper().inject(context, headers, TextMapInjectAdapter.SETTER); props = new AMQP.BasicProperties( @@ -199,14 +206,17 @@ public static void extractAndStartSpan( return; } + Context parentContext = Java8BytecodeBridge.currentContext(); + ReceiveRequest request = + ReceiveRequest.create(queue, startTime, response, channel.getConnection()); + if (!receiveInstrumenter().shouldStart(parentContext, request)) { + return; + } + // can't create span and put into scope in method enter above, because can't add parent after // span creation - Context context = tracer().startGetSpan(queue, startTime, response, channel.getConnection()); - if (throwable != null) { - tracer().endExceptionally(context, throwable); - } else { - tracer().end(context); - } + Context context = receiveInstrumenter().start(parentContext, request); + receiveInstrumenter().end(context, request, null, throwable); } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelNetAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelNetAttributesExtractor.java new file mode 100644 index 000000000000..7ecc88a10f13 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelNetAttributesExtractor.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor; +import javax.annotation.Nullable; + +public class RabbitChannelNetAttributesExtractor + extends NetClientAttributesExtractor { + @Nullable + @Override + public String transport(ChannelAndMethod channelAndMethod, @Nullable Void unused) { + return null; + } + + @Nullable + @Override + public String peerName(ChannelAndMethod channelAndMethod, @Nullable Void unused) { + return channelAndMethod.getChannel().getConnection().getAddress().getHostName(); + } + + @Nullable + @Override + public Integer peerPort(ChannelAndMethod channelAndMethod, @Nullable Void unused) { + return channelAndMethod.getChannel().getConnection().getPort(); + } + + @Nullable + @Override + public String peerIp(ChannelAndMethod channelAndMethod, @Nullable Void unused) { + return channelAndMethod.getChannel().getConnection().getAddress().getHostAddress(); + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitCommandInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitCommandInstrumentation.java index b16fd3160050..ec5d094e38d9 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitCommandInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitCommandInstrumentation.java @@ -51,7 +51,7 @@ public static void setSpanNameAddHeaders(@Advice.This Command command) { Context context = CURRENT_RABBIT_CONTEXT.get(); if (context != null && command.getMethod() != null) { - RabbitTracer.onCommand(Java8BytecodeBridge.spanFromContext(context), command); + RabbitInstrumenterHelper.onCommand(Java8BytecodeBridge.spanFromContext(context), command); } } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesExtractor.java new file mode 100644 index 000000000000..d360c272dfe6 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesExtractor.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; + +public class RabbitDeliveryAttributesExtractor + extends MessagingAttributesExtractor { + @Override + public MessageOperation operation() { + return MessageOperation.PROCESS; + } + + @Override + protected String system(DeliveryRequest request) { + return "rabbitmq"; + } + + @Override + protected String destinationKind(DeliveryRequest request) { + return SemanticAttributes.MessagingDestinationKindValues.QUEUE; + } + + @Nullable + @Override + protected String destination(DeliveryRequest request) { + if (request.getEnvelope() != null) { + return normalizeExchangeName(request.getEnvelope().getExchange()); + } else { + return null; + } + } + + private static String normalizeExchangeName(String exchange) { + return exchange == null || exchange.isEmpty() ? "" : exchange; + } + + @Override + protected boolean temporaryDestination(DeliveryRequest request) { + return false; + } + + @Nullable + @Override + protected String protocol(DeliveryRequest request) { + return null; + } + + @Nullable + @Override + protected String protocolVersion(DeliveryRequest request) { + return null; + } + + @Nullable + @Override + protected String url(DeliveryRequest request) { + return null; + } + + @Nullable + @Override + protected String conversationId(DeliveryRequest request) { + return null; + } + + @Nullable + @Override + protected Long messagePayloadSize(DeliveryRequest request) { + if (request.getBody() != null) { + return (long) request.getBody().length; + } + return null; + } + + @Nullable + @Override + protected Long messagePayloadCompressedSize(DeliveryRequest request) { + return null; + } + + @Nullable + @Override + protected String messageId(DeliveryRequest request, @Nullable Void unused) { + return null; + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java new file mode 100644 index 000000000000..83b90cdcf765 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import java.util.Date; +import javax.annotation.Nullable; + +class RabbitDeliveryExperimentalAttributesExtractor + implements AttributesExtractor { + private static final AttributeKey RABBITMQ_COMMAND = + AttributeKey.stringKey("rabbitmq.command"); + private static final AttributeKey RABBITMQ_QUEUE_TIME = + AttributeKey.longKey("rabbitmq.record.queue_time_ms"); + + @Override + public void onStart(AttributesBuilder attributes, DeliveryRequest request) { + Date timestamp = request.getProperties().getTimestamp(); + if (timestamp != null) { + // this will be set if the sender sets the timestamp, + // or if a plugin is installed on the rabbitmq broker + long produceTimeMillis = timestamp.getTime(); + set( + attributes, + RABBITMQ_QUEUE_TIME, + Math.max(0L, System.currentTimeMillis() - produceTimeMillis)); + } + + set(attributes, RABBITMQ_COMMAND, "basic.deliver"); + } + + @Override + public void onEnd( + AttributesBuilder attributes, + DeliveryRequest request, + @Nullable Void unused, + @Nullable Throwable error) {} +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExtraAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExtraAttributesExtractor.java new file mode 100644 index 000000000000..7042b2d13207 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExtraAttributesExtractor.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.rabbitmq.client.Envelope; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; + +class RabbitDeliveryExtraAttributesExtractor implements AttributesExtractor { + + @Override + public void onStart(AttributesBuilder attributes, DeliveryRequest request) { + Envelope envelope = request.getEnvelope(); + String routingKey = envelope.getRoutingKey(); + if (routingKey != null && !routingKey.isEmpty()) { + set(attributes, SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY, routingKey); + } + } + + @Override + public void onEnd( + AttributesBuilder attributes, + DeliveryRequest request, + @Nullable Void unused, + @Nullable Throwable error) {} +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java new file mode 100644 index 000000000000..52fff0a454d1 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java @@ -0,0 +1,68 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Command; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Map; + +public class RabbitInstrumenterHelper { + + private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = + Config.get().getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false); + + private static final RabbitInstrumenterHelper INSTRUMENTER_HELPER = + new RabbitInstrumenterHelper(); + + public static RabbitInstrumenterHelper helper() { + return INSTRUMENTER_HELPER; + } + + public void onPublish(Span span, String exchange, String routingKey) { + String exchangeName = normalizeExchangeName(exchange); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, exchangeName); + span.updateName(exchangeName + " send"); + if (routingKey != null && !routingKey.isEmpty()) { + span.setAttribute(SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY, routingKey); + } + if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + span.setAttribute("rabbitmq.command", "basic.publish"); + } + } + + public void onProps(Span span, AMQP.BasicProperties props) { + if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + Integer deliveryMode = props.getDeliveryMode(); + if (deliveryMode != null) { + span.setAttribute("rabbitmq.delivery_mode", deliveryMode); + } + } + } + + private static String normalizeExchangeName(String exchange) { + return exchange == null || exchange.isEmpty() ? "" : exchange; + } + + public static void onCommand(Span span, Command command) { + String name = command.getMethod().protocolMethodName(); + + if (!name.equals("basic.publish")) { + span.updateName(name); + } + if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + span.setAttribute("rabbitmq.command", name); + } + } + + public void inject(Context context, Map headers, TextMapInjectAdapter setter) { + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, headers, setter); + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesExtractor.java new file mode 100644 index 000000000000..690e1235e8af --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesExtractor.java @@ -0,0 +1,91 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.rabbitmq.client.GetResponse; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; + +public class RabbitReceiveAttributesExtractor + extends MessagingAttributesExtractor { + @Override + public MessageOperation operation() { + return MessageOperation.RECEIVE; + } + + @Override + protected String system(ReceiveRequest request) { + return "rabbitmq"; + } + + @Override + protected String destinationKind(ReceiveRequest request) { + return SemanticAttributes.MessagingDestinationKindValues.QUEUE; + } + + @Nullable + @Override + protected String destination(ReceiveRequest request) { + if (request.getResponse() != null) { + return normalizeExchangeName(request.getResponse().getEnvelope().getExchange()); + } else { + return null; + } + } + + private static String normalizeExchangeName(String exchange) { + return exchange == null || exchange.isEmpty() ? "" : exchange; + } + + @Override + protected boolean temporaryDestination(ReceiveRequest request) { + return false; + } + + @Nullable + @Override + protected String protocol(ReceiveRequest request) { + return null; + } + + @Nullable + @Override + protected String protocolVersion(ReceiveRequest request) { + return null; + } + + @Nullable + @Override + protected String url(ReceiveRequest request) { + return null; + } + + @Nullable + @Override + protected String conversationId(ReceiveRequest request) { + return null; + } + + @Nullable + @Override + protected Long messagePayloadSize(ReceiveRequest request) { + return null; + } + + @Nullable + @Override + protected Long messagePayloadCompressedSize(ReceiveRequest request) { + return null; + } + + @Nullable + @Override + protected String messageId(ReceiveRequest request, @Nullable GetResponse response) { + return null; + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java new file mode 100644 index 000000000000..fa4036ca272a --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.rabbitmq.client.GetResponse; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import javax.annotation.Nullable; + +class RabbitReceiveExperimentalAttributesExtractor + implements AttributesExtractor { + private static final AttributeKey RABBITMQ_COMMAND = + AttributeKey.stringKey("rabbitmq.command"); + private static final AttributeKey RABBITMQ_QUEUE = + AttributeKey.stringKey("rabbitmq.queue"); + + @Override + public void onStart(AttributesBuilder attributes, ReceiveRequest receiveRequest) { + set(attributes, RABBITMQ_COMMAND, "basic.get"); + set(attributes, RABBITMQ_QUEUE, receiveRequest.getQueue()); + } + + @Override + public void onEnd( + AttributesBuilder attributes, + ReceiveRequest receiveRequest, + @Nullable GetResponse response, + @Nullable Throwable error) {} +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesExtractor.java new file mode 100644 index 000000000000..2945d5aa9d66 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveNetAttributesExtractor.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.rabbitmq.client.GetResponse; +import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor; +import javax.annotation.Nullable; + +public class RabbitReceiveNetAttributesExtractor + extends NetClientAttributesExtractor { + @Nullable + @Override + public String transport(ReceiveRequest request, @Nullable GetResponse response) { + return null; + } + + @Nullable + @Override + public String peerName(ReceiveRequest request, @Nullable GetResponse response) { + return request.getConnection().getAddress().getHostName(); + } + + @Nullable + @Override + public Integer peerPort(ReceiveRequest request, @Nullable GetResponse response) { + return request.getConnection().getPort(); + } + + @Nullable + @Override + public String peerIp(ReceiveRequest request, @Nullable GetResponse response) { + return request.getConnection().getAddress().getHostAddress(); + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java new file mode 100644 index 000000000000..79fc60ef9f02 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.PRODUCER; + +import com.rabbitmq.client.GetResponse; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import java.util.ArrayList; +import java.util.List; + +public class RabbitSingletons { + private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = + Config.get().getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false); + private static final String instrumentationName = "io.opentelemetry.rabbitmq-2.7"; + private static final Instrumenter channelInstrumenter; + private static final Instrumenter receiveInstrumenter; + private static final Instrumenter deliverInstrumenter; + + static { + channelInstrumenter = createChanneInstrumenter(); + receiveInstrumenter = createReceiveInstrumenter(); + deliverInstrumenter = createDeliverInstrumenter(); + } + + public static Instrumenter channelInstrumenter() { + return channelInstrumenter; + } + + public static Instrumenter receiveInstrumenter() { + return receiveInstrumenter; + } + + static Instrumenter deliverInstrumenter() { + return deliverInstrumenter; + } + + private static Instrumenter createChanneInstrumenter() { + return Instrumenter.newBuilder( + GlobalOpenTelemetry.get(), instrumentationName, ChannelAndMethod::getMethod) + .addAttributesExtractors( + new RabbitChannelAttributesExtractor(), new RabbitChannelNetAttributesExtractor()) + .newInstrumenter( + channelAndMethod -> + channelAndMethod.getMethod().equals("Channel.basicPublish") ? PRODUCER : CLIENT); + } + + private static Instrumenter createReceiveInstrumenter() { + List> extractors = new ArrayList<>(); + extractors.add(new RabbitReceiveAttributesExtractor()); + extractors.add(new RabbitReceiveNetAttributesExtractor()); + if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + extractors.add(new RabbitReceiveExperimentalAttributesExtractor()); + } + + return Instrumenter.newBuilder( + GlobalOpenTelemetry.get(), instrumentationName, ReceiveRequest::spanName) + .addAttributesExtractors(extractors) + .newInstrumenter(SpanKindExtractor.alwaysClient()); + } + + private static Instrumenter createDeliverInstrumenter() { + List> extractors = new ArrayList<>(); + extractors.add(new RabbitDeliveryAttributesExtractor()); + extractors.add(new RabbitDeliveryExtraAttributesExtractor()); + if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + extractors.add(new RabbitDeliveryExperimentalAttributesExtractor()); + } + + return Instrumenter.newBuilder( + GlobalOpenTelemetry.get(), instrumentationName, DeliveryRequest::spanName) + .addAttributesExtractors(extractors) + .newConsumerInstrumenter(TextMapExtractAdapter.GETTER); + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitTracer.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitTracer.java deleted file mode 100644 index bde5d6ef779f..000000000000 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitTracer.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.rabbitmq; - -import static io.opentelemetry.api.trace.SpanKind.CLIENT; -import static io.opentelemetry.api.trace.SpanKind.CONSUMER; -import static io.opentelemetry.api.trace.SpanKind.PRODUCER; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.TextMapExtractAdapter.GETTER; - -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Command; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.GetResponse; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.config.Config; -import io.opentelemetry.instrumentation.api.tracer.BaseTracer; -import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -public class RabbitTracer extends BaseTracer { - - private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = - Config.get().getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false); - - private static final RabbitTracer TRACER = new RabbitTracer(); - - public static RabbitTracer tracer() { - return TRACER; - } - - public Context startSpan(String method, Connection connection) { - Context parentContext = Context.current(); - SpanKind kind = method.equals("Channel.basicPublish") ? PRODUCER : CLIENT; - SpanBuilder span = - spanBuilder(parentContext, method, kind) - .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rabbitmq") - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"); - - NetPeerAttributes.INSTANCE.setNetPeer(span, connection.getAddress(), connection.getPort()); - - return parentContext.with(span.startSpan()); - } - - public Context startGetSpan( - String queue, long startTime, GetResponse response, Connection connection) { - Context parentContext = Context.current(); - SpanBuilder spanBuilder = - spanBuilder(parentContext, spanNameOnGet(queue), CLIENT) - .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rabbitmq") - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue") - .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive") - .setStartTimestamp(startTime, TimeUnit.MILLISECONDS); - - if (response != null) { - spanBuilder.setAttribute( - SemanticAttributes.MESSAGING_DESTINATION, - normalizeExchangeName(response.getEnvelope().getExchange())); - spanBuilder.setAttribute( - SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY, - response.getEnvelope().getRoutingKey()); - spanBuilder.setAttribute( - SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, - (long) response.getBody().length); - } - NetPeerAttributes.INSTANCE.setNetPeer( - spanBuilder, connection.getAddress(), connection.getPort()); - onGet(spanBuilder, queue); - - // TODO: withClientSpan()? - return parentContext.with(spanBuilder.startSpan()); - } - - public Context startDeliverySpan( - String queue, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { - Map headers = properties.getHeaders(); - Context parentContext = extract(headers, GETTER); - - long startTimeMillis = System.currentTimeMillis(); - Span span = - spanBuilder(parentContext, spanNameOnDeliver(queue), CONSUMER) - .setStartTimestamp(startTimeMillis, TimeUnit.MILLISECONDS) - .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rabbitmq") - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue") - .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") - .startSpan(); - onDeliver(span, envelope); - - if (body != null) { - span.setAttribute( - SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length); - } - if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES && properties.getTimestamp() != null) { - // this will be set if the sender sets the timestamp, - // or if a plugin is installed on the rabbitmq broker - long produceTimeMillis = properties.getTimestamp().getTime(); - span.setAttribute( - "rabbitmq.record.queue_time_ms", Math.max(0L, startTimeMillis - produceTimeMillis)); - } - - return withConsumerSpan(parentContext, span); - } - - public void onPublish(Span span, String exchange, String routingKey) { - String exchangeName = normalizeExchangeName(exchange); - span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, exchangeName); - span.updateName(exchangeName + " send"); - if (routingKey != null && !routingKey.isEmpty()) { - span.setAttribute(SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY, routingKey); - } - if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - span.setAttribute("rabbitmq.command", "basic.publish"); - } - } - - public void onProps(Span span, AMQP.BasicProperties props) { - if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - Integer deliveryMode = props.getDeliveryMode(); - if (deliveryMode != null) { - span.setAttribute("rabbitmq.delivery_mode", deliveryMode); - } - } - } - - public String spanNameOnGet(String queue) { - return (queue.startsWith("amq.gen-") ? "" : queue) + " receive"; - } - - public void onGet(SpanBuilder span, String queue) { - if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - span.setAttribute("rabbitmq.command", "basic.get"); - span.setAttribute("rabbitmq.queue", queue); - } - } - - public String spanNameOnDeliver(String queue) { - if (queue == null || queue.isEmpty()) { - return " process"; - } else if (queue.startsWith("amq.gen-")) { - return " process"; - } else { - return queue + " process"; - } - } - - public void onDeliver(Span span, Envelope envelope) { - if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - span.setAttribute("rabbitmq.command", "basic.deliver"); - } - - if (envelope != null) { - String exchange = envelope.getExchange(); - span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, normalizeExchangeName(exchange)); - String routingKey = envelope.getRoutingKey(); - if (routingKey != null && !routingKey.isEmpty()) { - span.setAttribute(SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY, routingKey); - } - } - } - - private static String normalizeExchangeName(String exchange) { - return exchange == null || exchange.isEmpty() ? "" : exchange; - } - - public static void onCommand(Span span, Command command) { - String name = command.getMethod().protocolMethodName(); - - if (!name.equals("basic.publish")) { - span.updateName(name); - } - if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - span.setAttribute("rabbitmq.command", name); - } - } - - @Override - protected String getInstrumentationName() { - return "io.opentelemetry.rabbitmq-2.7"; - } -} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java new file mode 100644 index 000000000000..de6fc9d8fb24 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java @@ -0,0 +1,34 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import com.google.auto.value.AutoValue; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.GetResponse; +import javax.annotation.Nullable; + +@AutoValue +public abstract class ReceiveRequest { + + public static ReceiveRequest create( + String queue, long startTime, GetResponse response, Connection connection) { + return new AutoValue_ReceiveRequest(queue, startTime, response, connection); + } + + public abstract String getQueue(); + + public abstract long getStartTime(); + + @Nullable + public abstract GetResponse getResponse(); + + public abstract Connection getConnection(); + + String spanName() { + String queue = getQueue(); + return (queue.startsWith("amq.gen-") ? "" : queue) + " receive"; + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TextMapExtractAdapter.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TextMapExtractAdapter.java index 9288cf48d2ce..917105aca8a5 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TextMapExtractAdapter.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TextMapExtractAdapter.java @@ -7,21 +7,22 @@ import io.opentelemetry.context.propagation.TextMapGetter; import java.util.Collections; -import java.util.Map; -public class TextMapExtractAdapter implements TextMapGetter> { +public class TextMapExtractAdapter implements TextMapGetter { public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); @Override - public Iterable keys(Map carrier) { - return carrier != null ? carrier.keySet() : Collections.emptyList(); + public Iterable keys(DeliveryRequest carrier) { + return carrier != null + ? carrier.getProperties().getHeaders().keySet() + : Collections.emptyList(); } @Override - public String get(Map carrier, String key) { + public String get(DeliveryRequest carrier, String key) { if (carrier != null) { - Object obj = carrier.get(key); + Object obj = carrier.getProperties().getHeaders().get(key); return obj == null ? null : obj.toString(); } else { return null; diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java index 87181bb873f9..238ac1e110fc 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitTracer.tracer; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitSingletons.deliverInstrumenter; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Consumer; @@ -58,14 +58,16 @@ public void handleRecoverOk(String consumerTag) { public void handleDelivery( String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - Context context = tracer().startDeliverySpan(queue, envelope, properties, body); + Context parentContext = Context.current(); + DeliveryRequest request = DeliveryRequest.create(queue, envelope, properties, body); + Context context = deliverInstrumenter().start(parentContext, request); try (Scope ignored = context.makeCurrent()) { // Call delegate. delegate.handleDelivery(consumerTag, envelope, properties, body); - tracer().end(context); + deliverInstrumenter().end(context, request, null, null); } catch (Throwable throwable) { - tracer().endExceptionally(context, throwable); + deliverInstrumenter().end(context, request, null, throwable); throw throwable; } } From 801e978cfd373ef477251fbe57ec839c47782574 Mon Sep 17 00:00:00 2001 From: Nikita Salnikov-Tarnovski Date: Tue, 26 Oct 2021 16:36:33 +0300 Subject: [PATCH 2/2] Polish --- .../RabbitChannelInstrumentation.java | 12 ++++--- ...liveryExperimentalAttributesExtractor.java | 4 +-- .../rabbitmq/RabbitInstrumenterHelper.java | 8 +++-- ...eceiveExperimentalAttributesExtractor.java | 4 +-- .../rabbitmq/RabbitSingletons.java | 7 ++-- .../rabbitmq/ReceiveRequest.java | 15 +++++++-- .../instrumentation/rabbitmq/Timer.java | 32 +++++++++++++++++++ .../rabbitmq/TracedDelegatingConsumer.java | 6 ++++ 8 files changed, 70 insertions(+), 18 deletions(-) create mode 100644 instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/Timer.java diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java index 24e1a5fec5f7..3bc4e847990c 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java @@ -188,27 +188,29 @@ public static void setSpanNameAddHeaders( public static class ChannelGetAdvice { @Advice.OnMethodEnter - public static long takeTimestamp(@Advice.Local("otelCallDepth") CallDepth callDepth) { + public static void takeTimestamp( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelTimer") Timer timer) { callDepth = CallDepth.forClass(Channel.class); callDepth.getAndIncrement(); - return System.currentTimeMillis(); + timer = Timer.start(); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void extractAndStartSpan( @Advice.This Channel channel, @Advice.Argument(0) String queue, - @Advice.Enter long startTime, @Advice.Return GetResponse response, @Advice.Thrown Throwable throwable, - @Advice.Local("otelCallDepth") CallDepth callDepth) { + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Local("otelTimer") Timer timer) { if (callDepth.decrementAndGet() > 0) { return; } Context parentContext = Java8BytecodeBridge.currentContext(); ReceiveRequest request = - ReceiveRequest.create(queue, startTime, response, channel.getConnection()); + ReceiveRequest.create(queue, timer, response, channel.getConnection()); if (!receiveInstrumenter().shouldStart(parentContext, request)) { return; } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java index 83b90cdcf765..b124621a0709 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryExperimentalAttributesExtractor.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitInstrumenterHelper.RABBITMQ_COMMAND; + import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; @@ -13,8 +15,6 @@ class RabbitDeliveryExperimentalAttributesExtractor implements AttributesExtractor { - private static final AttributeKey RABBITMQ_COMMAND = - AttributeKey.stringKey("rabbitmq.command"); private static final AttributeKey RABBITMQ_QUEUE_TIME = AttributeKey.longKey("rabbitmq.record.queue_time_ms"); diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java index 52fff0a454d1..9f4f93108a02 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java @@ -8,6 +8,7 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Command; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.config.Config; @@ -15,6 +16,7 @@ import java.util.Map; public class RabbitInstrumenterHelper { + static final AttributeKey RABBITMQ_COMMAND = AttributeKey.stringKey("rabbitmq.command"); private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = Config.get().getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false); @@ -34,7 +36,7 @@ public void onPublish(Span span, String exchange, String routingKey) { span.setAttribute(SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY, routingKey); } if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - span.setAttribute("rabbitmq.command", "basic.publish"); + span.setAttribute(RABBITMQ_COMMAND, "basic.publish"); } } @@ -58,11 +60,11 @@ public static void onCommand(Span span, Command command) { span.updateName(name); } if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { - span.setAttribute("rabbitmq.command", name); + span.setAttribute(RABBITMQ_COMMAND, name); } } - public void inject(Context context, Map headers, TextMapInjectAdapter setter) { + public void inject(Context context, Map headers, MapSetter setter) { GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, headers, setter); } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java index fa4036ca272a..ff39df98b735 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveExperimentalAttributesExtractor.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitInstrumenterHelper.RABBITMQ_COMMAND; + import com.rabbitmq.client.GetResponse; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributesBuilder; @@ -13,8 +15,6 @@ class RabbitReceiveExperimentalAttributesExtractor implements AttributesExtractor { - private static final AttributeKey RABBITMQ_COMMAND = - AttributeKey.stringKey("rabbitmq.command"); private static final AttributeKey RABBITMQ_QUEUE = AttributeKey.stringKey("rabbitmq.queue"); diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java index 79fc60ef9f02..12d0ba552327 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java @@ -44,7 +44,7 @@ static Instrumenter deliverInstrumenter() { } private static Instrumenter createChanneInstrumenter() { - return Instrumenter.newBuilder( + return Instrumenter.builder( GlobalOpenTelemetry.get(), instrumentationName, ChannelAndMethod::getMethod) .addAttributesExtractors( new RabbitChannelAttributesExtractor(), new RabbitChannelNetAttributesExtractor()) @@ -61,9 +61,10 @@ private static Instrumenter createReceiveInstrument extractors.add(new RabbitReceiveExperimentalAttributesExtractor()); } - return Instrumenter.newBuilder( + return Instrumenter.builder( GlobalOpenTelemetry.get(), instrumentationName, ReceiveRequest::spanName) .addAttributesExtractors(extractors) + .setTimeExtractors(ReceiveRequest::startTime, (request, response, error) -> request.now()) .newInstrumenter(SpanKindExtractor.alwaysClient()); } @@ -75,7 +76,7 @@ private static Instrumenter createDeliverInstrumenter() { extractors.add(new RabbitDeliveryExperimentalAttributesExtractor()); } - return Instrumenter.newBuilder( + return Instrumenter.builder( GlobalOpenTelemetry.get(), instrumentationName, DeliveryRequest::spanName) .addAttributesExtractors(extractors) .newConsumerInstrumenter(TextMapExtractAdapter.GETTER); diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java index de6fc9d8fb24..fc143be0aa13 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ReceiveRequest.java @@ -8,19 +8,20 @@ import com.google.auto.value.AutoValue; import com.rabbitmq.client.Connection; import com.rabbitmq.client.GetResponse; +import java.time.Instant; import javax.annotation.Nullable; @AutoValue public abstract class ReceiveRequest { public static ReceiveRequest create( - String queue, long startTime, GetResponse response, Connection connection) { - return new AutoValue_ReceiveRequest(queue, startTime, response, connection); + String queue, Timer timer, GetResponse response, Connection connection) { + return new AutoValue_ReceiveRequest(queue, timer, response, connection); } public abstract String getQueue(); - public abstract long getStartTime(); + public abstract Timer getTimer(); @Nullable public abstract GetResponse getResponse(); @@ -31,4 +32,12 @@ String spanName() { String queue = getQueue(); return (queue.startsWith("amq.gen-") ? "" : queue) + " receive"; } + + Instant startTime() { + return getTimer().startTime(); + } + + Instant now() { + return getTimer().now(); + } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/Timer.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/Timer.java new file mode 100644 index 000000000000..123393dff19d --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/Timer.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +import java.time.Instant; + +public final class Timer { + + public static Timer start() { + return new Timer(Instant.now(), System.nanoTime()); + } + + private final Instant startTime; + private final long startNanoTime; + + private Timer(Instant startTime, long startNanoTime) { + this.startTime = startTime; + this.startNanoTime = startNanoTime; + } + + public Instant startTime() { + return startTime; + } + + public Instant now() { + long durationNanos = System.nanoTime() - startNanoTime; + return startTime().plusNanos(durationNanos); + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java index 238ac1e110fc..b94fb624d5eb 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/TracedDelegatingConsumer.java @@ -60,6 +60,12 @@ public void handleDelivery( throws IOException { Context parentContext = Context.current(); DeliveryRequest request = DeliveryRequest.create(queue, envelope, properties, body); + + if (!deliverInstrumenter().shouldStart(parentContext, request)) { + delegate.handleDelivery(consumerTag, envelope, properties, body); + return; + } + Context context = deliverInstrumenter().start(parentContext, request); try (Scope ignored = context.makeCurrent()) {