Skip to content

Commit

Permalink
Adding in tests for a missing case in the async client tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
richardpark-msft committed Nov 17, 2020
1 parent a1c04b8 commit b867ae2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public Mono<ServiceBusMessageBatch> createMessageBatch(CreateMessageBatchOptions
return Mono.just(
new ServiceBusMessageBatch(batchSize, link::getErrorContext, tracerProvider, messageSerializer,
entityName, getFullyQualifiedNamespace()));
}));
})).onErrorMap(this::mapError);
}

/**
Expand Down Expand Up @@ -619,8 +619,7 @@ private Mono<Void> sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact
if (isTracingEnabled) {
tracerProvider.endSpan(parentContext.get(), signal);
}
});

}).onErrorMap(this::mapError);
}

private Mono<Void> sendInternal(Flux<ServiceBusMessage> messages, ServiceBusTransactionContext transactionContext) {
Expand All @@ -634,7 +633,8 @@ private Mono<Void> sendInternal(Flux<ServiceBusMessage> messages, ServiceBusTran
link::getErrorContext, tracerProvider, messageSerializer, entityName,
link.getHostname()));
})
.flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext)));
.flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext)))
.onErrorMap(this::mapError);
}

private Mono<Void> sendInternalBatch(Flux<ServiceBusMessageBatch> eventBatches,
Expand All @@ -658,6 +658,13 @@ private Mono<AmqpSendLink> getSendLink() {
.doOnNext(next -> linkName.compareAndSet(null, next.getLinkName()));
}

private Throwable mapError(Throwable throwable) {
if (!(throwable instanceof ServiceBusException)) {
return new ServiceBusException(throwable, ServiceBusErrorSource.SENDING);
}
return throwable;
}

private static class AmqpMessageCollector implements Collector<ServiceBusMessage, List<ServiceBusMessageBatch>,
List<ServiceBusMessageBatch>> {
private final int maxMessageSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.CbsAuthorizationType;
import com.azure.core.amqp.implementation.ConnectionOptions;
Expand Down Expand Up @@ -243,7 +241,7 @@ void createBatchWhenSizeTooBig() {

// Act & Assert
StepVerifier.create(sender.createMessageBatch(options))
.expectError(IllegalArgumentException.class)
.expectError(ServiceBusException.class)
.verify();
}

Expand Down Expand Up @@ -523,8 +521,25 @@ void sendMessagesListExceedSize() {

// Act & Assert
StepVerifier.create(sender.sendMessages(messages))
.verifyErrorMatches(error -> error instanceof AmqpException
&& ((AmqpException) error).getErrorCondition() == AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED);
.verifyErrorMatches(error -> error instanceof ServiceBusException
&& ((ServiceBusException) error).getReason() == ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED);

verify(sendLink, never()).send(anyList());
}

@Test
void sendSingleMessageThatExceedsSize() {
// arrange
ServiceBusMessage message = TestUtils.getServiceBusMessages(1, UUID.randomUUID().toString()).get(0);

when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull()))
.thenReturn(Mono.just(sendLink));
when(sendLink.getLinkSize()).thenReturn(Mono.just(1));

// Act & Assert
StepVerifier.create(sender.sendMessage(message))
.verifyErrorMatches(error -> error instanceof ServiceBusException
&& ((ServiceBusException) error).getReason() == ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED);

verify(sendLink, never()).send(anyList());
}
Expand Down

0 comments on commit b867ae2

Please sign in to comment.