Skip to content

Commit

Permalink
spring-projectsGH-2653: Fix deadlock in the DirectMessageListenerCont…
Browse files Browse the repository at this point in the history
…ainer

Fixes: spring-projects#2653

When not enough channel in the cache, the `DirectMessageListenerContainer.consume()`
returns null and `adjustConsumers()` goes into an infinite loop, since already active
consumer does not release its channel.

* Fix `DirectMessageListenerContainer.consume()` to re-throw an `AmqpTimeoutException`
which is thrown when no available channels in the cache
* Catch `AmqpTimeoutException` in the `DirectReplyToMessageListenerContainer.getChannelHolder()`
and reset `this.consumerCount--` to allow to try existing consumer until it is available, e.g.
when this one receives a reply or times out.
  • Loading branch information
artembilan committed Mar 14, 2024
1 parent 22d22b9 commit 32c85ae
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,7 @@
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
Expand Down Expand Up @@ -802,6 +803,9 @@ private SimpleConsumer consume(String queue, int index, Connection connection) {
catch (AmqpApplicationContextClosedException e) {
throw new AmqpConnectException(e);
}
catch (AmqpTimeoutException timeoutException) {
throw timeoutException;
}
catch (Exception e) {
RabbitUtils.closeChannel(channel);
RabbitUtils.closeConnection(connection);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.MessageListener;
Expand Down Expand Up @@ -118,23 +119,24 @@ protected void doStart() {
@Override
protected void processMonitorTask() {
long now = System.currentTimeMillis();
long reduce;
this.consumersLock.lock();
try {
long reduce = this.consumers.stream()
.filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c)
&& this.whenUsed.get(c) < now - getIdleEventInterval())
.count();
if (reduce > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Reducing idle consumes by " + reduce);
}
this.consumerCount = (int) Math.max(0, this.consumerCount - reduce);
super.setConsumersPerQueue(this.consumerCount);
}
reduce = this.consumers.stream()
.filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c)
&& this.whenUsed.get(c) < now - getIdleEventInterval())
.count();
}
finally {
this.consumersLock.unlock();
}
if (reduce > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Reducing idle consumes by " + reduce);
}
this.consumerCount = (int) Math.max(0, this.consumerCount - reduce);
super.setConsumersPerQueue(this.consumerCount);
}
}

@Override
Expand All @@ -159,13 +161,13 @@ protected void consumerRemoved(SimpleConsumer consumer) {
* @return the channel holder.
*/
public ChannelHolder getChannelHolder() {
this.consumersLock.lock();
try {
ChannelHolder channelHolder = null;
while (channelHolder == null) {
if (!isRunning()) {
throw new IllegalStateException("Direct reply-to container is not running");
}
ChannelHolder channelHolder = null;
while (channelHolder == null) {
if (!isRunning()) {
throw new IllegalStateException("Direct reply-to container is not running");
}
this.consumersLock.lock();
try {
for (SimpleConsumer consumer : this.consumers) {
Channel candidate = consumer.getChannel();
if (candidate.isOpen() && this.inUseConsumerChannels.putIfAbsent(candidate, consumer) == null) {
Expand All @@ -175,16 +177,23 @@ public ChannelHolder getChannelHolder() {
break;
}
}
if (channelHolder == null) {
this.consumerCount++;
super.setConsumersPerQueue(this.consumerCount);
}
finally {
this.consumersLock.unlock();
}
if (channelHolder == null) {
try {
super.setConsumersPerQueue(++this.consumerCount);
}
catch (AmqpTimeoutException timeoutException) {
// Possibly No available channels in the cache, so come back to consumers
// iteration until existing is available
this.consumerCount--;
}
}
return channelHolder;
}
finally {
this.consumersLock.unlock();
}
return channelHolder;

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -422,6 +423,31 @@ void ctorCoverage() {
.isEqualTo("rq");
}

@Test
public void limitedChannelsAreReleasedOnTimeout() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setChannelCacheSize(1);
connectionFactory.setChannelCheckoutTimeout(500L);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.setReceiveTimeout(500L);
asyncRabbitTemplate.start();

RabbitConverterFuture<String> replyFuture1 = asyncRabbitTemplate.convertSendAndReceive("noReply1");
RabbitConverterFuture<String> replyFuture2 = asyncRabbitTemplate.convertSendAndReceive("noReply2");

assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(() -> replyFuture1.get(10, TimeUnit.SECONDS))
.withCauseInstanceOf(AmqpReplyTimeoutException.class);

assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(() -> replyFuture2.get(10, TimeUnit.SECONDS))
.withCauseInstanceOf(AmqpReplyTimeoutException.class);

asyncRabbitTemplate.stop();
connectionFactory.destroy();
}

private void checkConverterResult(CompletableFuture<String> future, String expected) throws InterruptedException {
final CountDownLatch cdl = new CountDownLatch(1);
final AtomicReference<String> resultRef = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
Expand Down Expand Up @@ -62,6 +63,11 @@ public class EnableRabbitBatchIntegrationTests {
@Autowired
private Listener listener;

@BeforeAll
static void setup() {
System.setProperty("spring.amqp.deserialization.trust.all", "true");
}

@Test
public void simpleList() throws InterruptedException {
this.template.convertAndSend("batch.1", new Foo("foo"));
Expand Down

0 comments on commit 32c85ae

Please sign in to comment.