From 5885efda8c025c51754b6f77979cc3ce0773bb50 Mon Sep 17 00:00:00 2001 From: Srikanta <51379715+srnagar@users.noreply.github.com> Date: Thu, 19 Nov 2020 16:44:40 -0800 Subject: [PATCH] Add tracing support for Service Bus processor (#17684) * Add tracing support for SB processor * Make addContext packag-private * Resolve merge conflict --- .../servicebus/ServiceBusClientBuilder.java | 9 ++- .../servicebus/ServiceBusProcessorClient.java | 79 +++++++++++++++++++ .../servicebus/ServiceBusReceivedMessage.java | 19 +++++ .../ServiceBusProcessorClientOptions.java | 23 ++++++ .../servicebus/ServiceBusProcessorTest.java | 73 +++++++++++++++++ 5 files changed, 200 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 0f6d075668a63..db0c55e3d8873 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -646,9 +646,10 @@ public final class ServiceBusSessionProcessorClientBuilder { private ServiceBusSessionProcessorClientBuilder() { sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder(); - processorClientOptions = new ServiceBusProcessorClientOptions(); + processorClientOptions = new ServiceBusProcessorClientOptions() + .setMaxConcurrentCalls(1) + .setTracerProvider(tracerProvider); sessionReceiverClientBuilder.maxConcurrentSessions(1); - processorClientOptions.setMaxConcurrentCalls(1); } /** @@ -1101,7 +1102,9 @@ public final class ServiceBusProcessorClientBuilder { private ServiceBusProcessorClientBuilder() { serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder(); - processorClientOptions = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + processorClientOptions = new ServiceBusProcessorClientOptions() + .setMaxConcurrentCalls(1) + .setTracerProvider(tracerProvider); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 883de4a336c0c..a4223439393d4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -3,13 +3,21 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.tracing.ProcessKind; import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.publisher.Signal; import reactor.core.scheduler.Schedulers; +import java.io.Closeable; +import java.io.IOException; +import java.util.Locale; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -17,6 +25,16 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; +import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; +import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY; +import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY; +import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; +import static com.azure.core.util.tracing.Tracer.SCOPE_KEY; +import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE; +import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME; + /** * The processor client for processing Service Bus messages. {@link ServiceBusProcessorClient * ServiceBusProcessorClients} provides a push-based mechanism that invokes the message processing callback when a @@ -44,6 +62,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private final AtomicReference receiverSubscription = new AtomicReference<>(); private final AtomicReference asyncClient = new AtomicReference<>(); private final AtomicBoolean isRunning = new AtomicBoolean(); + private final TracerProvider tracerProvider; private ScheduledExecutorService scheduledExecutor; /** @@ -65,6 +84,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null"); this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor()); this.receiverBuilder = null; + this.tracerProvider = processorOptions.getTracerProvider(); } /** @@ -84,6 +104,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null"); this.asyncClient.set(receiverBuilder.buildAsyncClient()); this.sessionReceiverBuilder = null; + this.tracerProvider = processorOptions.getTracerProvider(); } /** @@ -164,12 +185,22 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) { if (serviceBusMessageContext.hasError()) { handleError(serviceBusMessageContext.getThrowable()); } else { + Context processSpanContext = null; try { ServiceBusReceivedMessageContext serviceBusReceivedMessageContext = new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext); + + processSpanContext = + startProcessTracingSpan(serviceBusMessageContext.getMessage(), + receiverClient.getEntityPath(), receiverClient.getFullyQualifiedNamespace()); + if (processSpanContext.getData(SPAN_CONTEXT_KEY).isPresent()) { + serviceBusMessageContext.getMessage().addContext(SPAN_CONTEXT_KEY, processSpanContext); + } processMessage.accept(serviceBusReceivedMessageContext); + endProcessTracingSpan(processSpanContext, Signal.complete()); } catch (Exception ex) { handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); + endProcessTracingSpan(processSpanContext, Signal.error(ex)); if (!processorOptions.isDisableAutoComplete()) { logger.warning("Error when processing message. Abandoning message.", ex); abandonMessage(serviceBusMessageContext, receiverClient); @@ -201,6 +232,54 @@ public void onComplete() { }); } + private void endProcessTracingSpan(Context processSpanContext, Signal signal) { + if (processSpanContext == null) { + return; + } + + Optional spanScope = processSpanContext.getData(SCOPE_KEY); + // Disposes of the scope when the trace span closes. + if (!spanScope.isPresent() || !tracerProvider.isEnabled()) { + return; + } + if (spanScope.get() instanceof Closeable) { + Closeable close = (Closeable) processSpanContext.getData(SCOPE_KEY).get(); + try { + close.close(); + tracerProvider.endSpan(processSpanContext, signal); + } catch (IOException ioException) { + logger.error("endTracingSpan().close() failed with an error %s", ioException); + } + + } else { + logger.warning(String.format(Locale.US, + "Process span scope type is not of type Closeable, but type: %s. Not closing the scope and span", + spanScope.get() != null ? spanScope.getClass() : "null")); + } + } + + private Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage, String entityPath, + String fullyQualifiedNamespace) { + + Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY); + if (diagnosticId == null || !tracerProvider.isEnabled()) { + return Context.NONE; + } + + Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE); + + spanContext = spanContext + .addData(ENTITY_PATH_KEY, entityPath) + .addData(HOST_NAME_KEY, fullyQualifiedNamespace) + .addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE); + spanContext = receivedMessage.getEnqueuedTime() == null + ? spanContext + : spanContext.addData(MESSAGE_ENQUEUED_TIME, + receivedMessage.getEnqueuedTime().toInstant().getEpochSecond()); + + return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, ProcessKind.PROCESS); + } + private void abandonMessage(ServiceBusMessageContext serviceBusMessageContext, ServiceBusReceiverAsyncClient receiverClient) { try { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java index 2b6b15213c099..f11dc60bc3302 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceivedMessage.java @@ -10,6 +10,7 @@ import com.azure.core.amqp.models.AmqpMessageBodyType; import com.azure.core.amqp.models.AmqpMessageId; import com.azure.core.experimental.util.BinaryData; +import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; @@ -40,10 +41,12 @@ public final class ServiceBusReceivedMessage { private final AmqpAnnotatedMessage amqpAnnotatedMessage; private UUID lockToken; private boolean isSettled = false; + private Context context; ServiceBusReceivedMessage(BinaryData body) { Objects.requireNonNull(body, "'body' cannot be null."); amqpAnnotatedMessage = new AmqpAnnotatedMessage(AmqpMessageBody.fromData(body.toBytes())); + context = Context.NONE; } /** @@ -438,6 +441,22 @@ public String getTo() { return to; } + /** + * Adds a new key value pair to the existing context on Message. + * + * @param key The key for this context object + * @param value The value for this context object. + * + * @return The updated {@link ServiceBusMessage}. + * @throws NullPointerException if {@code key} or {@code value} is null. + */ + ServiceBusReceivedMessage addContext(String key, Object value) { + Objects.requireNonNull(key, "The 'key' parameter cannot be null."); + Objects.requireNonNull(value, "The 'value' parameter cannot be null."); + this.context = context.addData(key, value); + return this; + } + /** * Gets whether the message has been settled. * diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/models/ServiceBusProcessorClientOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/models/ServiceBusProcessorClientOptions.java index 6c2c0ad35baf2..ce1b6094d02bf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/models/ServiceBusProcessorClientOptions.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/models/ServiceBusProcessorClientOptions.java @@ -3,6 +3,7 @@ package com.azure.messaging.servicebus.implementation.models; +import com.azure.core.amqp.implementation.TracerProvider; import com.azure.core.annotation.Fluent; import com.azure.messaging.servicebus.ServiceBusProcessorClient; @@ -15,6 +16,8 @@ public final class ServiceBusProcessorClientOptions { private int maxConcurrentCalls = 1; private boolean disableAutoComplete; + private TracerProvider tracerProvider; + /** * Returns true if the auto-complete and auto-abandon feature is disabled. * @return true if the auto-complete and auto-abandon feature is disabled. @@ -50,4 +53,24 @@ public ServiceBusProcessorClientOptions setMaxConcurrentCalls(int maxConcurrentC this.maxConcurrentCalls = maxConcurrentCalls; return this; } + + /** + * Returns the {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}. + * + * @return The {@link TracerProvider} instance that is used in {@link ServiceBusProcessorClient}. + */ + public TracerProvider getTracerProvider() { + return tracerProvider; + } + + /** + * Sets the {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}. + * + * @param tracerProvider The {@link TracerProvider} instance to use in {@link ServiceBusProcessorClient}. + * @return The updated instance of {@link ServiceBusProcessorClientOptions}. + */ + public ServiceBusProcessorClientOptions setTracerProvider(TracerProvider tracerProvider) { + this.tracerProvider = tracerProvider; + return this; + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java index 334b2929bb64d..56f5970ffd66b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorTest.java @@ -3,7 +3,11 @@ package com.azure.messaging.servicebus; +import com.azure.core.amqp.implementation.TracerProvider; import com.azure.core.experimental.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.tracing.ProcessKind; +import com.azure.core.util.tracing.Tracer; import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -11,7 +15,10 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import java.io.Closeable; +import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -19,9 +26,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY; +import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME; +import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY; +import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -319,6 +332,66 @@ public void testUserMessageHandlerErrorWithAutoCompleteDisabled() throws Interru verify(asyncClient, never()).abandon(any(ServiceBusReceivedMessage.class)); } + @Test + public void testProcessorWithTracingEnabled() throws InterruptedException { + final Tracer tracer = mock(Tracer.class); + final List tracers = Collections.singletonList(tracer); + TracerProvider tracerProvider = new TracerProvider(tracers); + + String diagnosticId = "00-08ee063508037b1719dddcbf248e30e2-1365c684eb25daed-01"; + + when(tracer.extractContext(eq(diagnosticId), any())).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + return passed.addData(SPAN_CONTEXT_KEY, "value"); + } + ); + when(tracer.start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS))).thenAnswer( + invocation -> { + Context passed = invocation.getArgument(1, Context.class); + assertTrue(passed.getData(MESSAGE_ENQUEUED_TIME).isPresent()); + return passed.addData(SPAN_CONTEXT_KEY, "value1").addData("scope", (Closeable) () -> { + return; + }).addData(PARENT_SPAN_KEY, "value2"); + } + ); + Flux messageFlux = + Flux.create(emitter -> { + for (int i = 0; i < 5; i++) { + ServiceBusReceivedMessage serviceBusReceivedMessage = + new ServiceBusReceivedMessage(BinaryData.fromString("hello")); + serviceBusReceivedMessage.setMessageId(String.valueOf(i)); + serviceBusReceivedMessage.setEnqueuedTime(OffsetDateTime.now()); + serviceBusReceivedMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, diagnosticId); + ServiceBusMessageContext serviceBusMessageContext = + new ServiceBusMessageContext(serviceBusReceivedMessage); + emitter.next(serviceBusMessageContext); + } + }); + + ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux); + + AtomicInteger messageId = new AtomicInteger(); + CountDownLatch countDownLatch = new CountDownLatch(5); + ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder, + messageContext -> { + assertEquals(String.valueOf(messageId.getAndIncrement()), messageContext.getMessage().getMessageId()); + countDownLatch.countDown(); + }, + error -> Assertions.fail("Error occurred when receiving messages from the processor"), + new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1).setTracerProvider(tracerProvider)); + + serviceBusProcessorClient.start(); + boolean success = countDownLatch.await(5, TimeUnit.SECONDS); + serviceBusProcessorClient.close(); + + assertTrue(success, "Failed to receive all expected messages"); + verify(tracer, times(5)).extractContext(eq(diagnosticId), any()); + verify(tracer, times(5)).start(eq("ServiceBus.process"), any(), eq(ProcessKind.PROCESS)); + verify(tracer, times(5)).end(eq("success"), isNull(), any()); + + } + private ServiceBusClientBuilder.ServiceBusReceiverClientBuilder getBuilder( Flux messageFlux) {