Skip to content

Commit

Permalink
GH-999: Release permit when channel is closed
Browse files Browse the repository at this point in the history
Resolves #999

The previous commit fixes permit releases when the close is deferred
waiting for confirms.

For logical closes, the `releasePermitIfNecessary()` was moved to
`returnToCache`. However, if `logicalClose()` detects the underlying
channel is already closed, it is not returned to the cache, it is
discarded.

In this case, `logicalClose()` must release the permit.

Add a test case to expose the issue and verify it's corrected.

**cherry-pick to 2.1.x**
  • Loading branch information
garyrussell authored and artembilan committed May 17, 2019
1 parent fb278e5 commit 6d04c21
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,9 @@ private void logicalClose(ChannelProxy proxy) throws IOException, TimeoutExcepti
if (this.channelList.contains(proxy)) {
this.channelList.remove(proxy);
}
else {
releasePermitIfNecessary(proxy);
}
this.target = null;
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.junit.Assert.fail;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
Expand Down Expand Up @@ -677,6 +678,39 @@ private void testCheckoutLimitWithPublisherConfirms(boolean physicalClose) throw
exec.shutdownNow();
}

@Test
public void testCheckoutLimitWithPublisherConfirmsLogicalAlreadyCloses() throws IOException, Exception {
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
Channel mockChannel = mock(Channel.class);

when(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).thenReturn(mockConnection);
when(mockConnection.createChannel()).thenReturn(mockChannel);
when(mockConnection.isOpen()).thenReturn(true);

AtomicBoolean open = new AtomicBoolean(true);
doAnswer(invoc -> {
return open.get();
}).when(mockChannel).isOpen();
when(mockChannel.getNextPublishSeqNo()).thenReturn(1L);
doAnswer(invoc -> {
open.set(false); // so the logical close detects a closed delegate
return null;
}).when(mockChannel).basicPublish(any(), any(), anyBoolean(), any(), any());

CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
ccf.setExecutor(mock(ExecutorService.class));
ccf.setChannelCacheSize(1);
ccf.setChannelCheckoutTimeout(1);
ccf.setPublisherConfirms(true);

RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf);
rabbitTemplate.convertAndSend("foo", "bar");
open.set(true);
rabbitTemplate.convertAndSend("foo", "bar");
verify(mockChannel, times(2)).basicPublish(any(), any(), anyBoolean(), any(), any());
}

@Test
public void testReleaseWithForcedPhysicalClose() throws Exception {
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
Expand Down

0 comments on commit 6d04c21

Please sign in to comment.