Skip to content

Commit

Permalink
[Service Bus] Prepare tracing methods for message processor and sched…
Browse files Browse the repository at this point in the history
…uleMessage (#16524)
  • Loading branch information
YijunXieMS authored Oct 28, 2020
1 parent 6ed2321 commit 44c0103
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,21 @@
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import reactor.core.publisher.Signal;

import java.nio.BufferOverflowException;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;

/**
* A class for aggregating {@link ServiceBusMessage messages} into a single, size-limited, batch. It is treated as a
* single AMQP message when sent to the Azure Service Bus service.
*/
public final class ServiceBusMessageBatch {
private static final String AZ_TRACING_NAMESPACE_VALUE = "Microsoft.ServiceBus";
private final ClientLogger logger = new ClientLogger(ServiceBusMessageBatch.class);
private final Object lock = new Object();
private final int maxMessageSize;
Expand Down Expand Up @@ -102,7 +92,10 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
throw logger.logExceptionAsWarning(new NullPointerException("'serviceBusMessage' cannot be null"));
}
ServiceBusMessage serviceBusMessageUpdated =
tracerProvider.isEnabled() ? traceMessageSpan(serviceBusMessage) : serviceBusMessage;
tracerProvider.isEnabled()
? traceMessageSpan(serviceBusMessage, serviceBusMessage.getContext(), hostname, entityPath,
tracerProvider)
: serviceBusMessage;

final int size;
try {
Expand Down Expand Up @@ -135,39 +128,6 @@ List<ServiceBusMessage> getMessages() {
return serviceBusMessageList;
}

/**
* Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the
* message.
*
* @param serviceBusMessage The Message to add tracing span for.
*
* @return the updated Message data object.
*/
private ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage) {
Optional<Object> eventContextData = serviceBusMessage.getContext().getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return serviceBusMessage;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context messageContext = serviceBusMessage.getContext()
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname);
Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, messageContext,
ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get()
.toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}

return serviceBusMessage;
}

private int getSize(final ServiceBusMessage serviceBusMessage, final boolean isFirst) {
Objects.requireNonNull(serviceBusMessage, "'serviceBusMessage' cannot be null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
Expand All @@ -15,16 +21,32 @@
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import reactor.core.publisher.Signal;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.MESSAGE_ENQUEUED_TIME;
import static com.azure.core.util.tracing.Tracer.SCOPE_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;


/**
* Contains helper methods for message conversions, reading status codes, and getting delivery state.
*/
Expand Down Expand Up @@ -253,4 +275,79 @@ private static TransactionalState getTransactionState(ByteBuffer transactionId,
transactionalState.setOutcome(outcome);
return transactionalState;
}

/**
* Used in ServiceBusMessageBatch.tryAddMessage() to start tracing for to-be-sent out messages.
*/
public static ServiceBusMessage traceMessageSpan(ServiceBusMessage serviceBusMessage,
Context messageContext, String hostname, String entityPath, TracerProvider tracerProvider) {
Optional<Object> eventContextData = messageContext.getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return serviceBusMessage;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context newMessageContext = messageContext
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname);
Context eventSpanContext = tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, newMessageContext,
ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
serviceBusMessage.getApplicationProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get()
.toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
serviceBusMessage.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}
return serviceBusMessage;
}

/*
* Starts a new process tracing span and attaches the returned context to the ServiceBusReceivedMessage object for
* users.
*/
public static Context startProcessTracingSpan(ServiceBusReceivedMessage receivedMessage,
String hostname, String entityPath, TracerProvider tracerProvider, ProcessKind processKind) {
Object diagnosticId = receivedMessage.getApplicationProperties().get(DIAGNOSTIC_ID_KEY);
if (diagnosticId == null || !tracerProvider.isEnabled()) {
return Context.NONE;
}

Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE)
.addData(ENTITY_PATH_KEY, entityPath)
.addData(HOST_NAME_KEY, hostname)
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
spanContext = receivedMessage.getEnqueuedTime() == null
? spanContext
: spanContext.addData(MESSAGE_ENQUEUED_TIME, receivedMessage.getEnqueuedTime().toEpochSecond());
return tracerProvider.startSpan(AZ_TRACING_SERVICE_NAME, spanContext, processKind);
}

/*
* Ends the process tracing span and the scope of that span.
*/
public static void endProcessTracingSpan(Context processSpanContext, Signal<Void> signal,
TracerProvider tracerProvider, ClientLogger logger) {
if (processSpanContext != null) {
Optional<Object> spanScope = processSpanContext.getData(SCOPE_KEY);
// Disposes of the scope when the trace span closes.
if (tracerProvider.isEnabled() && spanScope.isPresent()) {
if (spanScope.get() instanceof Closeable) {
Closeable close = (Closeable) spanScope.get();
try {
close.close();
tracerProvider.endSpan(processSpanContext, signal);
} catch (IOException ioException) {
logger.error(Messages.MESSAGE_PROCESSOR_RUN_END, ioException);
}

} else {
logger.warning(String.format(Locale.US,
Messages.PROCESS_SPAN_SCOPE_TYPE_ERROR, spanScope.getClass()));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class Messages {
public static final String INVALID_LOCK_TOKEN_STRING = getMessage("INVALID_LOCK_TOKEN_STRING");
public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE");
public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID");
public static final String PROCESS_SPAN_SCOPE_TYPE_ERROR = getMessage("PROCESS_SPAN_SCOPE_TYPE_ERROR");
public static final String MESSAGE_PROCESSOR_RUN_END = getMessage("MESSAGE_PROCESSOR_RUN_END");

private static synchronized Properties getProperties() {
if (properties != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not set
REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}.
INVALID_OPERATION_DISPOSED_RECEIVER=Cannot perform operation '%s' on a disposed receiver.
INVALID_LOCK_TOKEN_STRING=Invalid lock token '%s'.
PROCESS_SPAN_SCOPE_TYPE_ERROR=Process span scope type is not of type Closeable, but type: %s. Not closing the scope\
and span
MESSAGE_PROCESSOR_RUN_END=MessageProcessor.run() endTracingSpan().close() failed with an error %s

0 comments on commit 44c0103

Please sign in to comment.