From 6d04c21f447e1a0944aeb6320249eee0a4f6702e Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 17 May 2019 11:00:01 -0400 Subject: [PATCH] GH-999: Release permit when channel is closed Resolves https://github.com/spring-projects/spring-amqp/issues/999 The previous commit fixes permit releases when the close is deferred waiting for confirms. For logical closes, the `releasePermitIfNecessary()` was moved to `returnToCache`. However, if `logicalClose()` detects the underlying channel is already closed, it is not returned to the cache, it is discarded. In this case, `logicalClose()` must release the permit. Add a test case to expose the issue and verify it's corrected. **cherry-pick to 2.1.x** --- .../connection/CachingConnectionFactory.java | 3 ++ .../CachingConnectionFactoryTests.java | 34 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java index 3efadedf24..1d6492c5d1 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java @@ -1211,6 +1211,9 @@ private void logicalClose(ChannelProxy proxy) throws IOException, TimeoutExcepti if (this.channelList.contains(proxy)) { this.channelList.remove(proxy); } + else { + releasePermitIfNecessary(proxy); + } this.target = null; return; } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java index e0cc6d7eb6..b51dce63a0 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java @@ -28,6 +28,7 @@ import static org.junit.Assert.fail; import static org.mockito.AdditionalMatchers.aryEq; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -677,6 +678,39 @@ private void testCheckoutLimitWithPublisherConfirms(boolean physicalClose) throw exec.shutdownNow(); } + @Test + public void testCheckoutLimitWithPublisherConfirmsLogicalAlreadyCloses() throws IOException, Exception { + com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class); + com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class); + Channel mockChannel = mock(Channel.class); + + when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection); + when(mockConnection.createChannel()).thenReturn(mockChannel); + when(mockConnection.isOpen()).thenReturn(true); + + AtomicBoolean open = new AtomicBoolean(true); + doAnswer(invoc -> { + return open.get(); + }).when(mockChannel).isOpen(); + when(mockChannel.getNextPublishSeqNo()).thenReturn(1L); + doAnswer(invoc -> { + open.set(false); // so the logical close detects a closed delegate + return null; + }).when(mockChannel).basicPublish(any(), any(), anyBoolean(), any(), any()); + + CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory); + ccf.setExecutor(mock(ExecutorService.class)); + ccf.setChannelCacheSize(1); + ccf.setChannelCheckoutTimeout(1); + ccf.setPublisherConfirms(true); + + RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf); + rabbitTemplate.convertAndSend("foo", "bar"); + open.set(true); + rabbitTemplate.convertAndSend("foo", "bar"); + verify(mockChannel, times(2)).basicPublish(any(), any(), anyBoolean(), any(), any()); + } + @Test public void testReleaseWithForcedPhysicalClose() throws Exception { com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);