From fd1843a71ab865208371dffa6100619428f60518 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 12 Nov 2020 15:50:38 -0800 Subject: [PATCH 01/31] Link credit size adjustment --- .../servicebus/ServiceBusAsyncConsumer.java | 4 +- .../servicebus/ServiceBusClientBuilder.java | 10 ++-- .../servicebus/ServiceBusReceiverClient.java | 11 ++-- .../SynchronousMessageSubscriber.java | 10 +--- .../ServiceBusReceiveLinkProcessor.java | 52 +++++++++---------- .../ServiceBusClientBuilderTest.java | 2 +- 6 files changed, 40 insertions(+), 49 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java index 0970e445b3003..cded96c6b4976 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumer.java @@ -34,9 +34,7 @@ class ServiceBusAsyncConsumer implements AutoCloseable { this.linkProcessor = linkProcessor; this.messageSerializer = messageSerializer; this.processor = linkProcessor - .map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)) - .publish(receiverOptions.getPrefetchCount()) - .autoConnect(1); + .map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index af6acb706b88f..2bb9add8af95f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -94,7 +94,7 @@ public final class ServiceBusClientBuilder { // Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application // receiving messages at a slow rate. Applications can set it to a higher value if they need better performance. - private static final int DEFAULT_PREFETCH_COUNT = 1; + private static final int DEFAULT_PREFETCH_COUNT = 0; private static final String NAME_KEY = "name"; private static final String VERSION_KEY = "version"; private static final String UNKNOWN = "UNKNOWN"; @@ -670,11 +670,13 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc /** * Sets the prefetch count of the processor. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link - * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. + * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 0. * * Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when * and before the application starts the processor. * Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off. + * Using a non-zero prefetch risks of losing messages even though it has better performance. + * @see Service Bus Prefetch * * @param prefetchCount The prefetch count. * @@ -1441,9 +1443,9 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { } private void validateAndThrow(int prefetchCount) { - if (prefetchCount < 1) { + if (prefetchCount < 0) { throw logger.logExceptionAsError(new IllegalArgumentException(String.format( - "prefetchCount (%s) cannot be less than 1.", prefetchCount))); + "prefetchCount (%s) cannot be less than 0.", prefetchCount))); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 28031e6ea4abd..2f80f3a4e13e6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -590,13 +590,14 @@ public void close() { private void queueWork(int maximumMessageCount, Duration maxWaitTime, FluxSink emitter) { final long id = idGenerator.getAndIncrement(); - final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter); - + int prefetch = asyncClient.getReceiverOptions().getPrefetchCount(); + int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount; + final SynchronousReceiveWork work = new SynchronousReceiveWork(id, + toRequest, + maxWaitTime, emitter); SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get(); if (messageSubscriber == null) { - long prefetch = asyncClient.getReceiverOptions().getPrefetchCount(); - SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work); - + SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(toRequest, work); if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) { newSubscriber.dispose(); SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java index 9b8532507d15f..6aeeb726dcce1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java @@ -86,6 +86,7 @@ void queueWork(SynchronousReceiveWork work) { logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(), work.getTimeout()); workQueue.add(work); + subscription.request(work.getNumberOfEvents()); // Do not drain if another thread want to queue the work before we have subscriber if (subscriberInitialized) { @@ -174,15 +175,6 @@ private void drainQueue() { currentTimeoutOperation.dispose(); } logger.verbose("The work [{}] is complete.", currentWork.getId()); - } else { - // Since this work is not complete, find out how much we should request from upstream - long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size()); - if (creditToAdd > 0) { - remaining.addAndGet(creditToAdd); - subscription.request(creditToAdd); - logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd, - currentWork.getId()); - } } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index 98d2eddd32e95..e8b80cff83b13 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -45,7 +45,6 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor linkName = new AtomicReference<>(); // Queue containing all the prefetched messages. @@ -199,12 +198,7 @@ public void onNext(ServiceBusReceiveLink next) { oldSubscription = currentLinkSubscriptions; currentLink = next; - next.setEmptyCreditListener(() -> { - final int creditsToAdd = getCreditsToAdd(0); - linkCreditsAdded.set(creditsToAdd > 0); - - return creditsToAdd; - }); + next.setEmptyCreditListener(() -> 0); currentLinkSubscriptions = Disposables.composite( next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> { @@ -449,6 +443,9 @@ private void drain() { try { drainQueue(); } finally { + if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured. + checkAndAddCredits(currentLink); + } if (wip.decrementAndGet() != 0) { logger.warning("There is another worker in drainLoop. But there should only be 1 worker."); } @@ -544,13 +541,7 @@ private void checkAndAddCredits(AmqpReceiveLink link) { return; } - // Credits have already been added to the link. We won't try again. - if (linkCreditsAdded.getAndSet(true)) { - return; - } - final int credits = getCreditsToAdd(link.getCredits()); - linkCreditsAdded.set(credits > 0); logger.info("Link credits to add. Credits: '{}'", credits); @@ -570,21 +561,28 @@ private int getCreditsToAdd(int linkCredits) { } final int creditsToAdd; - if (messageQueue.isEmpty() && !hasBackpressure) { - creditsToAdd = prefetch; + final int expectedTotalCredit; + if (prefetch == 0) { + if (r <= Integer.MAX_VALUE) { + expectedTotalCredit = (int) r; + } else { + expectedTotalCredit = Integer.MAX_VALUE; + } } else { - synchronized (queueLock) { - final int queuedMessages = pendingMessages.get(); - final int pending = queuedMessages + linkCredits; - - if (hasBackpressure) { - creditsToAdd = Math.max(Long.valueOf(r).intValue() - pending, 0); - } else { - // If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full. - creditsToAdd = minimumNumberOfMessages >= queuedMessages - ? Math.max(prefetch - pending, 1) - : 0; - } + expectedTotalCredit = prefetch; + } + + synchronized (queueLock) { + final int queuedMessages = pendingMessages.get(); + final int pending = queuedMessages + linkCredits; + + if (hasBackpressure) { + creditsToAdd = Math.max(expectedTotalCredit - pending, 0); + } else { + // If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full. + creditsToAdd = minimumNumberOfMessages >= queuedMessages + ? Math.max(expectedTotalCredit - pending, 0) + : 0; } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java index fae12997fb9c0..c1702f70ce763 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java @@ -203,7 +203,7 @@ void invalidPrefetch() { .receiveMode(ReceiveMode.PEEK_LOCK); // Act & Assert - assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(0)); + assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(-1)); } @MethodSource("getProxyConfigurations") From deadef5e144beb48cf493e529cb816471ea539bf Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 12 Nov 2020 21:59:23 -0800 Subject: [PATCH 02/31] Add API to block and flow link credit to the service. --- .../core/amqp/implementation/AmqpReceiveLink.java | 10 ++++++++++ .../core/amqp/implementation/ReactorReceiver.java | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java index 574e8db71673e..86cb9a524a779 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java @@ -34,6 +34,16 @@ public interface AmqpReceiveLink extends AmqpLink { */ void addCredits(int credits); + /** + * Adds the specified number of credits to the link and block until it's added to the service. + * + * The number of link credits initialises to zero. It is the application's responsibility to call this method to + * allow the receiver to receive {@code credits} more deliveries. + * + * @param credits Number of credits to add to the receive link. + */ + void addCreditsBlocking(int credits); + /** * Gets the current number of credits this link has. * diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 673d6deea1131..35e51bcdc6f02 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -107,6 +107,11 @@ public void addCredits(int credits) { } } + @Override + public void addCreditsBlocking(int credits) { + receiver.flow(credits); + } + @Override public int getCredits() { return receiver.getRemoteCredit(); From 945f1c875d87e27de464e84b9d1e450092c8521e Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 12 Nov 2020 22:01:24 -0800 Subject: [PATCH 03/31] Use addCreditsBlocking instead of addCredits to avoid too many credits added. --- .../implementation/ServiceBusReceiveLinkProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index e8b80cff83b13..2d1b4d87575f4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -443,9 +443,6 @@ private void drain() { try { drainQueue(); } finally { - if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured. - checkAndAddCredits(currentLink); - } if (wip.decrementAndGet() != 0) { logger.warning("There is another worker in drainLoop. But there should only be 1 worker."); } @@ -495,6 +492,9 @@ private void drainQueue() { if (receiveMode != ReceiveMode.PEEK_LOCK) { pendingMessages.decrementAndGet(); } + if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured. + checkAndAddCredits(currentLink); + } } catch (Exception e) { logger.error("Exception occurred while handling downstream onNext operation.", e); throw logger.logExceptionAsError(Exceptions.propagate( @@ -546,7 +546,7 @@ private void checkAndAddCredits(AmqpReceiveLink link) { logger.info("Link credits to add. Credits: '{}'", credits); if (credits > 0) { - link.addCredits(credits); + link.addCreditsBlocking(credits); } } From 98d07dd80a99c59a37624a873734b834e8a22e3a Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 13 Nov 2020 12:38:12 -0800 Subject: [PATCH 04/31] Request fewer credits in the 2nd request if the 1st request returns fewer messages than requested. (by Hemant) --- .../messaging/servicebus/SynchronousMessageSubscriber.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java index 6aeeb726dcce1..4148946be1648 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java @@ -57,8 +57,6 @@ protected void hookOnSubscribe(Subscription subscription) { if (Operators.setOnce(UPSTREAM, this, subscription)) { this.subscription = subscription; - remaining.addAndGet(requested); - subscription.request(requested); subscriberInitialized = true; drain(); } else { @@ -86,7 +84,6 @@ void queueWork(SynchronousReceiveWork work) { logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(), work.getTimeout()); workQueue.add(work); - subscription.request(work.getNumberOfEvents()); // Do not drain if another thread want to queue the work before we have subscriber if (subscriberInitialized) { @@ -141,7 +138,6 @@ private void drainQueue() { while ((currentWork = workQueue.peek()) != null && (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) { - // Additional check for safety, but normally this work should never be terminal if (currentWork.isTerminal()) { // This work already finished by either timeout or no more messages to send, process next work. @@ -156,6 +152,9 @@ private void drainQueue() { // timer to complete the currentWork in case of timeout trigger currentTimeoutOperation = getTimeoutOperation(currentWork); currentWork.startedProcessing(); + final long calculatedRequest = currentWork.getNumberOfEvents() - bufferMessages.size(); + remaining.addAndGet(calculatedRequest); + subscription.request(calculatedRequest); } // Send messages to currentWork from buffer From ef52f71ffcb9c2e8d118a600bae7777f3d67b0c7 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 13 Nov 2020 15:01:54 -0800 Subject: [PATCH 05/31] Rename addCreditsBlocking to addCreditsInstantly --- .../core/amqp/implementation/AmqpReceiveLink.java | 11 ++++++++--- .../core/amqp/implementation/ReactorReceiver.java | 2 +- .../ServiceBusReceiveLinkProcessor.java | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java index 86cb9a524a779..3fbbeded161d6 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpReceiveLink.java @@ -25,7 +25,7 @@ public interface AmqpReceiveLink extends AmqpLink { Flux receive(); /** - * Adds the specified number of credits to the link. + * Schedule to adds the specified number of credits to the link. * * The number of link credits initialises to zero. It is the application's responsibility to call this method to * allow the receiver to receive {@code credits} more deliveries. @@ -35,14 +35,19 @@ public interface AmqpReceiveLink extends AmqpLink { void addCredits(int credits); /** - * Adds the specified number of credits to the link and block until it's added to the service. + * Adds the specified number of credits to the link. * * The number of link credits initialises to zero. It is the application's responsibility to call this method to * allow the receiver to receive {@code credits} more deliveries. * + * It will update the credits in local memory instantly so {@link #getCredits()} will get + * the updated credits immediately. But the service side may get the credits added with a latency. + * As a contrast, {@link #getCredits()} may return an unchanged value for a short while after + * {@link #addCredits(int)} is called to schedule the credit addition and before the job dispatcher executes it. + * * @param credits Number of credits to add to the receive link. */ - void addCreditsBlocking(int credits); + void addCreditsInstantly(int credits); /** * Gets the current number of credits this link has. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 35e51bcdc6f02..e89bc72e06088 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -108,7 +108,7 @@ public void addCredits(int credits) { } @Override - public void addCreditsBlocking(int credits) { + public void addCreditsInstantly(int credits) { receiver.flow(credits); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index 2d1b4d87575f4..f6e8847749632 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -546,7 +546,7 @@ private void checkAndAddCredits(AmqpReceiveLink link) { logger.info("Link credits to add. Credits: '{}'", credits); if (credits > 0) { - link.addCreditsBlocking(credits); + link.addCreditsInstantly(credits); } } From 36c4302a1f9be24ae9ff13748d37852b00f39ff3 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 13 Nov 2020 15:02:30 -0800 Subject: [PATCH 06/31] Update amqp-core dependency version --- sdk/servicebus/azure-messaging-servicebus/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 0ac4ce29f8c4f..9937871495fec 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -52,7 +52,7 @@ com.azure azure-core-amqp - 1.7.0-beta.2 + 1.7.0-beta.3 com.azure From 1a5c4c438972e0db0ab0f22fc17f99f296926d68 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 13 Nov 2020 15:03:24 -0800 Subject: [PATCH 07/31] Use remaining instead of bufferMessages.size() to calculate num of requested. --- .../messaging/servicebus/SynchronousMessageSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java index 4148946be1648..c717ef0c75506 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SynchronousMessageSubscriber.java @@ -152,7 +152,7 @@ private void drainQueue() { // timer to complete the currentWork in case of timeout trigger currentTimeoutOperation = getTimeoutOperation(currentWork); currentWork.startedProcessing(); - final long calculatedRequest = currentWork.getNumberOfEvents() - bufferMessages.size(); + final long calculatedRequest = currentWork.getNumberOfEvents() - remaining.get(); remaining.addAndGet(calculatedRequest); subscription.request(calculatedRequest); } From d43457ead26f42eeaa898432797b8cfc16c7cedd Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Mon, 16 Nov 2020 11:57:00 -0800 Subject: [PATCH 08/31] Add backpressure in request for async receiveMessages() --- .../servicebus/ServiceBusReceiverAsyncClient.java | 11 +++++++++++ .../servicebus/ServiceBusReceiverClient.java | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 3c35ac976a07e..fdc44627ea1e9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -572,6 +572,17 @@ Flux peekMessagesAt(int maxMessages, long sequenceNum * @return An infinite stream of messages from the Service Bus entity. */ public Flux receiveMessages() { + // Without publish->autoConnect, if the user calls receiveMessages().subscribe(), it will call + // ServiceBusReceiveLinkProcessor.request(long request) where request = Long.MAX_VALUE. + // We turn this one-time non-backpressure request to continuous requests with backpressure. + // If receiverOptions.prefetchCount is set to non-zero, it will be passed to ServiceBusReceiveLinkProcessor + // to auto-refill the prefetched account. So here for publish(), using 1 as prefetch. 0 is even better than 1 + // but 0 is not allowed here. + return receiveMessagesNoConnect().publish(1) + .autoConnect(1).cast(ServiceBusReceivedMessage.class); + } + + Flux receiveMessagesNoConnect() { return receiveMessagesWithContext() .handle((serviceBusMessageContext, sink) -> { if (serviceBusMessageContext.hasError()) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 2f80f3a4e13e6..b9eec631a944b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -603,7 +603,7 @@ private void queueWork(int maximumMessageCount, Duration maxWaitTime, SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get(); existing.queueWork(work); } else { - asyncClient.receiveMessages().subscribeWith(newSubscriber); + asyncClient.receiveMessagesNoConnect().subscribeWith(newSubscriber); } } else { messageSubscriber.queueWork(work); From 34c70ce1a03abfe0890c72ae665bc5671c5b3459 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Mon, 16 Nov 2020 12:54:23 -0800 Subject: [PATCH 09/31] Use limitRate instead of autoConnect for back pressure. --- .../servicebus/ServiceBusReceiverAsyncClient.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index fdc44627ea1e9..58539d53a8440 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -572,14 +572,14 @@ Flux peekMessagesAt(int maxMessages, long sequenceNum * @return An infinite stream of messages from the Service Bus entity. */ public Flux receiveMessages() { - // Without publish->autoConnect, if the user calls receiveMessages().subscribe(), it will call + // Without limitRate(), if the user calls receiveMessages().subscribe(), it will call // ServiceBusReceiveLinkProcessor.request(long request) where request = Long.MAX_VALUE. // We turn this one-time non-backpressure request to continuous requests with backpressure. // If receiverOptions.prefetchCount is set to non-zero, it will be passed to ServiceBusReceiveLinkProcessor - // to auto-refill the prefetched account. So here for publish(), using 1 as prefetch. 0 is even better than 1 - // but 0 is not allowed here. - return receiveMessagesNoConnect().publish(1) - .autoConnect(1).cast(ServiceBusReceivedMessage.class); + // to auto-refill the prefetch buffer. A request will retrieve one message from this buffer. + // If receiverOptions.prefetchCount is 0 (default value), + // the request will add a link credit so one message is retrieved from the service. + return receiveMessagesNoConnect().limitRate(1); } Flux receiveMessagesNoConnect() { From e7aa8bcea866d35fd43b2cd3f7652b03469f4366 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Mon, 16 Nov 2020 13:38:59 -0800 Subject: [PATCH 10/31] rename receiveMessagesNoConnection to receiveMessagesNoBackPressure --- .../messaging/servicebus/ServiceBusReceiverAsyncClient.java | 4 ++-- .../azure/messaging/servicebus/ServiceBusReceiverClient.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 58539d53a8440..8949df9cc7b16 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -579,10 +579,10 @@ public Flux receiveMessages() { // to auto-refill the prefetch buffer. A request will retrieve one message from this buffer. // If receiverOptions.prefetchCount is 0 (default value), // the request will add a link credit so one message is retrieved from the service. - return receiveMessagesNoConnect().limitRate(1); + return receiveMessagesNoBackPressure().limitRate(1); } - Flux receiveMessagesNoConnect() { + Flux receiveMessagesNoBackPressure() { return receiveMessagesWithContext() .handle((serviceBusMessageContext, sink) -> { if (serviceBusMessageContext.hasError()) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index b9eec631a944b..8ece264f64767 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -603,7 +603,7 @@ private void queueWork(int maximumMessageCount, Duration maxWaitTime, SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get(); existing.queueWork(work); } else { - asyncClient.receiveMessagesNoConnect().subscribeWith(newSubscriber); + asyncClient.receiveMessagesNoBackPressure().subscribeWith(newSubscriber); } } else { messageSubscriber.queueWork(work); From 152e193bff7ff1a2bbe68954a823cd4cb5a902b8 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Mon, 16 Nov 2020 16:32:41 -0800 Subject: [PATCH 11/31] Add back pressure, adjust link credits using prefetch and request size for session receiver. --- .../servicebus/ServiceBusSessionReceiver.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index 6915530e3b66b..59cb91fb89245 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -63,15 +63,24 @@ class ServiceBusSessionReceiver implements AutoCloseable { this.receiveLink = receiveLink; this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT); - receiveLink.setEmptyCreditListener(() -> 1); + receiveLink.setEmptyCreditListener(() -> 0); final Flux receivedMessagesFlux = receiveLink .receive() .publishOn(scheduler) .doOnSubscribe(subscription -> { logger.verbose("Adding prefetch to receive link."); - receiveLink.addCredits(prefetch); + receiveLink.addCreditsInstantly(prefetch); }) + .doOnRequest(request -> { // request is of type long. + if (prefetch == 0) { // add "request" number of credits + receiveLink.addCreditsInstantly((int) request); + } else { // keep total credits "prefetch" if prefetch is not 0. + receiveLink.addCreditsInstantly(Math.max(0, prefetch - receiveLink.getCredits())); + } + }) + .limitRate(1) // One request at a time so link credit is added one by one + // if no prefetch in doOnRequest above. .takeUntilOther(cancelReceiveProcessor) .map(message -> { final ServiceBusReceivedMessage deserialized = messageSerializer.deserialize(message, From 65cdcf6a0382f996f433fd8c47ba869cecc10d3c Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 01:48:04 -0800 Subject: [PATCH 12/31] Fix unit test --- .../ServiceBusReceiverAsyncClientTest.java | 15 +++--- .../ServiceBusReceiverClientTest.java | 8 +-- .../ServiceBusReceiveLinkProcessorTest.java | 51 +++++++------------ 3 files changed, 30 insertions(+), 44 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index e3d0876fe2d38..9ae2cc8ce693a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -286,7 +286,7 @@ void receivesNumberOfEvents() { .expectNextCount(numberOfEvents) .verifyComplete(); - verify(amqpReceiveLink).addCredits(PREFETCH); + verify(amqpReceiveLink).addCreditsInstantly(PREFETCH); verify(amqpReceiveLink, never()).updateDisposition(eq(lockToken), any()); } @@ -844,12 +844,15 @@ void canPerformMultipleReceive() { .expectNextCount(numberOfEvents) .verifyComplete(); - StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) - .then(() -> messages.forEach(m -> messageSink.next(m))) - .expectNextCount(numberOfEvents) - .verifyComplete(); + // TODO: Do we need to support multiple calls of receiver.receiveMessages()? + // After the autoConnect was removed from ServiceBusAsyncConsumer.processor, the receiver doesn't support + // multiple calls of receiver.receiveMessages(). +// StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) +// .then(() -> messages.forEach(m -> messageSink.next(m))) +// .expectNextCount(numberOfEvents) +// .verifyComplete(); - verify(amqpReceiveLink).addCredits(PREFETCH); + verify(amqpReceiveLink).addCreditsInstantly(PREFETCH); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index 5f9c900459b91..78200ab092a08 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -80,7 +80,7 @@ void setup() { MockitoAnnotations.initMocks(this); when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); - when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, null, false)); + when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 0, null, false)); when(sessionReceiverOptions.getSessionId()).thenReturn(SESSION_ID); client = new ServiceBusReceiverClient(asyncClient, OPERATION_TIMEOUT); } @@ -658,7 +658,7 @@ void receiveMessagesWithUserSpecifiedTimeout() { sink.complete(); }); }); - when(asyncClient.receiveMessages()).thenReturn(messageSink); + when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink); // Act final IterableStream actual = client.receiveMessages(maxMessages, receiveTimeout); @@ -704,7 +704,7 @@ void receiveMessagesMax() { }); }); - when(asyncClient.receiveMessages()).thenReturn(messageSink); + when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink); // Act final IterableStream actual = client.receiveMessages(maxMessages); @@ -750,7 +750,7 @@ void receiveMessagesTimeout() { sink.complete(); }); }); - when(asyncClient.receiveMessages()).thenReturn(messageSink); + when(asyncClient.receiveMessagesNoBackPressure()).thenReturn(messageSink); // Act final IterableStream actual = client.receiveMessages(maxMessages); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index 237638dd18d0c..a681e59b0dd22 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -77,6 +77,7 @@ class ServiceBusReceiveLinkProcessorTest { private final EmitterProcessor messageProcessor = EmitterProcessor.create(); private final FluxSink messageProcessorSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); private ServiceBusReceiveLinkProcessor linkProcessor; + private ServiceBusReceiveLinkProcessor linkProcessorNoPrefetch; @BeforeAll static void beforeAll() { @@ -93,6 +94,7 @@ void setup() { MockitoAnnotations.initMocks(this); linkProcessor = new ServiceBusReceiveLinkProcessor(PREFETCH, retryPolicy, ReceiveMode.PEEK_LOCK); + linkProcessorNoPrefetch = new ServiceBusReceiveLinkProcessor(0, retryPolicy, ReceiveMode.PEEK_LOCK); when(link1.getEndpointStates()).thenReturn(endpointProcessor); when(link1.receive()).thenReturn(messageProcessor); @@ -139,16 +141,7 @@ void createNewLink() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCredits(eq(PREFETCH)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); - - Supplier value = creditSupplierCaptor.getValue(); - assertNotNull(value); - - final Integer creditValue = value.get(); - // Expecting 5 because it is Long.MAX_VALUE and there are no credits (since we invoked the empty credit - // listener). - assertEquals(PREFETCH, creditValue); + verify(link1).addCreditsInstantly(eq(PREFETCH - 1)); } /** @@ -160,7 +153,7 @@ void respectsBackpressureInRange() { final int backpressure = 15; // Because one message was emitted. ServiceBusReceiveLinkProcessor processor = Flux.create(sink -> sink.next(link1)) - .subscribeWith(linkProcessor); + .subscribeWith(linkProcessorNoPrefetch); // Act & Assert StepVerifier.create(processor, backpressure) @@ -169,15 +162,7 @@ void respectsBackpressureInRange() { .thenCancel() .verify(); - verify(link1).addCredits(eq(backpressure)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); - - Supplier value = creditSupplierCaptor.getValue(); - assertNotNull(value); - - final Integer creditValue = value.get(); - final int emittedOne = backpressure - 1; - assertTrue(creditValue == emittedOne || creditValue == backpressure); + verify(link1).addCreditsInstantly(backpressure); // request up to PREFETCH } /** @@ -534,7 +519,7 @@ void receivesUntilFirstLinkClosed() { ServiceBusReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); FluxSink sink = endpointProcessor.sink(); - when(link1.getCredits()).thenReturn(1); + when(link1.getCredits()).thenReturn(0); // Act & Assert StepVerifier.create(processor) @@ -553,16 +538,15 @@ void receivesUntilFirstLinkClosed() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCredits(eq(PREFETCH)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); + verify(link1).addCreditsInstantly(eq(PREFETCH)); + verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); // Add 0 Supplier value = creditSupplierCaptor.getValue(); assertNotNull(value); final Integer creditValue = value.get(); - // Expecting 5 because it is Long.MAX_VALUE and there are no credits (since we invoked the empty credit - // listener). - assertEquals(PREFETCH, creditValue); + + assertEquals(0, creditValue); } @Test @@ -571,7 +555,7 @@ void receivesFromFirstLink() { ServiceBusReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); FluxSink sink = endpointProcessor.sink(); - when(link1.getCredits()).thenReturn(1); + when(link1.getCredits()).thenReturn(0); // Act & Assert StepVerifier.create(processor) @@ -589,16 +573,15 @@ void receivesFromFirstLink() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCredits(eq(PREFETCH)); - verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); + verify(link1).addCreditsInstantly(eq(PREFETCH)); + verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); // Add 0. Supplier value = creditSupplierCaptor.getValue(); assertNotNull(value); final Integer creditValue = value.get(); - // Expecting 5 because it is Long.MAX_VALUE and there are no credits (since we invoked the empty credit - // listener). - assertEquals(PREFETCH, creditValue); + + assertEquals(0, creditValue); } /** @@ -608,7 +591,7 @@ void receivesFromFirstLink() { @Test void backpressureRequestOnlyEmitsThatAmount() { // Arrange - final int backpressure = 10; + final int backpressure = PREFETCH; final int existingCredits = 1; final int expectedCredits = backpressure - existingCredits; ServiceBusReceiveLinkProcessor processor = Flux.just(link1).subscribeWith(linkProcessor); @@ -634,7 +617,7 @@ void backpressureRequestOnlyEmitsThatAmount() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCredits(expectedCredits); + verify(link1).addCreditsInstantly(expectedCredits); verify(link1).setEmptyCreditListener(any()); } From 41d08ab090d8d66d19d58c682ad09b589ae57337 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 10:11:12 -0800 Subject: [PATCH 13/31] Small change (add final to variable) --- .../azure/messaging/servicebus/ServiceBusReceiverClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 40a92305dd44e..cea407eec1adc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -590,8 +590,8 @@ public void close() { private void queueWork(int maximumMessageCount, Duration maxWaitTime, FluxSink emitter) { final long id = idGenerator.getAndIncrement(); - int prefetch = asyncClient.getReceiverOptions().getPrefetchCount(); - int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount; + final int prefetch = asyncClient.getReceiverOptions().getPrefetchCount(); + final int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount; final SynchronousReceiveWork work = new SynchronousReceiveWork(id, toRequest, maxWaitTime, emitter); From 124baabb430f4908388bad5a4b3652e41e30920c Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 10:11:56 -0800 Subject: [PATCH 14/31] Add unreleased core-amqp --- eng/versioning/version_client.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 8a495ac4cb4e4..cf294706a9199 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -177,7 +177,8 @@ com.microsoft:microsoft-opentelemetry-exporter-azuremonitor;1.0.0-beta.1;1.0.0-b # unreleased_:;dependency-version # note: The unreleased dependencies will not be manipulated with the automatic PR creation code. unreleased_com.azure:azure-core-experimental;1.0.0-beta.9 -unreleased_com.azure:azure-messaging-servicebus;7.0.0-beta.7 +unreleased_com.azure:azure-messaging-servicebus;7.0.0-beta.8 +unreleased_com.azure:azure-core-amqp;1.7.0-beta.3 # Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current # version and set the version to the released beta. Released beta dependencies are only valid From 5743dcba32b2057760a1e434faefcea5a5cc899d Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 12:46:31 -0800 Subject: [PATCH 15/31] SessionReceiver uses reactor prefetch 1 instead of default 256 --- .../messaging/servicebus/ServiceBusSessionManager.java | 2 +- .../messaging/servicebus/ServiceBusSessionReceiver.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index edba47d8a1e4f..f7bb7f9c133db 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -309,7 +309,7 @@ private Flux getSession(Scheduler scheduler, boolean d onSessionRequest(1L); } })) - .publishOn(scheduler); + .publishOn(scheduler, 1); } private Mono getManagementNode() { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index 59cb91fb89245..193c829a241b8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -70,7 +70,9 @@ class ServiceBusSessionReceiver implements AutoCloseable { .publishOn(scheduler) .doOnSubscribe(subscription -> { logger.verbose("Adding prefetch to receive link."); - receiveLink.addCreditsInstantly(prefetch); + if (prefetch > 0) { + receiveLink.addCreditsInstantly(prefetch); + } }) .doOnRequest(request -> { // request is of type long. if (prefetch == 0) { // add "request" number of credits @@ -79,8 +81,6 @@ class ServiceBusSessionReceiver implements AutoCloseable { receiveLink.addCreditsInstantly(Math.max(0, prefetch - receiveLink.getCredits())); } }) - .limitRate(1) // One request at a time so link credit is added one by one - // if no prefetch in doOnRequest above. .takeUntilOther(cancelReceiveProcessor) .map(message -> { final ServiceBusReceivedMessage deserialized = messageSerializer.deserialize(message, From 8986b2fb14b4f57778fde7224f0e4fe01b5cea61 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 14:33:18 -0800 Subject: [PATCH 16/31] Dispose SynchronousMessageSubscriber before closing async client in sync client's close() --- .../azure/messaging/servicebus/ServiceBusReceiverClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index cea407eec1adc..3b24ee01a1146 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -575,12 +575,12 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext) */ @Override public void close() { - asyncClient.close(); - SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null); if (messageSubscriber != null && !messageSubscriber.isDisposed()) { messageSubscriber.dispose(); } + + asyncClient.close(); } /** From 39bac24f39c5f3c5ef04adacac103e7f9c69d7f0 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 17:45:45 -0800 Subject: [PATCH 17/31] Prefetch 1 in instead fo default 256 --- .../azure/messaging/servicebus/ServiceBusProcessorClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 883de4a336c0c..c35a14ccbccb3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -150,8 +150,8 @@ private synchronized void receiveMessages() { } ServiceBusReceiverAsyncClient receiverClient = asyncClient.get(); receiverClient.receiveMessagesWithContext() - .parallel(processorOptions.getMaxConcurrentCalls()) - .runOn(Schedulers.boundedElastic()) + .parallel(processorOptions.getMaxConcurrentCalls(), 1) + .runOn(Schedulers.boundedElastic(), 1) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) { From 5572e15137c73ad71ba9909bdbeed95ff0009f21 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 17:46:46 -0800 Subject: [PATCH 18/31] Add some code comments --- .../implementation/ServiceBusReceiveLinkProcessor.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index 770ab382d19a8..e6842e4595d98 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -567,6 +567,13 @@ private int getCreditsToAdd(int linkCredits) { if (r <= Integer.MAX_VALUE) { expectedTotalCredit = (int) r; } else { + //This won't really happen in reality. + //For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);". + //So it will request one by one from this link processor, even though the user's request has no + //back pressure. + //For sync client, the sync subscriber has back pressure. + //The request count uses the the argument of method receiveMessages(int maxMessages). + //It's at most Integer.MAX_VALUE. expectedTotalCredit = Integer.MAX_VALUE; } } else { From 530e258ba4057b366a852c743da4b13fbc26716b Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 18 Nov 2020 17:51:44 -0800 Subject: [PATCH 19/31] set low tide 0 in limitRate() to disable replenish --- .../messaging/servicebus/ServiceBusReceiverAsyncClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 9ef7775315b0e..251f9f9ff856e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -580,7 +580,7 @@ public Flux receiveMessages() { // to auto-refill the prefetch buffer. A request will retrieve one message from this buffer. // If receiverOptions.prefetchCount is 0 (default value), // the request will add a link credit so one message is retrieved from the service. - return receiveMessagesNoBackPressure().limitRate(1); + return receiveMessagesNoBackPressure().limitRate(1, 0); } Flux receiveMessagesNoBackPressure() { From aee463cec1e1e55aeb4673c9324041d968f4d98a Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 19 Nov 2020 00:25:46 -0800 Subject: [PATCH 20/31] use limitRate in async client and remove the limit in processor --- .../servicebus/ServiceBusProcessorClient.java | 4 ++-- .../ServiceBusReceiverAsyncClient.java | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index c35a14ccbccb3..883de4a336c0c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -150,8 +150,8 @@ private synchronized void receiveMessages() { } ServiceBusReceiverAsyncClient receiverClient = asyncClient.get(); receiverClient.receiveMessagesWithContext() - .parallel(processorOptions.getMaxConcurrentCalls(), 1) - .runOn(Schedulers.boundedElastic(), 1) + .parallel(processorOptions.getMaxConcurrentCalls()) + .runOn(Schedulers.boundedElastic()) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription subscription) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 251f9f9ff856e..70920dbedb632 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -584,7 +584,7 @@ public Flux receiveMessages() { } Flux receiveMessagesNoBackPressure() { - return receiveMessagesWithContext() + return receiveMessagesWithContext(0) .handle((serviceBusMessageContext, sink) -> { if (serviceBusMessageContext.hasError()) { sink.error(serviceBusMessageContext.getThrowable()); @@ -609,6 +609,10 @@ Flux receiveMessagesNoBackPressure() { * @return An infinite stream of messages from the Service Bus entity. */ Flux receiveMessagesWithContext() { + return receiveMessagesWithContext(1); + } + + Flux receiveMessagesWithContext(int highTide) { final Flux messageFlux = sessionManager != null ? sessionManager.receive() : getOrCreateConsumer().receive().map(ServiceBusMessageContext::new); @@ -621,16 +625,19 @@ Flux receiveMessagesWithContext() { withAutoLockRenewal = messageFlux; } - final Flux withAutoComplete; + Flux result; if (receiverOptions.isEnableAutoComplete()) { - withAutoComplete = new FluxAutoComplete(withAutoLockRenewal, completionLock, + result = new FluxAutoComplete(withAutoLockRenewal, completionLock, context -> context.getMessage() != null ? complete(context.getMessage()) : Mono.empty(), context -> context.getMessage() != null ? abandon(context.getMessage()) : Mono.empty()); } else { - withAutoComplete = withAutoLockRenewal; + result = withAutoLockRenewal; } - return withAutoComplete + if (highTide > 0) { + result = result.limitRate(highTide, 0); + } + return result .onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE)); } From bafc0e47dc93df26e4ae2cdbe2a7b64e5f8de990 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 19 Nov 2020 12:38:38 -0800 Subject: [PATCH 21/31] autoConnect in receiveMessages() so it can be subscribed multiple times. --- .../messaging/servicebus/ServiceBusReceiverAsyncClient.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 70920dbedb632..a9805396053f3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -108,6 +108,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { // Starting at -1 because that is before the beginning of the stream. private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1); private final AtomicReference consumer = new AtomicReference<>(); + private final AtomicReference> receiveMessagesFlux = new AtomicReference<>(); /** * Creates a receiver that listens to a Service Bus resource. @@ -580,7 +581,10 @@ public Flux receiveMessages() { // to auto-refill the prefetch buffer. A request will retrieve one message from this buffer. // If receiverOptions.prefetchCount is 0 (default value), // the request will add a link credit so one message is retrieved from the service. - return receiveMessagesNoBackPressure().limitRate(1, 0); + receiveMessagesFlux.compareAndSet(null, receiveMessagesNoBackPressure().limitRate(1, 0) + .publish(1) + .autoConnect(1)); + return receiveMessagesFlux.get(); } Flux receiveMessagesNoBackPressure() { From 037cbbcd575ff91a8dca8dd9dae1bd0a365a177f Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 19 Nov 2020 12:41:13 -0800 Subject: [PATCH 22/31] Enable subscribe twice test. --- .../servicebus/ServiceBusReceiverAsyncClientTest.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 521d61d1070d9..516fa4f0efc0f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -849,13 +849,10 @@ void canPerformMultipleReceive() { .expectNextCount(numberOfEvents) .verifyComplete(); - // TODO: Do we need to support multiple calls of receiver.receiveMessages()? - // After the autoConnect was removed from ServiceBusAsyncConsumer.processor, the receiver doesn't support - // multiple calls of receiver.receiveMessages(). -// StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) -// .then(() -> messages.forEach(m -> messageSink.next(m))) -// .expectNextCount(numberOfEvents) -// .verifyComplete(); + StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) + .then(() -> messages.forEach(m -> messageSink.next(m))) + .expectNextCount(numberOfEvents) + .verifyComplete(); verify(amqpReceiveLink).addCreditsInstantly(PREFETCH); } From 09fca43acc2f2b2dcfb4db8eab25bf8c9c2ecadf Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 19 Nov 2020 19:35:59 -0800 Subject: [PATCH 23/31] Use publish / autoConnect to support multiple subscribers. --- .../ServiceBusReceiverAsyncClient.java | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index a9805396053f3..70e0391b13ac1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -108,7 +108,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { // Starting at -1 because that is before the beginning of the stream. private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1); private final AtomicReference consumer = new AtomicReference<>(); - private final AtomicReference> receiveMessagesFlux = new AtomicReference<>(); + private final AtomicReference> receiveMessagesContextFlux = new AtomicReference<>(); /** * Creates a receiver that listens to a Service Bus resource. @@ -581,14 +581,14 @@ public Flux receiveMessages() { // to auto-refill the prefetch buffer. A request will retrieve one message from this buffer. // If receiverOptions.prefetchCount is 0 (default value), // the request will add a link credit so one message is retrieved from the service. - receiveMessagesFlux.compareAndSet(null, receiveMessagesNoBackPressure().limitRate(1, 0) - .publish(1) - .autoConnect(1)); - return receiveMessagesFlux.get(); + return receiveMessagesNoBackPressure(true).limitRate(1, 0); } Flux receiveMessagesNoBackPressure() { - return receiveMessagesWithContext(0) + return receiveMessagesNoBackPressure(false); + } + Flux receiveMessagesNoBackPressure(boolean autoConnect) { + return receiveMessagesWithContext(0, autoConnect) .handle((serviceBusMessageContext, sink) -> { if (serviceBusMessageContext.hasError()) { sink.error(serviceBusMessageContext.getThrowable()); @@ -613,14 +613,27 @@ Flux receiveMessagesNoBackPressure() { * @return An infinite stream of messages from the Service Bus entity. */ Flux receiveMessagesWithContext() { - return receiveMessagesWithContext(1); + return receiveMessagesWithContext(1, true); } - Flux receiveMessagesWithContext(int highTide) { - final Flux messageFlux = sessionManager != null - ? sessionManager.receive() - : getOrCreateConsumer().receive().map(ServiceBusMessageContext::new); - + Flux receiveMessagesWithContext(int highTide, boolean autoConnect) { + final Flux messageFlux; + if (sessionManager != null) { + messageFlux = sessionManager.receive(); + } else { + if (receiveMessagesContextFlux.get() == null) { + Flux consumerFlux = getOrCreateConsumer().receive(); + if (autoConnect) { + // In PEEK_LOCK mode, prefetch 1 may cause receiving stuck. 2 works fine and replenish is also 1. + final int publishPefetch = receiverOptions.getReceiveMode() == + ServiceBusReceiveMode.PEEK_LOCK ? 2 : 1; + consumerFlux = consumerFlux.publish(publishPefetch).autoConnect(1); + } + Flux contextFlux = consumerFlux.map(ServiceBusMessageContext::new); + receiveMessagesContextFlux.compareAndSet(null, contextFlux); + } + messageFlux = receiveMessagesContextFlux.get(); + } final Flux withAutoLockRenewal; if (receiverOptions.isAutoLockRenewEnabled()) { withAutoLockRenewal = new FluxAutoLockRenew(messageFlux, receiverOptions.getMaxLockRenewDuration(), From fe7ee04541be8b3c11664899092dc5275db91513 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 19 Nov 2020 20:38:18 -0800 Subject: [PATCH 24/31] put addCreditInstantly in synchronized block. --- .../implementation/ServiceBusReceiveLinkProcessor.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index e6842e4595d98..f4e41cb4753c3 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -542,12 +542,14 @@ private void checkAndAddCredits(AmqpReceiveLink link) { return; } - final int credits = getCreditsToAdd(link.getCredits()); + synchronized (lock) { + final int credits = getCreditsToAdd(link.getCredits()); - logger.info("Link credits to add. Credits: '{}'", credits); + logger.info("Link credits to add. Credits: '{}'", credits); - if (credits > 0) { - link.addCreditsInstantly(credits); + if (credits > 0) { + link.addCreditsInstantly(credits); + } } } From 59aee7bd5168ba4eff9e84dd4126eb701be4f730 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Thu, 19 Nov 2020 20:53:47 -0800 Subject: [PATCH 25/31] Format change for checkstyle --- .../messaging/servicebus/ServiceBusReceiverAsyncClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 70e0391b13ac1..392656165310f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -625,8 +625,8 @@ Flux receiveMessagesWithContext(int highTide, boolean Flux consumerFlux = getOrCreateConsumer().receive(); if (autoConnect) { // In PEEK_LOCK mode, prefetch 1 may cause receiving stuck. 2 works fine and replenish is also 1. - final int publishPefetch = receiverOptions.getReceiveMode() == - ServiceBusReceiveMode.PEEK_LOCK ? 2 : 1; + final int publishPefetch = receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK + ? 2 : 1; consumerFlux = consumerFlux.publish(publishPefetch).autoConnect(1); } Flux contextFlux = consumerFlux.map(ServiceBusMessageContext::new); From 8ea5ba65832f8952314706c68f59cb44df70719b Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 20 Nov 2020 02:32:33 -0800 Subject: [PATCH 26/31] Use addCredits instead of addCreditsInstantly. --- .../messaging/servicebus/ServiceBusSessionReceiver.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java index 193c829a241b8..05ea4d355fde8 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiver.java @@ -71,14 +71,14 @@ class ServiceBusSessionReceiver implements AutoCloseable { .doOnSubscribe(subscription -> { logger.verbose("Adding prefetch to receive link."); if (prefetch > 0) { - receiveLink.addCreditsInstantly(prefetch); + receiveLink.addCredits(prefetch); } }) .doOnRequest(request -> { // request is of type long. if (prefetch == 0) { // add "request" number of credits - receiveLink.addCreditsInstantly((int) request); + receiveLink.addCredits((int) request); } else { // keep total credits "prefetch" if prefetch is not 0. - receiveLink.addCreditsInstantly(Math.max(0, prefetch - receiveLink.getCredits())); + receiveLink.addCredits(Math.max(0, prefetch - receiveLink.getCredits())); } }) .takeUntilOther(cancelReceiveProcessor) From 24e34217562cdb82e2b8f8f7916bb1f8dd395d62 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 20 Nov 2020 02:34:02 -0800 Subject: [PATCH 27/31] Use addCredits instead of addCreditsInstantly --- .../ServiceBusReceiveLinkProcessor.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index f4e41cb4753c3..c103a4186027f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -543,12 +543,12 @@ private void checkAndAddCredits(AmqpReceiveLink link) { } synchronized (lock) { - final int credits = getCreditsToAdd(link.getCredits()); - - logger.info("Link credits to add. Credits: '{}'", credits); + final int linkCredits = link.getCredits(); + final int credits = getCreditsToAdd(linkCredits); + logger.info("Link credits='{}', Link credits to add: '{}'", linkCredits, credits); if (credits > 0) { - link.addCreditsInstantly(credits); + link.addCredits(credits); } } } @@ -581,6 +581,7 @@ private int getCreditsToAdd(int linkCredits) { } else { expectedTotalCredit = prefetch; } + logger.info("linkCredits: '{}', expectedTotalCredit: '{}'", linkCredits, expectedTotalCredit); synchronized (queueLock) { final int queuedMessages = pendingMessages.get(); @@ -594,6 +595,9 @@ private int getCreditsToAdd(int linkCredits) { ? Math.max(expectedTotalCredit - pending, 0) : 0; } + logger.info("prefetch: '{}', requested: '{}', linkCredits: '{}', expectedTotalCredit: '{}', queuedMessages:" + + "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'" , getPrefetch(), r, linkCredits, + expectedTotalCredit, queuedMessages, creditsToAdd, messageQueue.size()); } return creditsToAdd; From de311a8ae5fdab3b1fdbe15e02a9287a0c3f24d9 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 20 Nov 2020 02:35:03 -0800 Subject: [PATCH 28/31] Remove autoConnect --- .../ServiceBusReceiverAsyncClient.java | 33 +++++-------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 392656165310f..70920dbedb632 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -108,7 +108,6 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { // Starting at -1 because that is before the beginning of the stream. private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1); private final AtomicReference consumer = new AtomicReference<>(); - private final AtomicReference> receiveMessagesContextFlux = new AtomicReference<>(); /** * Creates a receiver that listens to a Service Bus resource. @@ -581,14 +580,11 @@ public Flux receiveMessages() { // to auto-refill the prefetch buffer. A request will retrieve one message from this buffer. // If receiverOptions.prefetchCount is 0 (default value), // the request will add a link credit so one message is retrieved from the service. - return receiveMessagesNoBackPressure(true).limitRate(1, 0); + return receiveMessagesNoBackPressure().limitRate(1, 0); } Flux receiveMessagesNoBackPressure() { - return receiveMessagesNoBackPressure(false); - } - Flux receiveMessagesNoBackPressure(boolean autoConnect) { - return receiveMessagesWithContext(0, autoConnect) + return receiveMessagesWithContext(0) .handle((serviceBusMessageContext, sink) -> { if (serviceBusMessageContext.hasError()) { sink.error(serviceBusMessageContext.getThrowable()); @@ -613,27 +609,14 @@ Flux receiveMessagesNoBackPressure(boolean autoConnec * @return An infinite stream of messages from the Service Bus entity. */ Flux receiveMessagesWithContext() { - return receiveMessagesWithContext(1, true); + return receiveMessagesWithContext(1); } - Flux receiveMessagesWithContext(int highTide, boolean autoConnect) { - final Flux messageFlux; - if (sessionManager != null) { - messageFlux = sessionManager.receive(); - } else { - if (receiveMessagesContextFlux.get() == null) { - Flux consumerFlux = getOrCreateConsumer().receive(); - if (autoConnect) { - // In PEEK_LOCK mode, prefetch 1 may cause receiving stuck. 2 works fine and replenish is also 1. - final int publishPefetch = receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK - ? 2 : 1; - consumerFlux = consumerFlux.publish(publishPefetch).autoConnect(1); - } - Flux contextFlux = consumerFlux.map(ServiceBusMessageContext::new); - receiveMessagesContextFlux.compareAndSet(null, contextFlux); - } - messageFlux = receiveMessagesContextFlux.get(); - } + Flux receiveMessagesWithContext(int highTide) { + final Flux messageFlux = sessionManager != null + ? sessionManager.receive() + : getOrCreateConsumer().receive().map(ServiceBusMessageContext::new); + final Flux withAutoLockRenewal; if (receiverOptions.isAutoLockRenewEnabled()) { withAutoLockRenewal = new FluxAutoLockRenew(messageFlux, receiverOptions.getMaxLockRenewDuration(), From 29085d3d5e59bb1397e36bab521ff0010e9a772e Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 20 Nov 2020 02:47:48 -0800 Subject: [PATCH 29/31] Remove test case that subscribe receiveMessages() twice. --- .../ServiceBusReceiverAsyncClientTest.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 516fa4f0efc0f..ef7e7effe190f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -287,7 +287,7 @@ void receivesNumberOfEvents() { .expectNextCount(numberOfEvents) .verifyComplete(); - verify(amqpReceiveLink).addCreditsInstantly(PREFETCH); + verify(amqpReceiveLink).addCredits(PREFETCH); verify(amqpReceiveLink, never()).updateDisposition(eq(lockToken), any()); } @@ -849,12 +849,17 @@ void canPerformMultipleReceive() { .expectNextCount(numberOfEvents) .verifyComplete(); - StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) - .then(() -> messages.forEach(m -> messageSink.next(m))) - .expectNextCount(numberOfEvents) - .verifyComplete(); - - verify(amqpReceiveLink).addCreditsInstantly(PREFETCH); + // TODO: Yijun and Srikanta are thinking of using two links for two subscribers. + // We may not want to support multiple subscribers by using publish and autoConnect. + // After the autoConnect was removed from ServiceBusAsyncConsumer.processor, the receiver doesn't support + // multiple calls of receiver.receiveMessages(). + // For more discussions. +// StepVerifier.create(receiver.receiveMessages().take(numberOfEvents)) +// .then(() -> messages.forEach(m -> messageSink.next(m))) +// .expectNextCount(numberOfEvents) +// .verifyComplete(); + + verify(amqpReceiveLink).addCredits(PREFETCH); } /** From 4c837b5eac822a6ad19cbcbcf6f471b4d81bcecb Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 20 Nov 2020 03:19:16 -0800 Subject: [PATCH 30/31] Use addCredits instead of addCreditsInstantly (update test) --- .../ServiceBusReceiveLinkProcessorTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java index a5013622a9b4f..c4a7e46b8fc2f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java @@ -141,7 +141,7 @@ void createNewLink() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCreditsInstantly(eq(PREFETCH - 1)); + verify(link1).addCredits(eq(PREFETCH - 1)); } /** @@ -162,7 +162,7 @@ void respectsBackpressureInRange() { .thenCancel() .verify(); - verify(link1).addCreditsInstantly(backpressure); // request up to PREFETCH + verify(link1).addCredits(backpressure); // request up to PREFETCH } /** @@ -538,7 +538,7 @@ void receivesUntilFirstLinkClosed() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCreditsInstantly(eq(PREFETCH)); + verify(link1).addCredits(eq(PREFETCH)); verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); // Add 0 Supplier value = creditSupplierCaptor.getValue(); @@ -573,7 +573,7 @@ void receivesFromFirstLink() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCreditsInstantly(eq(PREFETCH)); + verify(link1).addCredits(eq(PREFETCH)); verify(link1).setEmptyCreditListener(creditSupplierCaptor.capture()); // Add 0. Supplier value = creditSupplierCaptor.getValue(); @@ -617,7 +617,7 @@ void backpressureRequestOnlyEmitsThatAmount() { assertFalse(processor.hasError()); assertNull(processor.getError()); - verify(link1).addCreditsInstantly(expectedCredits); + verify(link1).addCredits(expectedCredits); verify(link1).setEmptyCreditListener(any()); } From 3c235427004215e7084300c9ec7d778db9206da4 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Fri, 20 Nov 2020 03:49:23 -0800 Subject: [PATCH 31/31] Checkstyle --- .../implementation/ServiceBusReceiveLinkProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java index c103a4186027f..07e89682e1e0b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessor.java @@ -596,7 +596,7 @@ private int getCreditsToAdd(int linkCredits) { : 0; } logger.info("prefetch: '{}', requested: '{}', linkCredits: '{}', expectedTotalCredit: '{}', queuedMessages:" - + "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'" , getPrefetch(), r, linkCredits, + + "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'", getPrefetch(), r, linkCredits, expectedTotalCredit, queuedMessages, creditsToAdd, messageQueue.size()); }