Skip to content

Commit

Permalink
Fix eviction logic in the PooledChannelCF
Browse files Browse the repository at this point in the history
The method `destroyObject()` is called by the eviction process of Apache Pool2, 
but it tries to do a logical close that puts the channel back on the pool causing a 
`java.lang.IllegalStateException: Returned object not currently part of this pool and object lost (abandoned).`

* Extract the `targetChannel` from the proxy and call a `physicalClose()` in the `destroyObject()` for a proper pool eviction

**Cherry-pick to `2.4.x` & `2.3.x`**
  • Loading branch information
LeonardoFerreiraa authored Mar 15, 2022
1 parent 909ba57 commit a1aba73
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.amqp.rabbit.connection;

import org.springframework.aop.RawTargetAccess;

import com.rabbitmq.client.Channel;

/**
Expand All @@ -24,9 +26,10 @@
*
* @author Mark Pollack
* @author Gary Russell
* @author Leonardo Ferreira
* @see CachingConnectionFactory
*/
public interface ChannelProxy extends Channel {
public interface ChannelProxy extends Channel, RawTargetAccess {

/**
* Return the target Channel of this proxy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,12 @@ public PooledObject<Channel> makeObject() {

@Override
public void destroyObject(PooledObject<Channel> p) throws Exception {
p.getObject().close();
Channel channel = p.getObject();
if (channel instanceof ChannelProxy) {
channel = ((ChannelProxy) channel).getTargetChannel();
}

ConnectionWrapper.this.physicalClose(channel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Queue;
Expand Down Expand Up @@ -199,6 +203,70 @@ private void createAndCloseConnectionChannelTxAndChannelNonTx(
connection.close();
}

@Test
public void evictShouldCloseAllUnneededChannelsWithoutErrors() throws Exception {
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(new ConnectionFactory());
AtomicReference<GenericObjectPool<Channel>> channelsReference = new AtomicReference<>();
AtomicReference<GenericObjectPool<Channel>> txChannelsReference = new AtomicReference<>();
AtomicInteger swallowedExceptionsCount = new AtomicInteger();
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
channelsReference.set(pool);
}
else {
txChannelsReference.set(pool);
}

pool.setEvictionPolicy((ec, u, idleCount) -> idleCount > ec.getMinIdle());
pool.setSwallowedExceptionListener(ex -> swallowedExceptionsCount.incrementAndGet());
pool.setNumTestsPerEvictionRun(5);

pool.setMinIdle(1);
pool.setMaxIdle(5);
});

createAndCloseFiveChannelTxAndChannelNonTx(pcf);

final GenericObjectPool<Channel> channels = channelsReference.get();
channels.evict();

assertThat(channels.getNumIdle())
.isEqualTo(1);
assertThat(channels.getDestroyedByEvictorCount())
.isEqualTo(4);

final GenericObjectPool<Channel> txChannels = txChannelsReference.get();
txChannels.evict();
assertThat(txChannels.getNumIdle())
.isEqualTo(1);
assertThat(txChannels.getDestroyedByEvictorCount())
.isEqualTo(4);

assertThat(swallowedExceptionsCount.get())
.isZero();
}

private void createAndCloseFiveChannelTxAndChannelNonTx(
org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
int channelAmount = 5;
Connection connection = connectionFactory.createConnection();

List<Channel> channels = new ArrayList<>(channelAmount);
List<Channel> txChannels = new ArrayList<>(channelAmount);

for (int i = 0; i < channelAmount; i++) {
channels.add(connection.createChannel(false));
txChannels.add(connection.createChannel(true));
}

for (int i = 0; i < channelAmount; i++) {
RabbitUtils.closeChannel(channels.get(i));
RabbitUtils.closeChannel(txChannels.get(i));
}

connection.close();
}

@Configuration
public static class Config {

Expand Down

0 comments on commit a1aba73

Please sign in to comment.