Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retreive OpenTelemetry instance through CDI injection instead of relying on GlobalOpenTelemetry.get #2540

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
Expand Down Expand Up @@ -121,6 +122,9 @@ public class AmqpConnector implements InboundConnector, OutboundConnector, Healt
@Any
private Instance<SSLContext> clientSslContexts;

@Inject
private Instance<OpenTelemetry> openTelemetryInstance;

private final List<AmqpClient> clients = new CopyOnWriteArrayList<>();

/**
Expand Down Expand Up @@ -230,7 +234,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
AmqpFailureHandler onNack = createFailureHandler(ic);

if (tracing && amqpInstrumenter == null) {
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector();
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector(openTelemetryInstance);
}

Multi<? extends Message<?>> multi = holder.getOrEstablishConnection()
Expand Down Expand Up @@ -318,7 +322,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
this,
holder,
oc,
getSender);
getSender,
openTelemetryInstance);
processors.put(oc.getChannel(), processor);

return MultiUtils.via(processor, m -> m.onFailure().invoke(t -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
Expand Down Expand Up @@ -57,7 +60,8 @@ public class AmqpCreditBasedSender implements Processor<Message<?>, Message<?>>,
private volatile boolean creditRetrievalInProgress = false;

public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
AmqpConnectorOutgoingConfiguration configuration, Uni<AmqpSender> retrieveSender) {
AmqpConnectorOutgoingConfiguration configuration, Uni<AmqpSender> retrieveSender,
Instance<OpenTelemetry> openTelemetryInstance) {
this.connector = connector;
this.holder = holder;
this.retrieveSender = retrieveSender;
Expand All @@ -75,7 +79,7 @@ public AmqpCreditBasedSender(AmqpConnector connector, ConnectionHolder holder,
this.retryInterval = configuration.getReconnectInterval();

if (tracingEnabled) {
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender();
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForSender(openTelemetryInstance);
} else {
amqpInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.smallrye.reactive.messaging.amqp.tracing;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -19,20 +21,20 @@ private AmqpOpenTelemetryInstrumenter(Instrumenter<AmqpMessage<?>, Void> instrum
this.instrumenter = instrumenter;
}

public static AmqpOpenTelemetryInstrumenter createForConnector() {
return create(false);
public static AmqpOpenTelemetryInstrumenter createForConnector(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false);
}

public static AmqpOpenTelemetryInstrumenter createForSender() {
return create(true);
public static AmqpOpenTelemetryInstrumenter createForSender(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true);
}

private static AmqpOpenTelemetryInstrumenter create(boolean sender) {
private static AmqpOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean sender) {
MessageOperation messageOperation = sender ? MessageOperation.PUBLISH : MessageOperation.RECEIVE;
AmqpAttributesExtractor amqpAttributesExtractor = new AmqpAttributesExtractor();
MessagingAttributesGetter<AmqpMessage<?>, Void> messagingAttributesGetter = amqpAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
InstrumenterBuilder<AmqpMessage<?>, Void> builder = Instrumenter.builder(openTelemetry,
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -161,6 +162,9 @@ public SpanBuilder spanBuilder(final String spanName) {
@Any
Instance<KafkaFailureHandler.Factory> failureHandlerFactories;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Inject
KafkaCDIEvents kafkaCDIEvents;

Expand Down Expand Up @@ -209,7 +213,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
});

if (partitions == 1) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic,
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, -1);
Expand All @@ -231,7 +235,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
// create an instance of source per partitions.
List<Publisher<? extends Message<?>>> streams = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic,
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, group, ic, openTelemetryInstance,
commitHandlerFactories, failureHandlerFactories,
consumerRebalanceListeners,
kafkaCDIEvents, deserializationFailureHandlers, i);
Expand Down Expand Up @@ -268,7 +272,8 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
if (oc.getHealthReadinessTimeout().isPresent()) {
log.deprecatedConfig("health-readiness-timeout", "health-topic-verification-timeout");
}
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, serializationFailureHandlers, producerInterceptors);
KafkaSink sink = new KafkaSink(oc, kafkaCDIEvents, openTelemetryInstance,
serializationFailureHandlers, producerInterceptors);
sinks.add(sink);
return sink.getSink();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
Expand Down Expand Up @@ -78,7 +79,9 @@ public class KafkaSink {

private final KafkaOpenTelemetryInstrumenter kafkaInstrumenter;

public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents,
public KafkaSink(KafkaConnectorOutgoingConfiguration config,
KafkaCDIEvents kafkaCDIEvents,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<SerializationFailureHandler<?>> serializationFailureHandlers,
Instance<ProducerInterceptor<?, ?>> producerInterceptors) {
this.isTracingEnabled = config.getTracingEnabled();
Expand Down Expand Up @@ -134,7 +137,7 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk
}));

if (isTracingEnabled) {
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink();
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
} else {
kafkaInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.Header;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class KafkaSource<K, V> {
public KafkaSource(Vertx vertx,
String consumerGroup,
KafkaConnectorIncomingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<KafkaCommitHandler.Factory> commitHandlerFactories,
Instance<KafkaFailureHandler.Factory> failureHandlerFactories,
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners,
Expand Down Expand Up @@ -227,7 +229,7 @@ public KafkaSource(Vertx vertx,
}

if (isTracingEnabled) {
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource();
kafkaInstrumenter = KafkaOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
} else {
kafkaInstrumenter = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Channel;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.EmitterFactory;
Expand Down Expand Up @@ -75,10 +76,13 @@ public class KafkaRequestReplyFactory implements EmitterFactory<KafkaRequestRepl
@Any
Instance<Map<String, Object>> configurations;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

@Override
public KafkaRequestReplyImpl<Object, Object> createEmitter(EmitterConfiguration configuration, long defaultBufferSize) {
return new KafkaRequestReplyImpl<>(configuration, defaultBufferSize, config.get(), configurations, holder.vertx(),
kafkaCDIEvents, commitStrategyFactories, failureStrategyFactories, failureHandlers,
kafkaCDIEvents, openTelemetryInstance, commitStrategyFactories, failureStrategyFactories, failureHandlers,
correlationIdHandlers, replyFailureHandlers, rebalanceListeners);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand Down Expand Up @@ -80,6 +81,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
Instance<Map<String, Object>> configurations,
Vertx vertx,
KafkaCDIEvents kafkaCDIEvents,
Instance<OpenTelemetry> openTelemetryInstance,
Instance<KafkaCommitHandler.Factory> commitHandlerFactory,
Instance<KafkaFailureHandler.Factory> failureHandlerFactories,
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers,
Expand Down Expand Up @@ -116,7 +118,7 @@ public KafkaRequestReplyImpl(EmitterConfiguration config,
String consumerGroup = consumerConfig.getGroupId().orElseGet(() -> UUID.randomUUID().toString());
this.waitForPartitions = getWaitForPartitions(consumerConfig);
this.gracefulShutdown = consumerConfig.getGracefulShutdown();
this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig,
this.replySource = new KafkaSource<>(vertx, consumerGroup, consumerConfig, openTelemetryInstance,
commitHandlerFactory, failureHandlerFactories, rebalanceListeners, kafkaCDIEvents,
deserializationFailureHandlers, -1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.smallrye.reactive.messaging.kafka.tracing;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
Expand All @@ -23,22 +25,22 @@ private KafkaOpenTelemetryInstrumenter(Instrumenter<KafkaTrace, Void> instrument
this.instrumenter = instrumenter;
}

public static KafkaOpenTelemetryInstrumenter createForSource() {
return create(true);
public static KafkaOpenTelemetryInstrumenter createForSource(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true);
}

public static KafkaOpenTelemetryInstrumenter createForSink() {
return create(false);
public static KafkaOpenTelemetryInstrumenter createForSink(Instance<OpenTelemetry> openTelemetryInstance) {
return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false);
}

private static KafkaOpenTelemetryInstrumenter create(boolean source) {
private static KafkaOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) {

MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH;

KafkaAttributesExtractor kafkaAttributesExtractor = new KafkaAttributesExtractor();
MessagingAttributesGetter<KafkaTrace, Void> messagingAttributesGetter = kafkaAttributesExtractor
.getMessagingAttributesGetter();
InstrumenterBuilder<KafkaTrace, Void> builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
InstrumenterBuilder<KafkaTrace, Void> builder = Instrumenter.builder(openTelemetry,
"io.smallrye.reactive.messaging",
MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void testSinkUsingInteger() {
.with("channel-name", "testSinkUsingInteger");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand All @@ -84,6 +85,7 @@ public void testSinkUsingIntegerAndChannelName() {
.with("partition", 0);
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand All @@ -106,6 +108,7 @@ public void testSinkUsingString() {
.with("channel-name", "testSinkUsingString");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());

Flow.Subscriber<? extends Message<?>> subscriber = sink.getSink();
Expand Down Expand Up @@ -237,7 +240,8 @@ public void testInvalidPayloadType() {
.with("retries", 0L); // disable retry.
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
CountKafkaCdiEvents testCdiEvents = new CountKafkaCdiEvents();
sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());
sink = new KafkaSink(oc, testCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());

await().until(() -> {
HealthReport.HealthReportBuilder builder = HealthReport.builder();
Expand Down Expand Up @@ -286,8 +290,8 @@ public void testInvalidTypeWithDefaultInflightMessages() {
.with("retries", 0L)
.with("channel-name", "testInvalidTypeWithDefaultInflightMessages");
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance(),
UnsatisfiedInstance.instance());
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents,
UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance(), UnsatisfiedInstance.instance());

Flow.Subscriber subscriber = sink.getSink();
Multi.createFrom().range(0, 5)
Expand Down
Loading
Loading