Skip to content

Commit

Permalink
Conditionally create CONSUMER spans instead of always INTERNAL
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek committed Jun 15, 2021
1 parent 83de65e commit 9434914
Show file tree
Hide file tree
Showing 18 changed files with 152 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.opentelemetry.instrumentation.api.InstrumentationVersion;
import io.opentelemetry.instrumentation.api.internal.SupportabilityMetrics;
import io.opentelemetry.instrumentation.api.tracer.ClientSpan;
import io.opentelemetry.instrumentation.api.tracer.ConsumerSpan;
import io.opentelemetry.instrumentation.api.tracer.ServerSpan;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -89,10 +90,11 @@ public boolean shouldStart(Context parentContext, REQUEST request) {
SpanKind spanKind = spanKindExtractor.extract(request);
switch (spanKind) {
case SERVER:
suppressed = ServerSpan.fromContextOrNull(parentContext) != null;
case CONSUMER:
suppressed = ServerSpan.exists(parentContext) || ConsumerSpan.exists(parentContext);
break;
case CLIENT:
suppressed = ClientSpan.fromContextOrNull(parentContext) != null;
suppressed = ClientSpan.exists(parentContext);
break;
default:
break;
Expand Down Expand Up @@ -146,6 +148,8 @@ public Context start(Context parentContext, REQUEST request) {
return ServerSpan.with(context, span);
case CLIENT:
return ClientSpan.with(context, span);
case CONSUMER:
return ConsumerSpan.with(context, span);
default:
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ public final boolean shouldStartSpan(Context context, SpanKind proposedKind) {
boolean suppressed = false;
switch (proposedKind) {
case CLIENT:
suppressed = inClientSpan(context);
suppressed = ClientSpan.exists(context);
break;
case SERVER:
suppressed = inServerSpan(context);
case CONSUMER:
suppressed = ServerSpan.exists(context) || ConsumerSpan.exists(context);
break;
default:
break;
Expand All @@ -120,14 +121,6 @@ public final boolean shouldStartSpan(Context context, SpanKind proposedKind) {
return !suppressed;
}

private static boolean inClientSpan(Context context) {
return ClientSpan.fromContextOrNull(context) != null;
}

private static boolean inServerSpan(Context context) {
return ServerSpan.fromContextOrNull(context) != null;
}

/**
* Returns a {@link Context} inheriting from {@code Context.current()} that contains a new span
* with name {@code spanName} and kind {@link SpanKind#INTERNAL}.
Expand Down Expand Up @@ -178,6 +171,16 @@ protected final Context withServerSpan(Context parentContext, Span span) {
return ServerSpan.with(parentContext.with(span), span);
}

/**
* Returns a {@link Context} containing the passed {@code span} marked as the current {@link
* SpanKind#CONSUMER} span.
*
* @see #shouldStartSpan(Context, SpanKind)
*/
protected final Context withConsumerSpan(Context parentContext, Span span) {
return ConsumerSpan.with(parentContext.with(span), span);
}

/**
* This method is used to generate an acceptable span (operation) name based on a given method
* reference. Anonymous classes are named based on their parent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@
import io.opentelemetry.context.ContextKey;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* This class encapsulates the context key for storing the current {@link SpanKind#CLIENT span in
* the {@link Context}.
*/
public final class ClientSpan {
// Keeps track of the client span in a subtree corresponding to a client request.
private static final ContextKey<Span> KEY =
ContextKey.named("opentelemetry-traces-client-span-key");

/** Returns true when a {@link SpanKind#CLIENT} span is present in the passed {@code context}. */
public static boolean exists(Context context) {
return fromContextOrNull(context) != null;
}

/**
* Returns span of type {@link SpanKind#CLIENT} from the given context or {@code null} if not
* found.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.tracer;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* This class encapsulates the context key for storing the current {@link SpanKind#CONSUMER} span in
* the {@link Context}.
*/
public final class ConsumerSpan {
// Keeps track of the consumer span for the current trace.
private static final ContextKey<Span> KEY =
ContextKey.named("opentelemetry-traces-consumer-span-key");

/**
* Returns true when a {@link SpanKind#CONSUMER} span is present in the passed {@code context}.
*/
public static boolean exists(Context context) {
return fromContextOrNull(context) != null;
}

/**
* Returns span of type {@link SpanKind#CONSUMER} from the given context or {@code null} if not
* found.
*/
@Nullable
public static Span fromContextOrNull(Context context) {
return context.get(KEY);
}

public static Context with(Context context, Span consumerSpan) {
return context.with(KEY, consumerSpan);
}

private ConsumerSpan() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public final class ServerSpan {
private static final ContextKey<Span> KEY =
ContextKey.named("opentelemetry-traces-server-span-key");

/** Returns true when a {@link SpanKind#SERVER} span is present in the passed {@code context}. */
public static boolean exists(Context context) {
return fromContextOrNull(context) != null;
}

/**
* Returns span of type {@link SpanKind#SERVER} from the given context or {@code null} if not
* found.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class BaseTracerTest extends Specification {

where:
kind | context | expected
SpanKind.CLIENT | root | true
SpanKind.CLIENT | root | true
SpanKind.SERVER | root | true
SpanKind.INTERNAL | root | true
SpanKind.PRODUCER | root | true
Expand All @@ -54,7 +54,7 @@ class BaseTracerTest extends Specification {
SpanKind.PRODUCER | tracer.withClientSpan(root, existingSpan) | true
SpanKind.SERVER | tracer.withServerSpan(root, existingSpan) | false
SpanKind.INTERNAL | tracer.withServerSpan(root, existingSpan) | true
SpanKind.CONSUMER | tracer.withServerSpan(root, existingSpan) | true
SpanKind.CONSUMER | tracer.withServerSpan(root, existingSpan) | false
SpanKind.PRODUCER | tracer.withServerSpan(root, existingSpan) | true
SpanKind.CLIENT | tracer.withServerSpan(root, existingSpan) | true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Context startSpan(ConsumerRecord<?, ?> record) {
.startSpan();

onConsume(span, now, record);
return parentContext.with(span);
return withConsumerSpan(parentContext, span);
}

private Context extractParent(ConsumerRecord<?, ?> record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public Context startSpan(StampedRecord record) {
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.startSpan();
onConsume(span, record);
return parentContext.with(span);
return withConsumerSpan(parentContext, span);
}

public String spanNameForConsume(StampedRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Context startDeliverySpan(
"rabbitmq.record.queue_time_ms", Math.max(0L, startTimeMillis - produceTimeMillis));
}

return parentContext.with(span);
return withConsumerSpan(parentContext, span);
}

public void onPublish(Span span, String exchange, String routingKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ protected String getInstrumentationName() {
Context startSpan(Context parentContext, List<MessageExt> msgs) {
if (msgs.size() == 1) {
SpanBuilder spanBuilder = startSpanBuilder(extractParent(msgs.get(0)), msgs.get(0));
return parentContext.with(spanBuilder.startSpan());
return withConsumerSpan(parentContext, spanBuilder.startSpan());
} else {
SpanBuilder spanBuilder =
spanBuilder(parentContext, "multiple_sources receive", CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive");
Context rootContext = parentContext.with(spanBuilder.startSpan());
Context rootContext = withConsumerSpan(parentContext, spanBuilder.startSpan());
for (MessageExt message : msgs) {
createChildSpan(rootContext, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ apply from: "$rootDir/gradle/instrumentation.gradle"

ext {
// context "leak" here is intentional: spring-integration instrumentation will always override
// "local" span context with one extracted from the incoming message
// "local" span context with one extracted from the incoming message when it decides to start a
// CONSUMER span
failOnContextLeak = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.api.trace.SpanKind.SERVER
import static io.opentelemetry.instrumentation.test.server.ServerTraceUtils.runUnderServerTrace

import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification

Expand All @@ -20,16 +22,20 @@ class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification i

def "should cooperate with existing RabbitMQ instrumentation"() {
when:
producerContext.getBean("producer", Runnable).run()
// simulate the workflow being triggered by HTTP request
runUnderServerTrace("HTTP GET") {
producerContext.getBean("producer", Runnable).run()
}

then:
assertTraces(2) {
trace(0, 7) {
span(0) {
name "producer"
name "HTTP GET"
kind SERVER
}
span(1) {
name "testProducer.output"
name "producer"
childOf span(0)
}
span(2) {
Expand All @@ -42,6 +48,9 @@ class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification i
childOf span(1)
kind PRODUCER
}
// spring-cloud-stream-binder-rabbit listener puts all messages into a BlockingQueue immediately after receiving
// that's why the rabbitmq CONSUMER span will never have any child span (and propagate context, actually)
// and that's why spring-integration creates another CONSUMER span
span(4) {
name ~/testTopic.anonymous.[-\w]+ process/
childOf span(3)
Expand All @@ -50,6 +59,7 @@ class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification i
span(5) {
name "testConsumer.input"
childOf span(3)
kind CONSUMER
}
span(6) {
name "consumer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

/** Entrypoint for tracing Spring Integration {@link MessageChannel}s. */
/** Entrypoint for instrumenting Spring Integration {@link MessageChannel}s. */
public final class SpringIntegrationTracing {

/**
Expand Down Expand Up @@ -40,8 +40,9 @@ public static SpringIntegrationTracingBuilder newBuilder(OpenTelemetry openTelem
}

/**
* Returns a new {@link ChannelInterceptor} that traces {@link MessageChannel#send(Message)} calls
* and propagates context through {@link Message}s.
* Returns a new {@link ChannelInterceptor} that propagates context through {@link Message}s and
* when no other messaging instrumentation is detected, traces {@link
* MessageChannel#send(Message)} calls.
*
* @see org.springframework.integration.channel.ChannelInterceptorAware
* @see org.springframework.messaging.support.InterceptableChannel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package io.opentelemetry.instrumentation.spring.integration;

import static io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor.alwaysInternal;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
Expand Down Expand Up @@ -44,7 +42,7 @@ public SpringIntegrationTracing build() {
Instrumenter.<MessageWithChannel, Void>newBuilder(
openTelemetry, INSTRUMENTATION_NAME, new MessageChannelSpanNameExtractor())
.addAttributesExtractors(additionalAttributeExtractors)
.newUpstreamPropagatingInstrumenter(alwaysInternal(), MessageHeadersGetter.INSTANCE);
.newConsumerInstrumenter(MessageHeadersGetter.INSTANCE);
return new SpringIntegrationTracing(openTelemetry.getPropagators(), instrumenter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,28 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
Context parentContext = Context.current();
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);

if (!instrumenter.shouldStart(parentContext, messageWithChannel)) {
return message;
final Context context;
MessageHeaderAccessor messageHeaderAccessor = createMutableHeaderAccessor(message);

// when there's no CONSUMER span created by another instrumentation, start it; there's no other
// messaging instrumentation that can do this, so spring-integration must ensure proper context
// propagation
// the new CONSUMER span will use the span context extracted from the incoming message as the
// parent
if (instrumenter.shouldStart(parentContext, messageWithChannel)) {
context = instrumenter.start(parentContext, messageWithChannel);
messageHeaderAccessor.setHeader(CONTEXT_AND_SCOPE_KEY, ContextAndScope.makeCurrent(context));
} else {
// if there was a top-level span detected it means that there's another messaging
// instrumentation that creates CONSUMER/PRODUCER spans; in that case, back off and just
// inject the current context into the message
context = parentContext;
messageHeaderAccessor.setHeader(SCOPE_KEY, context.makeCurrent());
}
Context context = instrumenter.start(parentContext, messageWithChannel);

MessageHeaderAccessor messageHeaderAccessor = createMutableHeaderAccessor(message);
propagators
.getTextMapPropagator()
.inject(context, messageHeaderAccessor, MessageHeadersSetter.INSTANCE);
messageHeaderAccessor.setHeader(CONTEXT_AND_SCOPE_KEY, ContextAndScope.makeCurrent(context));
return createMessageWithHeaders(message, messageHeaderAccessor);
}

Expand All @@ -57,6 +69,12 @@ public void postSend(Message<?> message, MessageChannel messageChannel, boolean
@Override
public void afterSendCompletion(
Message<?> message, MessageChannel messageChannel, boolean sent, Exception e) {
endConsumerSpanIfPresent(message, messageChannel, e);
closeScopeIfPresent(message);
}

private void endConsumerSpanIfPresent(
Message<?> message, MessageChannel messageChannel, Exception e) {
ContextAndScope contextAndScope =
message.getHeaders().get(CONTEXT_AND_SCOPE_KEY, ContextAndScope.class);
if (contextAndScope != null) {
Expand All @@ -66,6 +84,13 @@ public void afterSendCompletion(
}
}

private static void closeScopeIfPresent(Message<?> message) {
Scope scope = message.getHeaders().get(SCOPE_KEY, Scope.class);
if (scope != null) {
scope.close();
}
}

@Override
public boolean preReceive(MessageChannel messageChannel) {
return true;
Expand All @@ -88,19 +113,15 @@ public Message<?> beforeHandle(
propagators
.getTextMapPropagator()
.extract(Context.current(), messageWithChannel, MessageHeadersGetter.INSTANCE);
Scope scope = context.makeCurrent();
MessageHeaderAccessor messageHeaderAccessor = MessageHeaderAccessor.getMutableAccessor(message);
messageHeaderAccessor.setHeader(SCOPE_KEY, scope);
messageHeaderAccessor.setHeader(SCOPE_KEY, context.makeCurrent());
return createMessageWithHeaders(message, messageHeaderAccessor);
}

@Override
public void afterMessageHandled(
Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
Scope scope = message.getHeaders().get(SCOPE_KEY, Scope.class);
if (scope != null) {
scope.close();
}
closeScopeIfPresent(message);
}

private static MessageHeaderAccessor createMutableHeaderAccessor(Message<?> message) {
Expand Down
Loading

0 comments on commit 9434914

Please sign in to comment.