Skip to content

Commit

Permalink
GH-2653: Fix deadlock in the DirectMessageListenerContainer
Browse files Browse the repository at this point in the history
Fixes: #2653

When not enough channel in the cache, the `DirectMessageListenerContainer.consume()`
returns null and `adjustConsumers()` goes into an infinite loop, since already active
consumer does not release its channel.

* Fix `DirectMessageListenerContainer.consume()` to re-throw an `AmqpTimeoutException`
which is thrown when no available channels in the cache
* Catch `AmqpTimeoutException` in the `DirectReplyToMessageListenerContainer.getChannelHolder()`
and reset `this.consumerCount--` to allow to try existing consumer until it is available, e.g.
when this one receives a reply or times out.

* Change `DirectReplyToMessageListenerContainer.consumerCount` to `AtomicInteger`
  • Loading branch information
artembilan authored Mar 18, 2024
1 parent 4c4b6b6 commit b19fa8c
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,7 @@
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
Expand Down Expand Up @@ -802,6 +803,9 @@ private SimpleConsumer consume(String queue, int index, Connection connection) {
catch (AmqpApplicationContextClosedException e) {
throw new AmqpConnectException(e);
}
catch (AmqpTimeoutException timeoutException) {
throw timeoutException;
}
catch (Exception e) {
RabbitUtils.closeChannel(channel);
RabbitUtils.closeConnection(connection);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,9 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.MessageListener;
Expand Down Expand Up @@ -47,7 +49,7 @@ public class DirectReplyToMessageListenerContainer extends DirectMessageListener

private final ConcurrentMap<SimpleConsumer, Long> whenUsed = new ConcurrentHashMap<>();

private int consumerCount;
private final AtomicInteger consumerCount = new AtomicInteger();

public DirectReplyToMessageListenerContainer(ConnectionFactory connectionFactory) {
super(connectionFactory);
Expand Down Expand Up @@ -109,7 +111,7 @@ public void setMessageListener(MessageListener messageListener) {
@Override
protected void doStart() {
if (!isRunning()) {
this.consumerCount = 0;
this.consumerCount.set(0);
super.setConsumersPerQueue(0);
super.doStart();
}
Expand All @@ -118,23 +120,24 @@ protected void doStart() {
@Override
protected void processMonitorTask() {
long now = System.currentTimeMillis();
long reduce;
this.consumersLock.lock();
try {
long reduce = this.consumers.stream()
.filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c)
&& this.whenUsed.get(c) < now - getIdleEventInterval())
.count();
if (reduce > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Reducing idle consumes by " + reduce);
}
this.consumerCount = (int) Math.max(0, this.consumerCount - reduce);
super.setConsumersPerQueue(this.consumerCount);
}
reduce = this.consumers.stream()
.filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c)
&& this.whenUsed.get(c) < now - getIdleEventInterval())
.count();
}
finally {
this.consumersLock.unlock();
}
if (reduce > 0) {
if (logger.isDebugEnabled()) {
logger.debug("Reducing idle consumes by " + reduce);
}
super.setConsumersPerQueue(
this.consumerCount.updateAndGet((current) -> (int) Math.max(0, current - reduce)));
}
}

@Override
Expand All @@ -159,13 +162,13 @@ protected void consumerRemoved(SimpleConsumer consumer) {
* @return the channel holder.
*/
public ChannelHolder getChannelHolder() {
this.consumersLock.lock();
try {
ChannelHolder channelHolder = null;
while (channelHolder == null) {
if (!isRunning()) {
throw new IllegalStateException("Direct reply-to container is not running");
}
ChannelHolder channelHolder = null;
while (channelHolder == null) {
if (!isRunning()) {
throw new IllegalStateException("Direct reply-to container is not running");
}
this.consumersLock.lock();
try {
for (SimpleConsumer consumer : this.consumers) {
Channel candidate = consumer.getChannel();
if (candidate.isOpen() && this.inUseConsumerChannels.putIfAbsent(candidate, consumer) == null) {
Expand All @@ -175,16 +178,23 @@ public ChannelHolder getChannelHolder() {
break;
}
}
if (channelHolder == null) {
this.consumerCount++;
super.setConsumersPerQueue(this.consumerCount);
}
finally {
this.consumersLock.unlock();
}
if (channelHolder == null) {
try {
super.setConsumersPerQueue(this.consumerCount.incrementAndGet());
}
catch (AmqpTimeoutException timeoutException) {
// Possibly No available channels in the cache, so come back to consumers
// iteration until existing is available
this.consumerCount.decrementAndGet();
}
}
return channelHolder;
}
finally {
this.consumersLock.unlock();
}
return channelHolder;

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

Expand Down Expand Up @@ -111,7 +113,8 @@ public void testConvert1ArgDirect() throws Exception {
waitForZeroInUseConsumers();
assertThat(TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
Integer.class)).isEqualTo(2);
AtomicInteger.class).get())
.isEqualTo(2);
final String missingQueue = UUID.randomUUID().toString();
this.asyncDirectTemplate.convertSendAndReceive("", missingQueue, "foo"); // send to nowhere
this.asyncDirectTemplate.stop(); // should clear the inUse channel map
Expand Down Expand Up @@ -168,18 +171,20 @@ public void testMessage1ArgDirect() throws Exception {
waitForZeroInUseConsumers();
assertThat(TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
Integer.class)).isEqualTo(2);
AtomicInteger.class).get())
.isEqualTo(2);
this.asyncDirectTemplate.stop();
this.asyncDirectTemplate.start();
assertThat(TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
Integer.class)).isEqualTo(0);
AtomicInteger.class).get())
.isEqualTo(0);
}

private void waitForZeroInUseConsumers() throws InterruptedException {
private void waitForZeroInUseConsumers() {
Map<?, ?> inUseConsumers = TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.inUseConsumerChannels", Map.class);
await().until(() -> inUseConsumers.size() == 0);
await().until(inUseConsumers::isEmpty);
}

@Test
Expand Down Expand Up @@ -422,6 +427,31 @@ void ctorCoverage() {
.isEqualTo("rq");
}

@Test
public void limitedChannelsAreReleasedOnTimeout() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setChannelCacheSize(1);
connectionFactory.setChannelCheckoutTimeout(500L);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.setReceiveTimeout(500L);
asyncRabbitTemplate.start();

RabbitConverterFuture<String> replyFuture1 = asyncRabbitTemplate.convertSendAndReceive("noReply1");
RabbitConverterFuture<String> replyFuture2 = asyncRabbitTemplate.convertSendAndReceive("noReply2");

assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(() -> replyFuture1.get(10, TimeUnit.SECONDS))
.withCauseInstanceOf(AmqpReplyTimeoutException.class);

assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(() -> replyFuture2.get(10, TimeUnit.SECONDS))
.withCauseInstanceOf(AmqpReplyTimeoutException.class);

asyncRabbitTemplate.stop();
connectionFactory.destroy();
}

private void checkConverterResult(CompletableFuture<String> future, String expected) throws InterruptedException {
final CountDownLatch cdl = new CountDownLatch(1);
final AtomicReference<String> resultRef = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
Expand Down Expand Up @@ -62,6 +63,11 @@ public class EnableRabbitBatchIntegrationTests {
@Autowired
private Listener listener;

@BeforeAll
static void setup() {
System.setProperty("spring.amqp.deserialization.trust.all", "true");
}

@Test
public void simpleList() throws InterruptedException {
this.template.convertAndSend("batch.1", new Foo("foo"));
Expand Down

0 comments on commit b19fa8c

Please sign in to comment.