diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts index 7eb196122d99..31414a6f28a1 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts @@ -17,6 +17,7 @@ dependencies { bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap")) implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) + implementation(project(":instrumentation:spring:spring-kafka-2.7:library")) library("org.springframework.kafka:spring-kafka:2.7.0") diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java index f9324fb69182..5a7a8ebbb11b 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java @@ -5,20 +5,17 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; +import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.telemetry; import static net.bytebuddy.matcher.ElementMatchers.isProtected; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.listener.BatchInterceptor; import org.springframework.kafka.listener.RecordInterceptor; @@ -56,13 +53,13 @@ public static class GetBatchInterceptorAdvice { public static void onExit( @Advice.Return(readOnly = false) BatchInterceptor interceptor) { - if (!(interceptor instanceof InstrumentedBatchInterceptor)) { - VirtualField, Context> receiveContextField = - VirtualField.find(ConsumerRecords.class, Context.class); - VirtualField, State>> stateField = - VirtualField.find(ConsumerRecords.class, State.class); - interceptor = - new InstrumentedBatchInterceptor<>(receiveContextField, stateField, interceptor); + if (interceptor == null + || !interceptor + .getClass() + .getName() + .equals( + "io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedBatchInterceptor")) { + interceptor = telemetry().createBatchInterceptor(interceptor); } } } @@ -74,13 +71,13 @@ public static class GetRecordInterceptorAdvice { public static void onExit( @Advice.Return(readOnly = false) RecordInterceptor interceptor) { - if (!(interceptor instanceof InstrumentedRecordInterceptor)) { - VirtualField, Context> receiveContextField = - VirtualField.find(ConsumerRecord.class, Context.class); - VirtualField, State>> stateField = - VirtualField.find(ConsumerRecord.class, State.class); - interceptor = - new InstrumentedRecordInterceptor<>(receiveContextField, stateField, interceptor); + if (interceptor == null + || !interceptor + .getClass() + .getName() + .equals( + "io.opentelemetry.instrumentation.spring.kafka.v2_7.InstrumentedRecordInterceptor")) { + interceptor = telemetry().createRecordInterceptor(interceptor); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java index f157950ea5a3..b4b58fb6892e 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java @@ -6,42 +6,26 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; public final class SpringKafkaSingletons { - private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; - - private static final Instrumenter, Void> BATCH_PROCESS_INSTRUMENTER; - private static final Instrumenter, Void> PROCESS_INSTRUMENTER; - - static { - KafkaInstrumenterFactory factory = - new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) - .setCaptureExperimentalSpanAttributes( - InstrumentationConfig.get() - .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) - .setPropagationEnabled( - InstrumentationConfig.get() - .getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true)) - .setMessagingReceiveInstrumentationEnabled( - ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) - .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE); - BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter(); - PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter(); - } - - public static Instrumenter, Void> batchProcessInstrumenter() { - return BATCH_PROCESS_INSTRUMENTER; - } - - public static Instrumenter, Void> processInstrumenter() { - return PROCESS_INSTRUMENTER; + private static final SpringKafkaTelemetry TELEMETRY = + SpringKafkaTelemetry.builder(GlobalOpenTelemetry.get()) + .setCaptureExperimentalSpanAttributes( + InstrumentationConfig.get() + .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) + .setPropagationEnabled( + InstrumentationConfig.get() + .getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true)) + .setMessagingReceiveInstrumentationEnabled( + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .build(); + + public static SpringKafkaTelemetry telemetry() { + return TELEMETRY; } private SpringKafkaSingletons() {} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java deleted file mode 100644 index f09bbe61c45b..000000000000 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.spring.kafka; - -import com.google.auto.value.AutoValue; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; - -@AutoValue -public abstract class State { - - public static State create(REQUEST request, Context context, Scope scope) { - return new AutoValue_State<>(request, context, scope); - } - - public abstract REQUEST request(); - - public abstract Context context(); - - public abstract Scope scope(); - - State() {} -} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaTest.java index f507cf70aaf1..a7621f3eac2c 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaTest.java @@ -9,22 +9,40 @@ import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Collections.emptyList; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.opentelemetry.testing.AbstractSpringKafkaTest; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; class SpringKafkaTest extends AbstractSpringKafkaTest { + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected List> additionalSpringConfigs() { + return emptyList(); + } + @Test void shouldCreateSpansForSingleRecordProcess() { testing.runWithSpan( diff --git a/instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts new file mode 100644 index 000000000000..93629588235c --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/library/build.gradle.kts @@ -0,0 +1,21 @@ +plugins { + id("otel.library-instrumentation") +} + +dependencies { + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + + implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) + + compileOnly("org.springframework.kafka:spring-kafka:2.7.0") + + testImplementation(project(":instrumentation:spring:spring-kafka-2.7:testing")) + testImplementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-2.6:library")) + + // 2.7.0 has a bug that makes decorating a Kafka Producer impossible + testImplementation("org.springframework.kafka:spring-kafka:2.7.1") + + testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3") + testLibrary("org.springframework.boot:spring-boot-starter:2.5.3") +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java similarity index 63% rename from instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java rename to instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java index f95d5afa2a14..a35cefd5f4c4 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedBatchInterceptor.java @@ -3,30 +3,31 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.spring.kafka; - -import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.batchProcessInstrumenter; +package io.opentelemetry.instrumentation.spring.kafka.v2_7; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.util.VirtualField; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.listener.BatchInterceptor; -public final class InstrumentedBatchInterceptor implements BatchInterceptor { +final class InstrumentedBatchInterceptor implements BatchInterceptor { + + private static final VirtualField, Context> receiveContextField = + VirtualField.find(ConsumerRecords.class, Context.class); + private static final VirtualField, State>> + stateField = VirtualField.find(ConsumerRecords.class, State.class); - private final VirtualField, Context> receiveContextField; - private final VirtualField, State>> stateField; + private final Instrumenter, Void> batchProcessInstrumenter; @Nullable private final BatchInterceptor decorated; - public InstrumentedBatchInterceptor( - VirtualField, Context> receiveContextField, - VirtualField, State>> stateField, + InstrumentedBatchInterceptor( + Instrumenter, Void> batchProcessInstrumenter, @Nullable BatchInterceptor decorated) { - this.receiveContextField = receiveContextField; - this.stateField = stateField; + this.batchProcessInstrumenter = batchProcessInstrumenter; this.decorated = decorated; } @@ -34,8 +35,8 @@ public InstrumentedBatchInterceptor( public ConsumerRecords intercept(ConsumerRecords records, Consumer consumer) { Context parentContext = getParentContext(records); - if (batchProcessInstrumenter().shouldStart(parentContext, records)) { - Context context = batchProcessInstrumenter().start(parentContext, records); + if (batchProcessInstrumenter.shouldStart(parentContext, records)) { + Context context = batchProcessInstrumenter.start(parentContext, records); Scope scope = context.makeCurrent(); stateField.set(records, State.create(records, context, scope)); } @@ -67,11 +68,11 @@ public void failure(ConsumerRecords records, Exception exception, Consumer } private void end(ConsumerRecords records, @Nullable Throwable error) { - State> state = stateField.get(records); + State> state = stateField.get(records); stateField.set(records, null); if (state != null) { state.scope().close(); - batchProcessInstrumenter().end(state.context(), state.request(), null, error); + batchProcessInstrumenter.end(state.context(), state.request(), null, error); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java similarity index 67% rename from instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java rename to instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java index f3c3719108f6..dccb77578e2b 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedRecordInterceptor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/InstrumentedRecordInterceptor.java @@ -3,30 +3,31 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.spring.kafka; - -import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter; +package io.opentelemetry.instrumentation.spring.kafka.v2_7; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.util.VirtualField; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.RecordInterceptor; -public final class InstrumentedRecordInterceptor implements RecordInterceptor { +final class InstrumentedRecordInterceptor implements RecordInterceptor { + + private static final VirtualField, Context> receiveContextField = + VirtualField.find(ConsumerRecord.class, Context.class); + private static final VirtualField, State>> stateField = + VirtualField.find(ConsumerRecord.class, State.class); - private final VirtualField, Context> receiveContextField; - private final VirtualField, State>> stateField; + private final Instrumenter, Void> processInstrumenter; @Nullable private final RecordInterceptor decorated; - public InstrumentedRecordInterceptor( - VirtualField, Context> receiveContextField, - VirtualField, State>> stateField, + InstrumentedRecordInterceptor( + Instrumenter, Void> processInstrumenter, @Nullable RecordInterceptor decorated) { - this.receiveContextField = receiveContextField; - this.stateField = stateField; + this.processInstrumenter = processInstrumenter; this.decorated = decorated; } @@ -46,8 +47,8 @@ public ConsumerRecord intercept(ConsumerRecord record, Consumer record) { Context parentContext = getParentContext(record); - if (processInstrumenter().shouldStart(parentContext, record)) { - Context context = processInstrumenter().start(parentContext, record); + if (processInstrumenter.shouldStart(parentContext, record)) { + Context context = processInstrumenter.start(parentContext, record); Scope scope = context.makeCurrent(); stateField.set(record, State.create(record, context, scope)); } @@ -77,11 +78,11 @@ public void failure(ConsumerRecord record, Exception exception, Consumer record, @Nullable Throwable error) { - State> state = stateField.get(record); + State> state = stateField.get(record); stateField.set(record, null); if (state != null) { state.scope().close(); - processInstrumenter().end(state.context(), state.request(), null, error); + processInstrumenter.end(state.context(), state.request(), null, error); } } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaErrorCauseExtractor.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java similarity index 89% rename from instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaErrorCauseExtractor.java rename to instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java index a495bb4d6934..c15ba829244c 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaErrorCauseExtractor.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaErrorCauseExtractor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.javaagent.instrumentation.spring.kafka; +package io.opentelemetry.instrumentation.spring.kafka.v2_7; import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; import org.springframework.kafka.listener.ListenerExecutionFailedException; diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java new file mode 100644 index 000000000000..085bc7b1835a --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetry.java @@ -0,0 +1,84 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.kafka.v2_7; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.kafka.listener.BatchInterceptor; +import org.springframework.kafka.listener.RecordInterceptor; + +/** Entrypoint for instrumenting Spring Kafka listeners. */ +public final class SpringKafkaTelemetry { + + /** Returns a new {@link SpringKafkaTelemetry} configured with the given {@link OpenTelemetry}. */ + public static SpringKafkaTelemetry create(OpenTelemetry openTelemetry) { + return builder(openTelemetry).build(); + } + + /** + * Returns a new {@link SpringKafkaTelemetryBuilder} configured with the given {@link + * OpenTelemetry}. + */ + public static SpringKafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) { + return new SpringKafkaTelemetryBuilder(openTelemetry); + } + + private final Instrumenter, Void> processInstrumenter; + private final Instrumenter, Void> batchProcessInstrumenter; + + SpringKafkaTelemetry( + Instrumenter, Void> processInstrumenter, + Instrumenter, Void> batchProcessInstrumenter) { + this.processInstrumenter = processInstrumenter; + this.batchProcessInstrumenter = batchProcessInstrumenter; + } + + /** + * Returns a new {@link RecordInterceptor} that decorates a message listener with a {@link + * SpanKind#CONSUMER CONSUMER} span. Can be set on a {@link AbstractMessageListenerContainer} + * using the {@link AbstractMessageListenerContainer#setRecordInterceptor(RecordInterceptor)} + * method. + */ + public RecordInterceptor createRecordInterceptor() { + return createRecordInterceptor(null); + } + + /** + * Returns a new {@link RecordInterceptor} that decorates a message listener with a {@link + * SpanKind#CONSUMER CONSUMER} span, and then delegates to a provided {@code + * decoratedInterceptor}. Can be set on a {@link AbstractMessageListenerContainer} using the + * {@link AbstractMessageListenerContainer#setRecordInterceptor(RecordInterceptor)} method. + */ + public RecordInterceptor createRecordInterceptor( + RecordInterceptor decoratedInterceptor) { + return new InstrumentedRecordInterceptor<>(processInstrumenter, decoratedInterceptor); + } + + /** + * Returns a new {@link BatchInterceptor} that decorates a message listener with a {@link + * SpanKind#CONSUMER CONSUMER} span. Can be set on a {@link AbstractMessageListenerContainer} + * using the {@link AbstractMessageListenerContainer#setBatchInterceptor(BatchInterceptor)} + * method. + */ + public BatchInterceptor createBatchInterceptor() { + return createBatchInterceptor(null); + } + + /** + * Returns a new {@link BatchInterceptor} that decorates a message listener with a {@link + * SpanKind#CONSUMER CONSUMER} span, and then delegates to a provided {@code + * decoratedInterceptor}. Can be set on a {@link AbstractMessageListenerContainer} using the + * {@link AbstractMessageListenerContainer#setBatchInterceptor(BatchInterceptor)} method. + */ + public BatchInterceptor createBatchInterceptor( + BatchInterceptor decoratedInterceptor) { + return new InstrumentedBatchInterceptor<>(batchProcessInstrumenter, decoratedInterceptor); + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java new file mode 100644 index 000000000000..4840daae9c86 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.kafka.v2_7; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; + +/** A builder of {@link SpringKafkaTelemetry}. */ +public final class SpringKafkaTelemetryBuilder { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; + + private final OpenTelemetry openTelemetry; + private boolean captureExperimentalSpanAttributes = false; + private boolean propagationEnabled = true; + private boolean messagingReceiveInstrumentationEnabled = false; + + SpringKafkaTelemetryBuilder(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + public SpringKafkaTelemetryBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + public SpringKafkaTelemetryBuilder setPropagationEnabled(boolean propagationEnabled) { + this.propagationEnabled = propagationEnabled; + return this; + } + + public SpringKafkaTelemetryBuilder setMessagingReceiveInstrumentationEnabled( + boolean messagingReceiveInstrumentationEnabled) { + this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled; + return this; + } + + /** + * Returns a new {@link SpringKafkaTelemetry} with the settings of this {@link + * SpringKafkaTelemetryBuilder}. + */ + public SpringKafkaTelemetry build() { + KafkaInstrumenterFactory factory = + new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME) + .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) + .setPropagationEnabled(propagationEnabled) + .setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled) + .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE); + + return new SpringKafkaTelemetry( + factory.createConsumerProcessInstrumenter(), factory.createBatchProcessInstrumenter()); + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java new file mode 100644 index 000000000000..47dbf0a3758b --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/State.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.kafka.v2_7; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +@AutoValue +abstract class State { + + static State create(REQUEST request, Context context, Scope scope) { + return new AutoValue_State<>(request, context, scope); + } + + abstract REQUEST request(); + + abstract Context context(); + + abstract Scope scope(); + + State() {} +} diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java new file mode 100644 index 000000000000..2f051c866ba8 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/library/src/test/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaNoReceiveTelemetryTest.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.spring.kafka.v2_7; + +import static java.util.Collections.singletonList; + +import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.testing.AbstractSpringKafkaNoReceiveTelemetryTest; +import java.util.List; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ContainerCustomizer; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; + +class SpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaNoReceiveTelemetryTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected List> additionalSpringConfigs() { + return singletonList(KafkaInstrumentationConfig.class); + } + + @Configuration + public static class KafkaInstrumentationConfig { + + @Bean + public DefaultKafkaProducerFactoryCustomizer producerInstrumentation() { + KafkaTelemetry kafkaTelemetry = KafkaTelemetry.create(testing.getOpenTelemetry()); + return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap); + } + + @Bean + public ContainerCustomizer> + listenerCustomizer() { + SpringKafkaTelemetry springKafkaTelemetry = + SpringKafkaTelemetry.create(testing.getOpenTelemetry()); + return container -> { + container.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor()); + container.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor()); + }; + } + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java new file mode 100644 index 000000000000..cebff1b4d8ac --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaNoReceiveTelemetryTest.java @@ -0,0 +1,220 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.testing; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.Test; + +public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaTest { + + @Test + void shouldCreateSpansForSingleRecordProcess() { + testing() + .runWithSpan( + "producer", + () -> { + kafkaTemplate.executeInTransaction( + ops -> { + ops.send("testSingleTopic", "10", "testSpan"); + return 0; + }); + }); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @Test + void shouldHandleFailureInKafkaListener() { + testing() + .runWithSpan( + "producer", + () -> { + kafkaTemplate.executeInTransaction( + ops -> { + ops.send("testSingleTopic", "10", "error"); + return 0; + }); + }); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testSingleTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testSingleTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION, "testSingleTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("consumer").hasParent(trace.getSpan(2)))); + } + + @Test + void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { + Map batchMessages = new HashMap<>(); + batchMessages.put("10", "testSpan1"); + batchMessages.put("20", "testSpan2"); + sendBatchMessages(batchMessages); + + AtomicReference producer1 = new AtomicReference<>(); + AtomicReference producer2 = new AtomicReference<>(); + + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"))); + + producer1.set(trace.getSpan(1)); + producer2.set(trace.getSpan(2)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinksSatisfying( + links( + producer1.get().getSpanContext(), + producer2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")), + span -> span.hasName("consumer").hasParent(trace.getSpan(0)))); + } + + @Test + void shouldHandleFailureInKafkaBatchListener() { + testing() + .runWithSpan( + "producer", + () -> { + kafkaTemplate.executeInTransaction( + ops -> { + ops.send("testBatchTopic", "10", "error"); + return 0; + }); + }); + + AtomicReference producer = new AtomicReference<>(); + + testing() + .waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinksSatisfying(links(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")), + span -> span.hasName("consumer").hasParent(trace.getSpan(0)))); + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java index 952061b1bd54..419d42ee8b28 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/AbstractSpringKafkaTest.java @@ -5,14 +5,20 @@ package io.opentelemetry.testing; -import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.LinkData; import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; @@ -26,22 +32,32 @@ public abstract class AbstractSpringKafkaTest { private static final Logger logger = LoggerFactory.getLogger(AbstractSpringKafkaTest.class); - @RegisterExtension - protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); - static KafkaContainer kafka; - static ConfigurableApplicationContext applicationContext; - protected static KafkaTemplate kafkaTemplate; - @SuppressWarnings("unchecked") + ConfigurableApplicationContext applicationContext; + protected KafkaTemplate kafkaTemplate; + @BeforeAll - static void setUp() { + static void setUpKafka() { kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")) .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) .withStartupTimeout(Duration.ofMinutes(1)); kafka.start(); + } + + @AfterAll + static void tearDownKafka() { + kafka.stop(); + } + + protected abstract InstrumentationExtension testing(); + protected abstract List> additionalSpringConfigs(); + + @SuppressWarnings("unchecked") + @BeforeEach + void setUpApp() { Map props = new HashMap<>(); props.put("spring.jmx.enabled", false); props.put("spring.main.web-application-type", "none"); @@ -53,16 +69,16 @@ static void setUp() { props.put("spring.kafka.producer.transaction-id-prefix", "test-"); SpringApplication app = new SpringApplication(ConsumerConfig.class); + app.addPrimarySources(additionalSpringConfigs()); app.setDefaultProperties(props); applicationContext = app.run(); kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate.class); } - @AfterAll - static void tearDown() { - kafka.stop(); + @AfterEach + void tearDownApp() { if (applicationContext != null) { - applicationContext.stop(); + applicationContext.close(); } } @@ -75,25 +91,45 @@ protected void sendBatchMessages(Map keyToData) throws Interrupt for (int i = 1; i <= maxAttempts; i++) { BatchRecordListener.reset(); - testing.runWithSpan( - "producer", - () -> { - kafkaTemplate.executeInTransaction( - ops -> { - keyToData.forEach((key, data) -> ops.send("testBatchTopic", key, data)); - return 0; - }); - }); + testing() + .runWithSpan( + "producer", + () -> { + kafkaTemplate.executeInTransaction( + ops -> { + keyToData.forEach((key, data) -> ops.send("testBatchTopic", key, data)); + return 0; + }); + }); BatchRecordListener.waitForMessages(); if (BatchRecordListener.getLastBatchSize() == 2) { break; } else if (i < maxAttempts) { - testing.waitForTraces(2); + testing().waitForTraces(2); Thread.sleep(1_000); // sleep a bit to give time for all the spans to arrive - testing.clearData(); + testing().clearData(); logger.info("Messages weren't received as batch, retrying"); } } } + + protected static Consumer> links(SpanContext... spanContexts) { + return links -> { + assertThat(links).hasSize(spanContexts.length); + for (SpanContext spanContext : spanContexts) { + assertThat(links) + .anySatisfy( + link -> { + assertThat(link.getSpanContext().getTraceId()) + .isEqualTo(spanContext.getTraceId()); + assertThat(link.getSpanContext().getSpanId()).isEqualTo(spanContext.getSpanId()); + assertThat(link.getSpanContext().getTraceFlags()) + .isEqualTo(spanContext.getTraceFlags()); + assertThat(link.getSpanContext().getTraceState()) + .isEqualTo(spanContext.getTraceState()); + }); + } + }; + } } diff --git a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java index 26ec9452997a..f24070bc1bda 100644 --- a/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java +++ b/instrumentation/spring/spring-kafka-2.7/testing/src/main/java/io/opentelemetry/testing/ConsumerConfig.java @@ -6,12 +6,15 @@ package io.opentelemetry.testing; import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.ContainerCustomizer; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; @SpringBootConfiguration @EnableAutoConfiguration @@ -39,7 +42,11 @@ public SingleRecordListener singleRecordListener() { @Bean public ConcurrentKafkaListenerContainerFactory batchFactory( - ConsumerFactory consumerFactory) { + ConsumerFactory consumerFactory, + ObjectProvider< + ContainerCustomizer< + String, String, ConcurrentMessageListenerContainer>> + customizerProvider) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); // do not retry failed records @@ -47,12 +54,17 @@ public ConcurrentKafkaListenerContainerFactory batchFactory( factory.setConsumerFactory(consumerFactory); factory.setBatchListener(true); factory.setAutoStartup(true); + customizerProvider.ifAvailable(factory::setContainerCustomizer); return factory; } @Bean public ConcurrentKafkaListenerContainerFactory singleFactory( - ConsumerFactory consumerFactory) { + ConsumerFactory consumerFactory, + ObjectProvider< + ContainerCustomizer< + String, String, ConcurrentMessageListenerContainer>> + customizerProvider) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); // do not retry failed records @@ -60,6 +72,7 @@ public ConcurrentKafkaListenerContainerFactory singleFactory( factory.setConsumerFactory(consumerFactory); factory.setBatchListener(false); factory.setAutoStartup(true); + customizerProvider.ifAvailable(factory::setContainerCustomizer); return factory; } } diff --git a/settings.gradle.kts b/settings.gradle.kts index 00533179fbbc..5c40053e99bd 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -431,6 +431,7 @@ include(":instrumentation:spring:spring-integration-4.1:library") include(":instrumentation:spring:spring-integration-4.1:testing") include(":instrumentation:spring:spring-jms-2.0:javaagent") include(":instrumentation:spring:spring-kafka-2.7:javaagent") +include(":instrumentation:spring:spring-kafka-2.7:library") include(":instrumentation:spring:spring-kafka-2.7:testing") include(":instrumentation:spring:spring-rabbit-1.0:javaagent") include(":instrumentation:spring:spring-rmi-4.0:javaagent")