[BUG] Service Bus high prefetch count getting stuck and locking receiver processor #31356

Closed · 3 tasks done
epomatti opened this issue Oct 9, 2022 · 4 comments
Labels
  • Client — This issue points to a problem in the data-plane of the library.
  • customer-reported — Issues that are reported by GitHub users external to the Azure organization.
  • question — The issue doesn't require a change to the product in order to be resolved. Most issues start as that.
  • Service Bus

Comments


epomatti commented Oct 9, 2022

Describe the bug

My Service Bus receiver gets stuck when I use a high prefetch count together with max concurrent calls. The settings are:

  • Prefetch count: 100
  • Max concurrent calls: 100

When I add 10,000 messages to the queue, the receiver processor gets stuck, to the point where no more messages are consumed and they remain in the queue.

Restarting the application with the same configuration does NOT solve the issue. Only after significantly reducing the prefetch count do messages start being consumed again.

UPDATE: The error also happens with other combinations, for example 20 prefetch and 500 max concurrent calls, although it runs a little longer before failing.

UPDATE 2: I have isolated code that reproduces the issue in this repo:

Exception or Stack Trace
I've uploaded the full stack trace in this gist and will post excerpts below.

Several informational messages about "adding credits" start to appear:

2022-10-09 12:07:09.845  INFO 9048 --- [undedElastic-22] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":100,"requested":2,"linkCredits":155,"expectedTotalCredit":100,"queuedMessages":0,"creditsToAdd":0,"messageQueueSize":0}
2022-10-09 12:07:10.131  INFO 9048 --- [undedElastic-22] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":100,"requested":2,"linkCredits":95,"expectedTotalCredit":100,"queuedMessages":1,"creditsToAdd":4,"messageQueueSize":0}
2022-10-09 12:07:10.131  INFO 9048 --- [undedElastic-22] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":100,"requested":2,"linkCredits":95,"expectedTotalCredit":100,"queuedMessages":0,"creditsToAdd":5,"messageQueueSize":0}

Then messages for lock renewal:

2022-10-09 12:08:03.734  INFO 9048 --- [     parallel-6] c.a.m.servicebus.LockRenewalOperation    : {"az.sdk.message":"Starting lock renewal.","isSession":false,"lockToken":"da4b395a-e623-4849-b2fb-4515a90c6f81"}
2022-10-09 12:08:03.738  INFO 9048 --- [     parallel-4] .a.m.s.i.ServiceBusReactorAmqpConnection : {"az.sdk.message":"Creating management node.","linkName":"orders-mgmt","entityPath":"orders","address":"orders/$management"}
2022-10-09 12:08:03.744  INFO 9048 --- [     parallel-6] c.a.m.servicebus.LockRenewalOperation    : {"az.sdk.message":"Starting lock renewal.","isSession":false,"lockToken":"152103d8-5f80-4653-91a7-bfaa5db24a2e"}

A new response channel is started:

2022-10-09 12:08:03.890  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.handler.SessionHandler         : {"az.sdk.message":"onSessionRemoteOpen","connectionId":"MF_4355b1_1665328003966","sessionName":"orders-mgmt-session","sessionIncCapacity":0,"sessionOutgoingWindow":2147483647}
2022-10-09 12:08:03.891  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.ReactorConnection              : {"az.sdk.message":"Emitting new response channel.","connectionId":"MF_4355b1_1665328003966","entityPath":"orders/$management","linkName":"orders-mgmt"}
2022-10-09 12:08:03.891  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Setting next AMQP channel.","connectionId":"MF_4355b1_1665328003966","entityPath":"orders/$management"}
2022-10-09 12:08:03.892  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.ActiveClientTokenManager       : {"az.sdk.message":"Scheduling refresh token task.","scopes":"amqp://bus-serverless-bookstore-dev.servicebus.windows.net/orders"}
2022-10-09 12:08:04.045  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.handler.SendLinkHandler        : {"az.sdk.message":"onLinkRemoteOpen","connectionId":"MF_4355b1_1665328003966","linkName":"orders-mgmt:sender","entityPath":"orders/$management","remoteTarget":"Target{address='orders/$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}"}
2022-10-09 12:08:04.045  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Channel is now active.","connectionId":"MF_4355b1_1665328003966","entityPath":"orders/$management"}
2022-10-09 12:08:04.048  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.handler.ReceiveLinkHandler     : {"az.sdk.message":"onLinkRemoteOpen","connectionId":"MF_4355b1_1665328003966","entityPath":"orders/$management","linkName":"orders-mgmt:receiver","remoteSource":"Source{address='orders/$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}"}
2022-10-09 12:08:04.362  INFO 9048 --- [ctor-executor-1] c.a.m.servicebus.LockRenewalOperation    : {"az.sdk.message":"Starting lock renewal.","isSession":false,"lockToken":"781c6df1-7f3f-4598-ad5b-1e1189eae7f7","nextExpiration":"2022-10-09T15:09:04.375Z","next":"PT1M0.012817073S"}

Warnings and errors start to appear:

2022-10-09 12:08:04.530  WARN 9048 --- [ctor-executor-1] c.a.m.s.i.ManagementChannel              : {"az.sdk.message":"Operation not successful.","entityPath":"orders","status":"GONE","description":"The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:7a445cfd-ce57-4b79-92f1-d61ea30d0add, TrackingId:12c7e983-be26-41f5-9715-14f6cf542ed6_B29, SystemTracker:bus-serverless-bookstore-dev:Queue:orders, Timestamp:2022-10-09T15:08:04","condition":"com.microsoft:message-lock-lost"}
2022-10-09 12:08:04.533 ERROR 9048 --- [ctor-executor-1] c.a.m.servicebus.LockRenewalOperation    : {"az.sdk.message":"Error occurred while renewing lock token.","exception":"The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. For more information please see https://aka.ms/ServiceBusExceptions . Reference:7a445cfd-ce57-4b79-92f1-d61ea30d0add, TrackingId:12c7e983-be26-41f5-9715-14f6cf542ed6_B29, SystemTracker:bus-serverless-bookstore-dev:Queue:orders, Timestamp:2022-10-09T15:08:04, errorContext[NAMESPACE: bus-serverless-bookstore-dev.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: orders/$management, REFERENCE_ID: orders-mgmt:receiver, LINK_CREDIT: 48]","isSession":false,"lockToken":"ff9480c4-2c92-4079-81f3-dbcbecc5299d"}

Then an "Update disposition request timed out" error:

2022-10-09 12:08:45.802 ERROR 9048 --- [     parallel-6] c.a.m.s.i.ServiceBusReactorReceiver      : {"az.sdk.message":"Update disposition request timed out., errorContext[NAMESPACE: bus-serverless-bookstore-dev.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: orders, REFERENCE_ID: orders_58b3b4_1665328004011, LINK_CREDIT: 84]","exception":"Update disposition request timed out., errorContext[NAMESPACE: bus-serverless-bookstore-dev.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: orders, REFERENCE_ID: orders_58b3b4_1665328004011, LINK_CREDIT: 84]","entityPath":"orders","linkName":"orders_58b3b4_1665328004011"}

Informational logs when the remote link is closed:

2022-10-09 12:08:45.959  INFO 9048 --- [ctor-executor-1] c.a.c.a.i.handler.ReceiveLinkHandler     : {"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_4355b1_1665328003966","errorCondition":null,"errorDescription":null,"linkName":"orders_58b3b4_1665328004011","entityPath":"orders"}
2022-10-09 12:08:45.963  INFO 9048 --- [ctor-executor-1] c.a.c.a.implementation.ReactorSession    : {"az.sdk.message":"Complete. Removing receive link.","connectionId":"MF_4355b1_1665328003966","linkName":"orders_58b3b4_1665328004011","entityPath":"orders"}
2022-10-09 12:08:45.963  INFO 9048 --- [ctor-executor-1] c.a.m.s.i.ServiceBusReceiveLinkProcessor : Receive link endpoint states are closed. Requesting another.
2022-10-09 12:08:45.963  INFO 9048 --- [ctor-executor-1] c.a.m.s.i.ServiceBusReceiveLinkProcessor : Requesting a new AmqpReceiveLink from upstream.
2022-10-09 12:08:45.963  INFO 9048 --- [ctor-executor-1] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Setting next AMQP receive link.","linkName":"orders_58b3b4_1665328004011","entityPath":"orders"}
2022-10-09 12:08:45.964  INFO 9048 --- [ctor-executor-1] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"Adding credits.","prefetch":100,"requested":1,"linkCredits":0,"expectedTotalCredit":100,"queuedMessages":0,"creditsToAdd":100,"messageQueueSize":0}
2022-10-09 12:08:45.964  INFO 9048 --- [undedElastic-34] c.a.m.s.i.ServiceBusReceiveLinkProcessor : Receive link endpoint states are closed. Requesting another.

Finally, the errors "Cannot perform operations on a disposed receiver." and "Cannot update disposition with no link" appear:

2022-10-09 12:08:45.965 ERROR 9048 --- [undedElastic-72] c.a.m.s.i.ServiceBusReactorReceiver      : {"az.sdk.message":"Cannot perform operations on a disposed receiver.","exception":"Cannot perform operations on a disposed receiver.","entityPath":"orders","linkName":"orders_58b3b4_1665328004011"}

2022-10-09 12:08:45.968 ERROR 9048 --- [undedElastic-39] c.a.m.s.i.ServiceBusReceiveLinkProcessor : {"az.sdk.message":"","exception":"lockToken[6d05f6c3-4eba-4dad-8614-8ff5aad87a1a]. state[Modified{deliveryFailed=null, undeliverableHere=null, messageAnnotations=null}]. Cannot update disposition with no link.","lockToken":"6d05f6c3-4eba-4dad-8614-8ff5aad87a1a","deliveryState":"Modified{deliveryFailed=null, undeliverableHere=null, messageAnnotations=null}"}

To Reproduce
Steps to reproduce the behavior:

  1. Create a Service Bus namespace with the Standard SKU.
  2. Create a queue (the problem occurs with partitioning both enabled and disabled).
  3. Implement a Java client as detailed in the code snippets.
  4. Configure the consumer with a prefetch count of 100 and 100 max concurrent calls.
  5. Start the application.
  6. Add 10,000 messages to the queue (a sender sketch is included after the consumer code below).
  7. The client will get stuck at some point and will no longer be able to consume messages.

Code Snippet

Application configuration:

# Service Bus
azure.servicebus.connectionstring: ${AZURE_SERVICEBUS_CONNECTION_STRING}
azure.servicebus.prefetchCount: 100
azure.servicebus.maxConcurrentCalls: 100

Service Bus consumer:

import java.time.LocalDateTime;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;

@Service
public class DeliveryConsumer {

  private final Logger logger = LoggerFactory.getLogger(DeliveryConsumer.class);

  @Value("${azure.servicebus.connectionstring}")
  private String connectionString;

  @Value("${azure.servicebus.prefetchCount}")
  private Integer prefetchCount;

  @Value("${azure.servicebus.maxConcurrentCalls}")
  private Integer maxConcurrentCalls;

  private static final String ORDERS_QUEUE = "orders";

  public void start() {
    Consumer<ServiceBusReceivedMessageContext> processMessage = messageContext -> {
      try {
        // Auto-complete is disabled, so the message is settled explicitly.
        var payload = messageContext.getMessage().getBody().toString();
        logger.debug("Processing message: {}", payload);
        messageContext.complete();
      } catch (Exception ex) {
        // Return the message to the queue for redelivery.
        messageContext.abandon();
      }
    };

    Consumer<ServiceBusErrorContext> processError = errorContext -> {
      logger.error("Error occurred while receiving message", errorContext.getException());
    };

    ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .processor()
        .maxConcurrentCalls(maxConcurrentCalls)
        .prefetchCount(prefetchCount)
        .queueName(ORDERS_QUEUE)
        .processMessage(processMessage)
        .processError(processError)
        .disableAutoComplete()
        .buildProcessorClient();

    processorClient.start();
  }

}
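
To reproduce step 6, here is a minimal sender sketch (an illustrative addition, not part of the original report; it assumes the same connection string environment variable and the orders queue used above):

import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;

public class OrderLoader {

  public static void main(String[] args) {
    String connectionString = System.getenv("AZURE_SERVICEBUS_CONNECTION_STRING");

    // Synchronous sender for the same "orders" queue the processor consumes.
    try (ServiceBusSenderClient sender = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .sender()
        .queueName("orders")
        .buildClient()) {
      // Enqueue 10,000 small messages to trigger the stall.
      for (int i = 0; i < 10_000; i++) {
        sender.sendMessage(new ServiceBusMessage("order-" + i));
      }
    }
  }
}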

Screenshots

Inserting 10,000 messages:

(screenshot)

Messages stuck. Not even restarting the client solves the issue.

(screenshot)

Setup (please complete the following information):

  • OS: Linux
  • IDE: VSCode
  • Library/Libraries: com.azure:azure-messaging-servicebus:7.11.0
  • Java version: openjdk 17.0.4 2022-07-19
  • App Server/Environment: Spring Boot either running locally or on Container Apps
  • Frameworks: Spring Boot

Information Checklist
Kindly make sure that you have added all the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost ghost added needs-triage This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Oct 9, 2022
@epomatti epomatti changed the title [BUG] Service Bus high prefetch count getting stuck and locking the consumer [BUG] Service Bus high prefetch count getting stuck and locking receiver processor Oct 9, 2022
@joshfree joshfree added Service Bus Client This issue points to a problem in the data-plane of the library. labels Oct 10, 2022
@ghost ghost removed the needs-triage This is a new issue that needs to be triaged to the appropriate team. label Oct 10, 2022
@joshfree
Member

@liukun-msft could you please take a look at this issue?

Contributor

liukun-msft commented Oct 11, 2022

Hi @epomatti

Thanks for providing the detailed information.

This is a known issue: #30483. The application gets stuck because the default thread pool size (8 * CPU cores) is smaller than maxConcurrentCalls.

Can you try setting the VM option "-Dreactor.schedulers.defaultBoundedElasticSize=x", where x is greater than maxConcurrentCalls? For example, add the VM option "-Dreactor.schedulers.defaultBoundedElasticSize=200" to your application when prefetch = 100 and maxConcurrentCalls = 100. Note, however, that this workaround only holds when the prefetch count is small. We are currently working on a fix on the code side.
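
If VM options are inconvenient, a minimal sketch of applying the same workaround programmatically (assuming a standard Spring Boot entry point; Reactor reads the property the first time Schedulers.boundedElastic() is initialized, so it must be set before any Service Bus client starts):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

  public static void main(String[] args) {
    // Must run before any Reactor scheduler is created.
    System.setProperty("reactor.schedulers.defaultBoundedElasticSize", "200");
    SpringApplication.run(Application.class, args);
  }
}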

Author

epomatti commented Oct 11, 2022

Thanks @liukun-msft, that seems to have worked. I'll follow the main thread for updates. I'll try higher values and scale my nodes horizontally if I need higher throughput.

Probably related: when closing the processor client, there are still errors:

Caused by: java.lang.IllegalStateException: Cannot perform operations on a disposed receiver.
        at com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver.updateDisposition(ServiceBusReactorReceiver.java:143)
        at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.updateDisposition(ServiceBusReceiveLinkProcessor.java:131)
        at com.azure.messaging.servicebus.ServiceBusAsyncConsumer.updateDisposition(ServiceBusAsyncConsumer.java:69)
        at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.updateDisposition(ServiceBusReceiverAsyncClient.java:1417)
        at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.abandon(ServiceBusReceiverAsyncClient.java:363)
        ... 13 more
com.azure.messaging.servicebus.ServiceBusException: Cannot perform operations on a disposed receiver.
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:333)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.azure.messaging.servicebus.ServiceBusException: Cannot perform operations on a disposed receiver.
        at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$52(ServiceBusReceiverAsyncClient.java:1442)

Author

epomatti commented Oct 11, 2022

For anyone using Spring Boot, the property needs to be registered in the spring-boot-maven-plugin when running via Maven during development:

<plugin>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-maven-plugin</artifactId>
	<configuration>
		<systemPropertyVariables>
			<reactor.schedulers.defaultBoundedElasticSize>${reactor.schedulers.defaultBoundedElasticSize}</reactor.schedulers.defaultBoundedElasticSize>
		</systemPropertyVariables>
	</configuration>
</plugin>
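
Note that this snippet assumes the reactor.schedulers.defaultBoundedElasticSize Maven property is defined under <properties> in pom.xml, or passed with -D on the command line.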

I found it easier to just pass it on the command line like this and avoid changing pom.xml:

mvn spring-boot:run -Dspring-boot.run.profiles=dev -Dspring-boot.run.jvmArguments="-Dreactor.schedulers.defaultBoundedElasticSize=200"
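
For the Container Apps deployment mentioned in the setup, the same JVM flag can presumably also be supplied through the standard JAVA_TOOL_OPTIONS environment variable.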

@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023