diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java index 574e8db71673e..3fbbeded161d6 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java @@ -25,7 +25,7 @@ public interface AmqpReceiveLink extends AmqpLink { Flux receive(); /** - * Adds the specified number of credits to the link. + * Schedule to adds the specified number of credits to the link. * * The number of link credits initialises to zero. It is the application's responsibility to call this method to * allow the receiver to receive {@code credits} more deliveries. @@ -34,6 +34,21 @@ public interface AmqpReceiveLink extends AmqpLink { */ void addCredits(int credits); + /** + * Adds the specified number of credits to the link. + * + * The number of link credits initialises to zero. It is the application's responsibility to call this method to + * allow the receiver to receive {@code credits} more deliveries. + * + * It will update the credits in local memory instantly so {@link #getCredits()} will get + * the updated credits immediately. But the service side may get the credits added with a latency. + * As a contrast, {@link #getCredits()} may return an unchanged value for a short while after + * {@link #addCredits(int)} is called to schedule the credit addition and before the job dispatcher executes it. + * + * @param credits Number of credits to add to the receive link. + */ + void addCreditsInstantly(int credits); + /** * Gets the current number of credits this link has. * diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 673d6deea1131..e89bc72e06088 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -107,6 +107,11 @@ public void addCredits(int credits) { } } + @Override + public void addCreditsInstantly(int credits) { + receiver.flow(credits); + } + @Override public int getCredits() { return receiver.getRemoteCredit(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java index 0970e445b3003..cded96c6b4976 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java @@ -34,9 +34,7 @@ class ServiceBusAsyncConsumer implements AutoCloseable { this.linkProcessor = linkProcessor; this.messageSerializer = messageSerializer; this.processor = linkProcessor - .map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)) - .publish(receiverOptions.getPrefetchCount()) - .autoConnect(1); + .map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 3723c3c170ed0..6a30ce7aa8c76 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -94,7 +94,7 @@ public final class ServiceBusClientBuilder { // Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application // receiving messages at a slow rate. Applications can set it to a higher value if they need better performance. - private static final int DEFAULT_PREFETCH_COUNT = 1; + private static final int DEFAULT_PREFETCH_COUNT = 0; private static final String NAME_KEY = "name"; private static final String VERSION_KEY = "version"; private static final String UNKNOWN = "UNKNOWN"; @@ -670,11 +670,13 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc /** * Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and - * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. + * {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 0. * * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when * and before the application starts the processor. * Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off. + * Using a non-zero prefetch risks of losing messages even though it has better performance. + * @see Service Bus Prefetch * * @param prefetchCount The prefetch count. * @@ -1442,9 +1444,9 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { } private void validateAndThrow(int prefetchCount) { - if (prefetchCount < 1) { + if (prefetchCount < 0) { throw logger.logExceptionAsError(new IllegalArgumentException(String.format( - "prefetchCount (%s) cannot be less than 1.", prefetchCount))); + "prefetchCount (%s) cannot be less than 0.", prefetchCount))); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index d20e0aed23f05..70920dbedb632 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -573,7 +573,18 @@ Flux peekMessagesAt(int maxMessages, long sequenceNum * @return An infinite stream of messages from the Service Bus entity. */ public Flux receiveMessages() { - return receiveMessagesWithContext() + // Without limitRate(), if the user calls receiveMessages().subscribe(), it will call + // ServiceBusReceiveLinkProcessor.request(long request) where request = Long.MAX_VALUE. + // We turn this one-time non-backpressure request to continuous requests with backpressure. + // If receiverOptions.prefetchCount is set to non-zero, it will be passed to ServiceBusReceiveLinkProcessor + // to auto-refill the prefetch buffer. A request will retrieve one message from this buffer. + // If receiverOptions.prefetchCount is 0 (default value), + // the request will add a link credit so one message is retrieved from the service. + return receiveMessagesNoBackPressure().limitRate(1, 0); + } + + Flux receiveMessagesNoBackPressure() { + return receiveMessagesWithContext(0) .handle((serviceBusMessageContext, sink) -> { if (serviceBusMessageContext.hasError()) { sink.error(serviceBusMessageContext.getThrowable()); @@ -598,6 +609,10 @@ public Flux receiveMessages() { * @return An infinite stream of messages from the Service Bus entity. */ Flux receiveMessagesWithContext() { + return receiveMessagesWithContext(1); + } + + Flux receiveMessagesWithContext(int highTide) { final Flux messageFlux = sessionManager != null ? sessionManager.receive() : getOrCreateConsumer().receive().map(ServiceBusMessageContext::new); @@ -610,16 +625,19 @@ Flux receiveMessagesWithContext() { withAutoLockRenewal = messageFlux; } - final Flux withAutoComplete; + Flux result; if (receiverOptions.isEnableAutoComplete()) { - withAutoComplete = new FluxAutoComplete(withAutoLockRenewal, completionLock, + result = new FluxAutoComplete(withAutoLockRenewal, completionLock, context -> context.getMessage() != null ? complete(context.getMessage()) : Mono.empty(), context -> context.getMessage() != null ? abandon(context.getMessage()) : Mono.empty()); } else { - withAutoComplete = withAutoLockRenewal; + result = withAutoLockRenewal; } - return withAutoComplete + if (highTide > 0) { + result = result.limitRate(highTide, 0); + } + return result .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index a584df4c194d3..3b24ee01a1146 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -575,12 +575,12 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext) */ @Override public void close() { - asyncClient.close(); - SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null); if (messageSubscriber != null && !messageSubscriber.isDisposed()) { messageSubscriber.dispose(); } + + asyncClient.close(); } /** @@ -590,19 +590,20 @@ public void close() { private void queueWork(int maximumMessageCount, Duration maxWaitTime, FluxSink emitter) { final long id = idGenerator.getAndIncrement(); - final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter); - + final int prefetch = asyncClient.getReceiverOptions().getPrefetchCount(); + final int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount; + final SynchronousReceiveWork work = new SynchronousReceiveWork(id, + toRequest, + maxWaitTime, emitter); SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get(); if (messageSubscriber == null) { - long prefetch = asyncClient.getReceiverOptions().getPrefetchCount(); - SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work); - + SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(toRequest, work); if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) { newSubscriber.dispose(); SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get(); existing.queueWork(work); } else { - asyncClient.receiveMessages().subscribeWith(newSubscriber); + asyncClient.receiveMessagesNoBackPressure().subscribeWith(newSubscriber); } } else { messageSubscriber.queueWork(work); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index edba47d8a1e4f..f7bb7f9c133db 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -309,7 +309,7 @@ private Flux getSession(Scheduler scheduler, boolean d onSessionRequest(1L); } })) - .publishOn(scheduler); + .publishOn(scheduler, 1); } private Mono getManagementNode() { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index 6915530e3b66b..05ea4d355fde8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -63,14 +63,23 @@ class ServiceBusSessionReceiver implements AutoCloseable { this.receiveLink = receiveLink; this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT); - receiveLink.setEmptyCreditListener(() -> 1); + receiveLink.setEmptyCreditListener(() -> 0); final Flux receivedMessagesFlux = receiveLink .receive() .publishOn(scheduler) .doOnSubscribe(subscription -> { logger.verbose("Adding prefetch to receive link."); - receiveLink.addCredits(prefetch); + if (prefetch > 0) { + receiveLink.addCredits(prefetch); + } + }) + .doOnRequest(request -> { // request is of type long. + if (prefetch == 0) { // add "request" number of credits + receiveLink.addCredits((int) request); + } else { // keep total credits "prefetch" if prefetch is not 0. + receiveLink.addCredits(Math.max(0, prefetch - receiveLink.getCredits())); + } }) .takeUntilOther(cancelReceiveProcessor) .map(message -> { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java index 9b8532507d15f..c717ef0c75506 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java @@ -57,8 +57,6 @@ protected void hookOnSubscribe(Subscription subscription) { if (Operators.setOnce(UPSTREAM, this, subscription)) { this.subscription = subscription; - remaining.addAndGet(requested); - subscription.request(requested); subscriberInitialized = true; drain(); } else { @@ -140,7 +138,6 @@ private void drainQueue() { while ((currentWork = workQueue.peek()) != null && (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) { - // Additional check for safety, but normally this work should never be terminal if (currentWork.isTerminal()) { // This work already finished by either timeout or no more messages to send, process next work. @@ -155,6 +152,9 @@ private void drainQueue() { // timer to complete the currentWork in case of timeout trigger currentTimeoutOperation = getTimeoutOperation(currentWork); currentWork.startedProcessing(); + final long calculatedRequest = currentWork.getNumberOfEvents() - remaining.get(); + remaining.addAndGet(calculatedRequest); + subscription.request(calculatedRequest); } // Send messages to currentWork from buffer @@ -174,15 +174,6 @@ private void drainQueue() { currentTimeoutOperation.dispose(); } logger.verbose("The work [{}] is complete.", currentWork.getId()); - } else { - // Since this work is not complete, find out how much we should request from upstream - long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size()); - if (creditToAdd > 0) { - remaining.addAndGet(creditToAdd); - subscription.request(creditToAdd); - logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd, - currentWork.getId()); - } } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index 93f31e5ff8a21..07e89682e1e0b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -45,7 +45,6 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor linkName = new AtomicReference<>(); // Queue containing all the prefetched messages. @@ -200,12 +199,7 @@ public void onNext(ServiceBusReceiveLink next) { oldSubscription = currentLinkSubscriptions; currentLink = next; - next.setEmptyCreditListener(() -> { - final int creditsToAdd = getCreditsToAdd(0); - linkCreditsAdded.set(creditsToAdd > 0); - - return creditsToAdd; - }); + next.setEmptyCreditListener(() -> 0); currentLinkSubscriptions = Disposables.composite( next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> { @@ -499,6 +493,9 @@ private void drainQueue() { if (receiveMode != ServiceBusReceiveMode.PEEK_LOCK) { pendingMessages.decrementAndGet(); } + if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured. + checkAndAddCredits(currentLink); + } } catch (Exception e) { logger.error("Exception occurred while handling downstream onNext operation.", e); throw logger.logExceptionAsError(Exceptions.propagate( @@ -545,18 +542,14 @@ private void checkAndAddCredits(AmqpReceiveLink link) { return; } - // Credits have already been added to the link. We won't try again. - if (linkCreditsAdded.getAndSet(true)) { - return; - } - - final int credits = getCreditsToAdd(link.getCredits()); - linkCreditsAdded.set(credits > 0); - - logger.info("Link credits to add. Credits: '{}'", credits); + synchronized (lock) { + final int linkCredits = link.getCredits(); + final int credits = getCreditsToAdd(linkCredits); + logger.info("Link credits='{}', Link credits to add: '{}'", linkCredits, credits); - if (credits > 0) { - link.addCredits(credits); + if (credits > 0) { + link.addCredits(credits); + } } } @@ -571,22 +564,40 @@ private int getCreditsToAdd(int linkCredits) { } final int creditsToAdd; - if (messageQueue.isEmpty() && !hasBackpressure) { - creditsToAdd = prefetch; + final int expectedTotalCredit; + if (prefetch == 0) { + if (r <= Integer.MAX_VALUE) { + expectedTotalCredit = (int) r; + } else { + //This won't really happen in reality. + //For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);". + //So it will request one by one from this link processor, even though the user's request has no + //back pressure. + //For sync client, the sync subscriber has back pressure. + //The request count uses the the argument of method receiveMessages(int maxMessages). + //It's at most Integer.MAX_VALUE. + expectedTotalCredit = Integer.MAX_VALUE; + } } else { - synchronized (queueLock) { - final int queuedMessages = pendingMessages.get(); - final int pending = queuedMessages + linkCredits; - - if (hasBackpressure) { - creditsToAdd = Math.max(Long.valueOf(r).intValue() - pending, 0); - } else { - // If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full. - creditsToAdd = minimumNumberOfMessages >= queuedMessages - ? Math.max(prefetch - pending, 1) - : 0; - } + expectedTotalCredit = prefetch; + } + logger.info("linkCredits: '{}', expectedTotalCredit: '{}'", linkCredits, expectedTotalCredit); + + synchronized (queueLock) { + final int queuedMessages = pendingMessages.get(); + final int pending = queuedMessages + linkCredits; + + if (hasBackpressure) { + creditsToAdd = Math.max(expectedTotalCredit - pending, 0); + } else { + // If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full. + creditsToAdd = minimumNumberOfMessages >= queuedMessages + ? Math.max(expectedTotalCredit - pending, 0) + : 0; } + logger.info("prefetch: '{}', requested: '{}', linkCredits: '{}', expectedTotalCredit: '{}', queuedMessages:" + + "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'", getPrefetch(), r, linkCredits, + expectedTotalCredit, queuedMessages, creditsToAdd, messageQueue.size()); } return creditsToAdd; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java index d73562ff7648a..80d57e93ce8c7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java @@ -202,7 +202,7 @@ void invalidPrefetch() { .receiveMode(ServiceBusReceiveMode.PEEK_LOCK); // Act & Assert - assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(0)); + assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(-1)); } @MethodSource("getProxyConfigurations") diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index af78e7ff45c4d..ef7e7effe190f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -849,10 +849,15 @@ void canPerformMultipleReceive() { .expectNextCount(numberOfEvents) .verifyComplete(); - StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) - .then(() -> messages.forEach(m -> messageSink.next(m))) - .expectNextCount(numberOfEvents) - .verifyComplete(); + // TODO: Yijun and Srikanta are thinking of using two links for two subscribers. + // We may not want to support multiple subscribers by using publish and autoConnect. + // After the autoConnect was removed from ServiceBusAsyncConsumer.processor, the receiver doesn't support + // multiple calls of receiver.receiveMessages(). + // For more discussions. +// StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) +// .then(() -> messages.forEach(m -> messageSink.next(m))) +// .expectNextCount(numberOfEvents) +// .verifyComplete(); verify(amqpReceiveLink).addCredits(PREFETCH); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index 4f2623993ec6a..a5a31b52ce4b9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -80,7 +80,7 @@ void setup() { MockitoAnnotations.initMocks(this); when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); - when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 1, null, false)); + when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ServiceBusReceiveMode.PEEK_LOCK, 0, null, false)); when(sessionReceiverOptions.getSessionId()).thenReturn(SESSION_ID); client = new ServiceBusReceiverClient(asyncClient, OPERATION_TIMEOUT); } @@ -658,7 +658,7 @@ void receiveMessagesWithUserSpecifiedTimeout() { sink.complete(); }); }); - when(asyncClient.receiveMessages()).thenReturn(messageSink); + when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink); // Act final IterableStream actual = client.receiveMessages(maxMessages, receiveTimeout); @@ -704,7 +704,7 @@ void receiveMessagesMax() { }); }); - when(asyncClient.receiveMessages()).thenReturn(messageSink); + when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink); // Act final IterableStream actual = client.receiveMessages(maxMessages); @@ -750,7 +750,7 @@ void receiveMessagesTimeout() { sink.complete(); }); }); - when(asyncClient.receiveMessages()).thenReturn(messageSink); + when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink); // Act final IterableStream actual = client.receiveMessages(maxMessages); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index 255fdb67d90d2..c4a7e46b8fc2f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -77,6 +77,7 @@ class ServiceBusReceiveLinkProcessorTest { private final EmitterProcessor messageProcessor = EmitterProcessor.create(); private final FluxSink messageProcessorSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); private ServiceBusReceiveLinkProcessor linkProcessor; + private ServiceBusReceiveLinkProcessor linkProcessorNoPrefetch; @BeforeAll static void beforeAll() { @@ -93,6 +94,7 @@ void setup() { MockitoAnnotations.initMocks(this); linkProcessor = new ServiceBusReceiveLinkProcessor(PREFETCH, retryPolicy, ServiceBusReceiveMode.PEEK_LOCK); + linkProcessorNoPrefetch = new ServiceBusReceiveLinkProcessor(0, retryPolicy, ServiceBusReceiveMode.PEEK_LOCK); when(link1.getEndpointStates()).thenReturn(endpointProcessor); when(link1.receive()).thenReturn(messageProcessor); @@ -139,16 +141,7 @@ void createNewLink() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCredits(eq(PREFETCH)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); - - Supplier value = creditSupplierCaptor.getValue(); - assertNotNull(value); - - final Integer creditValue = value.get(); - // Expecting 5 because it is Long.MAX_VALUE and there are no credits (since we invoked the empty credit - // listener). - assertEquals(PREFETCH, creditValue); + verify(link1).addCredits(eq(PREFETCH - 1)); } /** @@ -160,7 +153,7 @@ void respectsBackpressureInRange() { final int backpressure = 15; // Because one message was emitted. ServiceBusReceiveLinkProcessor processor = Flux.create(sink -> sink.next(link1)) - .subscribeWith(linkProcessor); + .subscribeWith(linkProcessorNoPrefetch); // Act & Assert StepVerifier.create(processor, backpressure) @@ -169,15 +162,7 @@ void respectsBackpressureInRange() { .thenCancel() .verify(); - verify(link1).addCredits(eq(backpressure)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); - - Supplier value = creditSupplierCaptor.getValue(); - assertNotNull(value); - - final Integer creditValue = value.get(); - final int emittedOne = backpressure - 1; - assertTrue(creditValue == emittedOne || creditValue == backpressure); + verify(link1).addCredits(backpressure); // request up to PREFETCH } /** @@ -534,7 +519,7 @@ void receivesUntilFirstLinkClosed() { ServiceBusReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); FluxSink sink = endpointProcessor.sink(); - when(link1.getCredits()).thenReturn(1); + when(link1.getCredits()).thenReturn(0); // Act & Assert StepVerifier.create(processor) @@ -554,15 +539,14 @@ void receivesUntilFirstLinkClosed() { assertNull(processor.getError()); verify(link1).addCredits(eq(PREFETCH)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); + verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); // Add 0 Supplier value = creditSupplierCaptor.getValue(); assertNotNull(value); final Integer creditValue = value.get(); - // Expecting 5 because it is Long.MAX_VALUE and there are no credits (since we invoked the empty credit - // listener). - assertEquals(PREFETCH, creditValue); + + assertEquals(0, creditValue); } @Test @@ -571,7 +555,7 @@ void receivesFromFirstLink() { ServiceBusReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); FluxSink sink = endpointProcessor.sink(); - when(link1.getCredits()).thenReturn(1); + when(link1.getCredits()).thenReturn(0); // Act & Assert StepVerifier.create(processor) @@ -590,15 +574,14 @@ void receivesFromFirstLink() { assertNull(processor.getError()); verify(link1).addCredits(eq(PREFETCH)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); + verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); // Add 0. Supplier value = creditSupplierCaptor.getValue(); assertNotNull(value); final Integer creditValue = value.get(); - // Expecting 5 because it is Long.MAX_VALUE and there are no credits (since we invoked the empty credit - // listener). - assertEquals(PREFETCH, creditValue); + + assertEquals(0, creditValue); } /** @@ -608,7 +591,7 @@ void receivesFromFirstLink() { @Test void backpressureRequestOnlyEmitsThatAmount() { // Arrange - final int backpressure = 10; + final int backpressure = PREFETCH; final int existingCredits = 1; final int expectedCredits = backpressure - existingCredits; ServiceBusReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor);