Skip to content

Commit

Permalink
API ServiceBusErrorSource to represent source of error (#16710)
Browse files Browse the repository at this point in the history
ErrorSource to populate when autocomplete is on and for processor model.
  • Loading branch information
hemanttanwar authored Oct 30, 2020
1 parent 3b5d04c commit 249ecc1
Show file tree
Hide file tree
Showing 5 changed files with 297 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.amqp.exception.AmqpException;

/**
* Defines {@link ServiceBusAmqpException} which has additional information about the operation that caused the error.
*
* @see ServiceBusErrorSource
*/
public final class ServiceBusAmqpException extends AmqpException {
private final transient ServiceBusErrorSource errorSource;

/**
* @param amqpException for the error hapened.
* @param errorSource indicating which api caused the error.
*/
ServiceBusAmqpException(AmqpException amqpException, ServiceBusErrorSource errorSource) {
super(amqpException.isTransient(), amqpException.getErrorCondition(), amqpException.getMessage(),
amqpException.getCause(), amqpException.getContext());
this.errorSource = errorSource;
}

/**
* Gets the {@link ServiceBusErrorSource} in case of any errors.
*
* @return the {@link ServiceBusErrorSource}
*/
public ServiceBusErrorSource getErrorSource() {
return errorSource;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;

import com.azure.core.util.ExpandableStringEnum;

/**
* Represent the operation this sdk was performing when the error happened.
*/
public final class ServiceBusErrorSource extends ExpandableStringEnum<ServiceBusErrorSource> {

/** Error while abandoning the message.*/
public static final ServiceBusErrorSource ABANDONED = fromString("ABANDONED", ServiceBusErrorSource.class);

/** Error while completing the message.*/
public static final ServiceBusErrorSource COMPLETE = fromString("COMPLETE", ServiceBusErrorSource.class);

/** Error while receiving the message(s).*/
public static final ServiceBusErrorSource RECEIVE = fromString("RECEIVE", ServiceBusErrorSource.class);

/** Error while renewing lock.*/
public static final ServiceBusErrorSource RENEW_LOCK = fromString("RENEW_LOCK", ServiceBusErrorSource.class);

/** Error when we could not determine the source.*/
public static final ServiceBusErrorSource UNKNOWN = fromString("UNKNOWN", ServiceBusErrorSource.class);

/** Error while user's code is running for a message.*/
public static final ServiceBusErrorSource USER_CALLBACK = fromString("USER_CALLBACK",
ServiceBusErrorSource.class);

/** Error while session is accepted.*/
public static final ServiceBusErrorSource ACCEPT_SESSION = fromString("ACCEPT_SESSION",
ServiceBusErrorSource.class);

/** Error while session is closed.*/
public static final ServiceBusErrorSource CLOSE_SESSION = fromString("CLOSE_SESSION",
ServiceBusErrorSource.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public long getSequenceNumber() {
* @return Session Id of the {@link ServiceBusReceivedMessage}.
*/
public String getSessionId() {
return amqpAnnotatedMessage.getProperties().getGroupId();
return getAmqpAnnotatedMessage().getProperties().getGroupId();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ public Flux<ServiceBusReceivedMessageContext> receiveMessages() {
withAutoComplete = withAutoLockRenewal;
}

return withAutoComplete;
return withAutoComplete
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE));
}

/**
Expand Down Expand Up @@ -711,7 +712,8 @@ public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
String.format("Cannot renew message lock [%s] for a session receiver.", message.getLockToken())));
}

return renewMessageLock(message.getLockToken());
return renewMessageLock(message.getLockToken())
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

/**
Expand Down Expand Up @@ -768,7 +770,8 @@ public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration m
renewalContainer.addOrUpdate(message.getLockToken(), OffsetDateTime.now().plus(maxLockRenewalDuration),
operation);

return operation.getCompletionOperation();
return operation.getCompletionOperation()
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

/**
Expand Down Expand Up @@ -975,9 +978,10 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
renewalContainer.remove(lockToken);
}));

Mono<Void> updateDispositionOperation;
if (sessionManager != null) {
return sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, propertiesToModify,
deadLetterReason, deadLetterErrorDescription, transactionContext)
updateDispositionOperation = sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus,
propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext)
.flatMap(isSuccess -> {
if (isSuccess) {
renewalContainer.remove(lockToken);
Expand All @@ -987,20 +991,38 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
logger.info("Could not perform on session manger. Performing on management node.");
return performOnManagement;
});
}

final ServiceBusAsyncConsumer existingConsumer = consumer.get();
if (isManagementToken(lockToken) || existingConsumer == null) {
return performOnManagement;
} else {
return existingConsumer.updateDisposition(lockToken, dispositionStatus, deadLetterReason,
deadLetterErrorDescription, propertiesToModify, transactionContext)
.then(Mono.fromRunnable(() -> {
logger.info("{}: Update completed. Disposition: {}. Lock: {}.",
entityPath, dispositionStatus, lockToken);
renewalContainer.remove(lockToken);
}));
final ServiceBusAsyncConsumer existingConsumer = consumer.get();
if (isManagementToken(lockToken) || existingConsumer == null) {
updateDispositionOperation = performOnManagement;
} else {
updateDispositionOperation = existingConsumer.updateDisposition(lockToken, dispositionStatus,
deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext)
.then(Mono.fromRunnable(() -> {
logger.info("{}: Update completed. Disposition: {}. Lock: {}.",
entityPath, dispositionStatus, lockToken);
renewalContainer.remove(lockToken);
}));
}
}
return updateDispositionOperation
.onErrorMap(throwable -> {
// We only populate ErrorSource only when AutoComplete is enabled.
if (receiverOptions.isEnableAutoComplete() && throwable instanceof AmqpException) {
switch (dispositionStatus) {
case COMPLETED:
return new ServiceBusAmqpException((AmqpException) throwable,
ServiceBusErrorSource.COMPLETE);
case ABANDONED:
return new ServiceBusAmqpException((AmqpException) throwable,
ServiceBusErrorSource.ABANDONED);
default:
// Do nothing
}
}
return throwable;

});
}

private ServiceBusAsyncConsumer getOrCreateConsumer() {
Expand Down Expand Up @@ -1077,7 +1099,8 @@ Mono<OffsetDateTime> renewSessionLock(String sessionId) {

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(channel -> channel.renewSessionLock(sessionId, linkName));
.flatMap(channel -> channel.renewSessionLock(sessionId, linkName))
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
Expand All @@ -1101,7 +1124,8 @@ Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
maxLockRenewalDuration, true, this::renewSessionLock);

renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
return operation.getCompletionOperation();
return operation.getCompletionOperation()
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RENEW_LOCK));
}

Mono<Void> setSessionState(String sessionId, byte[] sessionState) {
Expand Down Expand Up @@ -1135,4 +1159,15 @@ Mono<byte[]> getSessionState(String sessionId) {
.flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId)));
}
}

/**
* Map the error to {@link ServiceBusAmqpException}
*/
private Throwable mapError(Throwable throwable, ServiceBusErrorSource errorSource) {
if ((throwable instanceof ServiceBusAmqpException) || !(throwable instanceof AmqpException)) {
return throwable;
} else {
return new ServiceBusAmqpException((AmqpException) throwable, errorSource);
}
}
}
Loading

0 comments on commit 249ecc1

Please sign in to comment.