From b867ae2432e5d61ebc86fb2c43b46449d9d8e224 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Mon, 16 Nov 2020 16:28:26 -0800 Subject: [PATCH] Adding in tests for a missing case in the async client tests. --- .../ServiceBusSenderAsyncClient.java | 15 ++++++++--- .../ServiceBusSenderAsyncClientTest.java | 25 +++++++++++++++---- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index b026fec9de706..30650203b721d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -291,7 +291,7 @@ public Mono createMessageBatch(CreateMessageBatchOptions return Mono.just( new ServiceBusMessageBatch(batchSize, link::getErrorContext, tracerProvider, messageSerializer, entityName, getFullyQualifiedNamespace())); - })); + })).onErrorMap(this::mapError); } /** @@ -619,8 +619,7 @@ private Mono sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact if (isTracingEnabled) { tracerProvider.endSpan(parentContext.get(), signal); } - }); - + }).onErrorMap(this::mapError); } private Mono sendInternal(Flux messages, ServiceBusTransactionContext transactionContext) { @@ -634,7 +633,8 @@ private Mono sendInternal(Flux messages, ServiceBusTran link::getErrorContext, tracerProvider, messageSerializer, entityName, link.getHostname())); }) - .flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext))); + .flatMap(list -> sendInternalBatch(Flux.fromIterable(list), transactionContext))) + .onErrorMap(this::mapError); } private Mono sendInternalBatch(Flux eventBatches, @@ -658,6 +658,13 @@ private Mono getSendLink() { .doOnNext(next -> linkName.compareAndSet(null, next.getLinkName())); } + private Throwable mapError(Throwable throwable) { + if (!(throwable instanceof ServiceBusException)) { + return new ServiceBusException(throwable, ServiceBusErrorSource.SENDING); + } + return throwable; + } + private static class AmqpMessageCollector implements Collector, List> { private final int maxMessageSize; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index f918ff65952d9..4ebb319329ec5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -9,8 +9,6 @@ import com.azure.core.amqp.AmqpTransaction; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyOptions; -import com.azure.core.amqp.exception.AmqpErrorCondition; -import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.AmqpSendLink; import com.azure.core.amqp.implementation.CbsAuthorizationType; import com.azure.core.amqp.implementation.ConnectionOptions; @@ -243,7 +241,7 @@ void createBatchWhenSizeTooBig() { // Act & Assert StepVerifier.create(sender.createMessageBatch(options)) - .expectError(IllegalArgumentException.class) + .expectError(ServiceBusException.class) .verify(); } @@ -523,8 +521,25 @@ void sendMessagesListExceedSize() { // Act & Assert StepVerifier.create(sender.sendMessages(messages)) - .verifyErrorMatches(error -> error instanceof AmqpException - && ((AmqpException) error).getErrorCondition() == AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED); + .verifyErrorMatches(error -> error instanceof ServiceBusException + && ((ServiceBusException) error).getReason() == ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED); + + verify(sendLink, never()).send(anyList()); + } + + @Test + void sendSingleMessageThatExceedsSize() { + // arrange + ServiceBusMessage message = TestUtils.getServiceBusMessages(1, UUID.randomUUID().toString()).get(0); + + when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), eq(retryOptions), isNull())) + .thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(1)); + + // Act & Assert + StepVerifier.create(sender.sendMessage(message)) + .verifyErrorMatches(error -> error instanceof ServiceBusException + && ((ServiceBusException) error).getReason() == ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED); verify(sendLink, never()).send(anyList()); }