
Deadlock when reaching channel limit in DirectMessageListenerContainer #2653

Closed · utikeev opened this issue Mar 13, 2024 · 5 comments · Fixed by #2655

Comments

@utikeev commented Mar 13, 2024

In what version(s) of Spring AMQP are you seeing this issue?

2.4.11 (the reproduction example uses 3.1.2)

Describe the bug

We use convertSendAndReceive for asynchronous RPC in our project. Each such call requires a new consumer (and thus a channel to communicate with the broker).
When the load is high and all channels are occupied, DirectMessageListenerContainer.adjustConsumers keeps trying to create a consumer in a while loop under consumersLock, failing every time. Because adjustConsumers holds the lock, the timed-out consumers can never be cancelled in DirectMessageListenerContainer.checkConsumers, so no channels are ever released.
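For illustration, the pattern in our code looks roughly like this (a sketch with made-up names and queue, not our actual application):

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setRoutingKey("rpc.requests"); // hypothetical request queue
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.start();

// Every in-flight call holds a DirectReplyTo consumer (and therefore a channel)
// until the reply arrives or the request times out.
RabbitConverterFuture<String> reply = asyncRabbitTemplate.convertSendAndReceive("some request");
reply.whenComplete((result, ex) -> {
	// handle the reply or the timeout
});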

To Reproduce

Run the example RabbitmqDeadlockApplication and observe that TaskSubmitter never has its futures completed. The main thread blocks the asyncRabbitTemplate-1 thread, which is trying to check the consumers and possibly cancel some of them.

Expected behavior

Timed-out requests release their underlying channels, so all futures eventually complete.

Sample

https://github.com/utikeev/spring-amqp-directmessagelistenercontainer-deadlock
The sample limits channels to 2 to make the issue easier to reproduce (with just 5 convertSendAndReceive calls). In our case, we hit the broker limit of 2047 channels per connection.
We also set the MessageListener programmatically, but as far as I understand that shouldn't matter.
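For reference, the hard limit is presumably configured on the CachingConnectionFactory along these lines (an assumption about the sample, not its exact code); the channel cache size only acts as a hard limit when a checkout timeout is set:

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setChannelCacheSize(2);           // at most 2 cached channels per connection
connectionFactory.setChannelCheckoutTimeout(5000L); // turns the cache size into a hard limit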


The problem might also be incorrect usage of the Spring AMQP API on our side. In that case, if you could recommend a better way to handle this use case, I'd consider that a solution as well.

@artembilan (Member)

Thank you for the report!
I modified your application into this simple unit test:

	@Test
	public void limitedChannelsAreReleasedOnTimeout() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
		connectionFactory.setChannelCacheSize(1);
		connectionFactory.setChannelCheckoutTimeout(500L);
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		rabbitTemplate.setReceiveTimeout(100L);
		AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
		asyncRabbitTemplate.start();

		RabbitConverterFuture<String> replyFuture1 = asyncRabbitTemplate.convertSendAndReceive("noReply1");
		RabbitConverterFuture<String> replyFuture2 = asyncRabbitTemplate.convertSendAndReceive("noReply2");

		assertThatExceptionOfType(TimeoutException.class)
				.isThrownBy(() -> CompletableFuture.allOf(replyFuture1, replyFuture2).get(2, TimeUnit.SECONDS));

		asyncRabbitTemplate.stop();
		connectionFactory.destroy();
	}

And it indeed never ends:

2024-03-14 10:46:50,711  WARN org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer [main] : basicConsume failed, scheduling consumer for queue amq.rabbitmq.reply-to for restart
org.springframework.amqp.AmqpTimeoutException: No available channels
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.obtainPermits(CachingConnectionFactory.java:556) ~[classes/:?]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:522) ~[classes/:?]
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1503) ~[classes/:?]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.consume(DirectMessageListenerContainer.java:793) ~[classes/:?]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.doConsumeFromQueue(DirectMessageListenerContainer.java:759) ~[classes/:?]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.adjustConsumers(DirectMessageListenerContainer.java:358) ~[classes/:?]
	at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.setConsumersPerQueue(DirectMessageListenerContainer.java:176) ~[classes/:?]
	at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.getChannelHolder(DirectReplyToMessageListenerContainer.java:180) ~[classes/:?]
	at org.springframework.amqp.rabbit.AsyncRabbitTemplate.convertSendAndReceive(AsyncRabbitTemplate.java:498) ~[classes/:?]
	at org.springframework.amqp.rabbit.AsyncRabbitTemplate.convertSendAndReceive(AsyncRabbitTemplate.java:437) ~[classes/:?]
	at org.springframework.amqp.rabbit.AsyncRabbitTemplate.convertSendAndReceive(AsyncRabbitTemplate.java:408) ~[classes/:?]
	at org.springframework.amqp.rabbit.AsyncRabbitTemplateTests.limitedChannelsAreReleasedOnTimeout(AsyncRabbitTemplateTests.java:438) ~[classes/:?]

As you correctly pointed out, we never exit the adjustConsumers() loop because the downstream consume() swallows exceptions and returns control back to the loop.
So the scheduled checkConsumers() is never able to release the idled channel, because it cannot acquire this.consumersLock.lock().

I'll try to play with a ReadWriteLock instead for more fine-grained concurrency control.

Any other ideas?
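For context, the generic java.util.concurrent pattern being referred to (a plain ReadWriteLock sketch, not a proposal for the container's actual code):

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

// Several threads may hold the read lock concurrently while no writer is active.
lock.readLock().lock();
try {
	// read-only inspection of shared state
}
finally {
	lock.readLock().unlock();
}

// The write lock is exclusive and waits for all readers to finish.
lock.writeLock().lock();
try {
	// structural changes to shared state
}
finally {
	lock.writeLock().unlock();
}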

@utikeev (Author) commented Mar 14, 2024

Yeah, I think fair locking is the way to go, allowing other operations on the consumers (be it a normal dispose or a cancellation) to change the state.
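(For illustration only: in plain Java, fair locking is just a ReentrantLock constructed with fairness enabled; a generic sketch, not the container's actual field.)

// fairness = true: waiting threads acquire the lock roughly in arrival order
// instead of barging ahead of threads that have been waiting longer
ReentrantLock consumersLock = new ReentrantLock(true);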

@utikeev (Author) commented Mar 14, 2024

Will it also be possible to backport the fix to the 2.4.x line? Or is it no longer maintained except for security fixes? 3.x requires a Spring upgrade, which we can do, but it'd be nice if this bug weren't the main call to action :)

@artembilan (Member)

That version is out of Open Source support: https://spring.io/projects/spring-amqp#support.
So, it really might be better for your project to upgrade to the latest Spring version and so on.

artembilan added a commit to artembilan/spring-amqp that referenced this issue Mar 14, 2024
…ainer

Fixes: spring-projects#2653

When there are not enough channels in the cache, `DirectMessageListenerContainer.consume()`
returns null and `adjustConsumers()` goes into an infinite loop, since the already active
consumer does not release its channel.

* Fix `DirectMessageListenerContainer.consume()` to re-throw the `AmqpTimeoutException`
that is thrown when no channels are available in the cache
* Catch the `AmqpTimeoutException` in `DirectReplyToMessageListenerContainer.getChannelHolder()`
and roll back with `this.consumerCount--` to keep trying the existing consumer until it becomes
available, e.g. when it receives a reply or times out.
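In rough terms, the first bullet amounts to something like this (a heavily simplified sketch, not the actual diff):

private SimpleConsumer consume(String queue, Connection connection) {
	SimpleConsumer consumer = null;
	try {
		Channel channel = connection.createChannel(false);
		// ... build, register and start the SimpleConsumer on this channel ...
	}
	catch (AmqpTimeoutException noChannelAvailable) {
		throw noChannelAvailable; // previously swallowed like any other failure, returning null
	}
	catch (Exception e) {
		// unchanged behavior for other failures: log and schedule the consumer for restart
	}
	return consumer;
}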
@artembilan (Member)

Hi @utikeev!

Please take a look at the PR I propose: #2655.

Thanks

artembilan added a commit that referenced this issue Mar 18, 2024
Fixes: #2653

When there are not enough channels in the cache, `DirectMessageListenerContainer.consume()`
returns null and `adjustConsumers()` goes into an infinite loop, since the already active
consumer does not release its channel.

* Fix `DirectMessageListenerContainer.consume()` to re-throw the `AmqpTimeoutException`
that is thrown when no channels are available in the cache
* Catch the `AmqpTimeoutException` in `DirectReplyToMessageListenerContainer.getChannelHolder()`
and roll back with `this.consumerCount--` to keep trying the existing consumer until it becomes
available, e.g. when it receives a reply or times out.

* Change `DirectReplyToMessageListenerContainer.consumerCount` to `AtomicInteger`
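
Roughly, the remaining bullets amount to something like this on the DirectReplyToMessageListenerContainer side (a heavily simplified sketch paraphrasing the commit message, not the actual diff):

private final AtomicInteger consumerCount = new AtomicInteger();

ChannelHolder getChannelHolder() {
	ChannelHolder channelHolder = null;
	while (channelHolder == null) {
		// ... "container is running" checks omitted ...
		try {
			setConsumersPerQueue(this.consumerCount.incrementAndGet());
		}
		catch (AmqpTimeoutException noFreeChannel) {
			this.consumerCount.decrementAndGet(); // give up on an extra consumer for now
		}
		// ... take the holder from an available consumer, if there is one ...
	}
	return channelHolder;
}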