diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java index 7cdc31697a..b90c8c0f14 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import org.springframework.amqp.AmqpConnectException; import org.springframework.amqp.AmqpException; import org.springframework.amqp.AmqpIOException; +import org.springframework.amqp.AmqpTimeoutException; import org.springframework.amqp.ImmediateAcknowledgeAmqpException; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Message; @@ -802,6 +803,9 @@ private SimpleConsumer consume(String queue, int index, Connection connection) { catch (AmqpApplicationContextClosedException e) { throw new AmqpConnectException(e); } + catch (AmqpTimeoutException timeoutException) { + throw timeoutException; + } catch (Exception e) { RabbitUtils.closeChannel(channel); RabbitUtils.closeConnection(connection); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectReplyToMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectReplyToMessageListenerContainer.java index b639e11e33..96762ab96d 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectReplyToMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectReplyToMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.springframework.amqp.AmqpTimeoutException; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Address; import org.springframework.amqp.core.MessageListener; @@ -118,23 +119,24 @@ protected void doStart() { @Override protected void processMonitorTask() { long now = System.currentTimeMillis(); + long reduce; this.consumersLock.lock(); try { - long reduce = this.consumers.stream() - .filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c) - && this.whenUsed.get(c) < now - getIdleEventInterval()) - .count(); - if (reduce > 0) { - if (logger.isDebugEnabled()) { - logger.debug("Reducing idle consumes by " + reduce); - } - this.consumerCount = (int) Math.max(0, this.consumerCount - reduce); - super.setConsumersPerQueue(this.consumerCount); - } + reduce = this.consumers.stream() + .filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c) + && this.whenUsed.get(c) < now - getIdleEventInterval()) + .count(); } finally { this.consumersLock.unlock(); } + if (reduce > 0) { + if (logger.isDebugEnabled()) { + logger.debug("Reducing idle consumes by " + reduce); + } + this.consumerCount = (int) Math.max(0, this.consumerCount - reduce); + super.setConsumersPerQueue(this.consumerCount); + } } @Override @@ -159,13 +161,13 @@ protected void consumerRemoved(SimpleConsumer consumer) { * @return the channel holder. */ public ChannelHolder getChannelHolder() { - this.consumersLock.lock(); - try { - ChannelHolder channelHolder = null; - while (channelHolder == null) { - if (!isRunning()) { - throw new IllegalStateException("Direct reply-to container is not running"); - } + ChannelHolder channelHolder = null; + while (channelHolder == null) { + if (!isRunning()) { + throw new IllegalStateException("Direct reply-to container is not running"); + } + this.consumersLock.lock(); + try { for (SimpleConsumer consumer : this.consumers) { Channel candidate = consumer.getChannel(); if (candidate.isOpen() && this.inUseConsumerChannels.putIfAbsent(candidate, consumer) == null) { @@ -175,16 +177,23 @@ public ChannelHolder getChannelHolder() { break; } } - if (channelHolder == null) { - this.consumerCount++; - super.setConsumersPerQueue(this.consumerCount); + } + finally { + this.consumersLock.unlock(); + } + if (channelHolder == null) { + try { + super.setConsumersPerQueue(++this.consumerCount); + } + catch (AmqpTimeoutException timeoutException) { + // Possibly No available channels in the cache, so come back to consumers + // iteration until existing is available + this.consumerCount--; } } - return channelHolder; - } - finally { - this.consumersLock.unlock(); } + return channelHolder; + } /** diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java index eccba25c8f..009074c664 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java @@ -17,6 +17,7 @@ package org.springframework.amqp.rabbit; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; @@ -422,6 +423,31 @@ void ctorCoverage() { .isEqualTo("rq"); } + @Test + public void limitedChannelsAreReleasedOnTimeout() { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); + connectionFactory.setChannelCacheSize(1); + connectionFactory.setChannelCheckoutTimeout(500L); + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate); + asyncRabbitTemplate.setReceiveTimeout(500L); + asyncRabbitTemplate.start(); + + RabbitConverterFuture replyFuture1 = asyncRabbitTemplate.convertSendAndReceive("noReply1"); + RabbitConverterFuture replyFuture2 = asyncRabbitTemplate.convertSendAndReceive("noReply2"); + + assertThatExceptionOfType(ExecutionException.class) + .isThrownBy(() -> replyFuture1.get(10, TimeUnit.SECONDS)) + .withCauseInstanceOf(AmqpReplyTimeoutException.class); + + assertThatExceptionOfType(ExecutionException.class) + .isThrownBy(() -> replyFuture2.get(10, TimeUnit.SECONDS)) + .withCauseInstanceOf(AmqpReplyTimeoutException.class); + + asyncRabbitTemplate.stop(); + connectionFactory.destroy(); + } + private void checkConverterResult(CompletableFuture future, String expected) throws InterruptedException { final CountDownLatch cdl = new CountDownLatch(1); final AtomicReference resultRef = new AtomicReference<>(); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java index 4bcb6f92bc..8121e395bb 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitBatchIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy; @@ -62,6 +63,11 @@ public class EnableRabbitBatchIntegrationTests { @Autowired private Listener listener; + @BeforeAll + static void setup() { + System.setProperty("spring.amqp.deserialization.trust.all", "true"); + } + @Test public void simpleList() throws InterruptedException { this.template.convertAndSend("batch.1", new Foo("foo"));