Spring Kafka library instrumentation (open-telemetry#6283)
* Spring Kafka library instrumentation

* Merge and fix prior merge

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
2 people authored and LironKS committed Oct 23, 2022
1 parent b9aee50 commit 81aec46
Showing 17 changed files with 623 additions and 132 deletions.
@@ -17,6 +17,7 @@ dependencies {

bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
implementation(project(":instrumentation:spring:spring-kafka-2.7:library"))

library("org.springframework.kafka:spring-kafka:2.7.0")

@@ -5,20 +5,17 @@

package io.opentelemetry.javaagent.instrumentation.spring.kafka;

import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.telemetry;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;

@@ -56,13 +53,13 @@ public static class GetBatchInterceptorAdvice {
public static <K, V> void onExit(
@Advice.Return(readOnly = false) BatchInterceptor<K, V> interceptor) {

if (!(interceptor instanceof InstrumentedBatchInterceptor)) {
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecords.class, Context.class);
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField =
VirtualField.find(ConsumerRecords.class, State.class);
interceptor =
new InstrumentedBatchInterceptor<>(receiveContextField, stateField, interceptor);
if (interceptor == null
|| !interceptor
.getClass()
.getName()
.equals(
"io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedBatchInterceptor")) {
interceptor = telemetry().createBatchInterceptor(interceptor);
}
}
}
@@ -74,13 +71,13 @@ public static class GetRecordInterceptorAdvice {
public static <K, V> void onExit(
@Advice.Return(readOnly = false) RecordInterceptor<K, V> interceptor) {

if (!(interceptor instanceof InstrumentedRecordInterceptor)) {
VirtualField<ConsumerRecord<K, V>, Context> receiveContextField =
VirtualField.find(ConsumerRecord.class, Context.class);
VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField =
VirtualField.find(ConsumerRecord.class, State.class);
interceptor =
new InstrumentedRecordInterceptor<>(receiveContextField, stateField, interceptor);
if (interceptor == null
|| !interceptor
.getClass()
.getName()
.equals(
"io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedRecordInterceptor")) {
interceptor = telemetry().createRecordInterceptor(interceptor);
}
}
}
@@ -6,42 +6,26 @@
package io.opentelemetry.javaagent.instrumentation.spring.kafka;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public final class SpringKafkaSingletons {

private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7";

private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER;
private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER;

static {
KafkaInstrumenterFactory factory =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.setPropagationEnabled(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE);
BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter();
}

public static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {
return BATCH_PROCESS_INSTRUMENTER;
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter() {
return PROCESS_INSTRUMENTER;
private static final SpringKafkaTelemetry TELEMETRY =
SpringKafkaTelemetry.builder(GlobalOpenTelemetry.get())
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
.setPropagationEnabled(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.build();

public static SpringKafkaTelemetry telemetry() {
return TELEMETRY;
}

private SpringKafkaSingletons() {}
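
The javaagent obtains its SpringKafkaTelemetry through this singleton; applications using the library directly can build the same object themselves and register the interceptors it creates. A minimal sketch, not part of this commit: the configuration class name is hypothetical, the injected OpenTelemetry stands for however the application obtains its OpenTelemetry instance, and it assumes a Spring Kafka version whose listener container factory exposes setRecordInterceptor/setBatchInterceptor. The one-argument create*Interceptor methods accept a nullable delegate, mirroring how the advice above passes the possibly-null existing interceptor.

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;

@Configuration
public class KafkaTelemetryConfig {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
      OpenTelemetry openTelemetry) {
    // Build the telemetry object; the two flags mirror the javaagent config keys above
    // and are shown here only for illustration.
    SpringKafkaTelemetry telemetry =
        SpringKafkaTelemetry.builder(openTelemetry)
            .setCaptureExperimentalSpanAttributes(false)
            .setPropagationEnabled(true)
            .build();

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    // Consumer factory and other container configuration omitted for brevity.

    // Passing null because there is no pre-existing interceptor to decorate here.
    RecordInterceptor<String, String> recordInterceptor = telemetry.createRecordInterceptor(null);
    BatchInterceptor<String, String> batchInterceptor = telemetry.createBatchInterceptor(null);
    factory.setRecordInterceptor(recordInterceptor);
    factory.setBatchInterceptor(batchInterceptor);
    return factory;
  }
}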

This file was deleted.

@@ -9,22 +9,40 @@
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Collections.emptyList;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.opentelemetry.testing.AbstractSpringKafkaTest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class SpringKafkaTest extends AbstractSpringKafkaTest {

@RegisterExtension
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@Override
protected InstrumentationExtension testing() {
return testing;
}

@Override
protected List<Class<?>> additionalSpringConfigs() {
return emptyList();
}

@Test
void shouldCreateSpansForSingleRecordProcess() {
testing.runWithSpan(
21 changes: 21 additions & 0 deletions instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts
@@ -0,0 +1,21 @@
plugins {
id("otel.library-instrumentation")
}

dependencies {
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))

compileOnly("org.springframework.kafka:spring-kafka:2.7.0")

testImplementation(project(":instrumentation:spring:spring-kafka-2.7:testing"))
testImplementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library"))

// 2.7.0 has a bug that makes decorating a Kafka Producer impossible
testImplementation("org.springframework.kafka:spring-kafka:2.7.1")

testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3")
testLibrary("org.springframework.boot:spring-boot-starter:2.5.3")
}
@@ -3,39 +3,40 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.kafka;

import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.batchProcessInstrumenter;
package io.opentelemetry.instrumentation.spring.kafka.v2_7;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.BatchInterceptor;

public final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {

private static final VirtualField<ConsumerRecords<?, ?>, Context> receiveContextField =
VirtualField.find(ConsumerRecords.class, Context.class);
private static final VirtualField<ConsumerRecords<?, ?>, State<ConsumerRecords<?, ?>>>
stateField = VirtualField.find(ConsumerRecords.class, State.class);

private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextField;
private final VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField;
private final Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter;
@Nullable private final BatchInterceptor<K, V> decorated;

public InstrumentedBatchInterceptor(
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField,
VirtualField<ConsumerRecords<K, V>, State<ConsumerRecords<K, V>>> stateField,
InstrumentedBatchInterceptor(
Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter,
@Nullable BatchInterceptor<K, V> decorated) {
this.receiveContextField = receiveContextField;
this.stateField = stateField;
this.batchProcessInstrumenter = batchProcessInstrumenter;
this.decorated = decorated;
}

@Override
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
Context parentContext = getParentContext(records);

if (batchProcessInstrumenter().shouldStart(parentContext, records)) {
Context context = batchProcessInstrumenter().start(parentContext, records);
if (batchProcessInstrumenter.shouldStart(parentContext, records)) {
Context context = batchProcessInstrumenter.start(parentContext, records);
Scope scope = context.makeCurrent();
stateField.set(records, State.create(records, context, scope));
}
@@ -67,11 +68,11 @@ public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer
}

private void end(ConsumerRecords<K, V> records, @Nullable Throwable error) {
State<ConsumerRecords<K, V>> state = stateField.get(records);
State<ConsumerRecords<?, ?>> state = stateField.get(records);
stateField.set(records, null);
if (state != null) {
state.scope().close();
batchProcessInstrumenter().end(state.context(), state.request(), null, error);
batchProcessInstrumenter.end(state.context(), state.request(), null, error);
}
}
}
@@ -3,30 +3,31 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.kafka;

import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter;
package io.opentelemetry.instrumentation.spring.kafka.v2_7;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;

public final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K, V> {
final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

private static final VirtualField<ConsumerRecord<?, ?>, Context> receiveContextField =
VirtualField.find(ConsumerRecord.class, Context.class);
private static final VirtualField<ConsumerRecord<?, ?>, State<ConsumerRecord<?, ?>>> stateField =
VirtualField.find(ConsumerRecord.class, State.class);

private final VirtualField<ConsumerRecord<K, V>, Context> receiveContextField;
private final VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField;
private final Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter;
@Nullable private final RecordInterceptor<K, V> decorated;

public InstrumentedRecordInterceptor(
VirtualField<ConsumerRecord<K, V>, Context> receiveContextField,
VirtualField<ConsumerRecord<K, V>, State<ConsumerRecord<K, V>>> stateField,
InstrumentedRecordInterceptor(
Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter,
@Nullable RecordInterceptor<K, V> decorated) {
this.receiveContextField = receiveContextField;
this.stateField = stateField;
this.processInstrumenter = processInstrumenter;
this.decorated = decorated;
}

@@ -46,8 +47,8 @@ public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V
private void start(ConsumerRecord<K, V> record) {
Context parentContext = getParentContext(record);

if (processInstrumenter().shouldStart(parentContext, record)) {
Context context = processInstrumenter().start(parentContext, record);
if (processInstrumenter.shouldStart(parentContext, record)) {
Context context = processInstrumenter.start(parentContext, record);
Scope scope = context.makeCurrent();
stateField.set(record, State.create(record, context, scope));
}
@@ -77,11 +78,11 @@ public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K
}

private void end(ConsumerRecord<K, V> record, @Nullable Throwable error) {
State<ConsumerRecord<K, V>> state = stateField.get(record);
State<ConsumerRecord<?, ?>> state = stateField.get(record);
stateField.set(record, null);
if (state != null) {
state.scope().close();
processInstrumenter().end(state.context(), state.request(), null, error);
processInstrumenter.end(state.context(), state.request(), null, error);
}
}
}
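
Because the interceptor keeps a nullable decorated delegate, an application interceptor that is already registered is preserved rather than replaced. A short sketch, not part of this commit, assuming the Spring Kafka 2.7 RecordInterceptor contract where the single-argument intercept is the method to implement; AuditRecordInterceptor is a hypothetical application class:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;

public class TracedAuditInterceptorExample {

  // Hypothetical application interceptor; not part of this commit.
  static class AuditRecordInterceptor implements RecordInterceptor<String, String> {
    @Override
    public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
      // Application-specific logic; when wrapped, this is intended to run while the
      // processing span started by the instrumented interceptor is active.
      return record;
    }
  }

  public static void main(String[] args) {
    SpringKafkaTelemetry telemetry =
        SpringKafkaTelemetry.builder(GlobalOpenTelemetry.get()).build();
    // The returned interceptor starts and ends spans, delegating to the audit interceptor.
    RecordInterceptor<String, String> traced =
        telemetry.createRecordInterceptor(new AuditRecordInterceptor());
    // Register `traced` on the listener container factory as usual.
  }
}
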
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.kafka;
package io.opentelemetry.instrumentation.spring.kafka.v2_7;

import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor;
import org.springframework.kafka.listener.ListenerExecutionFailedException;