Skip to content

Commit

Permalink
Fix Sonar issues and other refactoring
Browse files Browse the repository at this point in the history
- Allow the publishing connection factory to be explicitly set and be
  any type
- Do not propagate properties when the factory is explicitly configured

* Reset flag when this is a publisher CF.
  • Loading branch information
garyrussell authored Nov 30, 2020
1 parent 39c9c44 commit 99ccb1b
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public enum AddressShuffleMode {

private final AtomicInteger defaultConnectionNameStrategyCounter = new AtomicInteger();

private ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();

private AbstractConnectionFactory publisherConnectionFactory;

private RecoveryListener recoveryListener = new RecoveryListener() {
Expand Down Expand Up @@ -156,8 +158,6 @@ public void handleRecovery(Recoverable recoverable) {

private volatile boolean contextStopped;

protected ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger();

/**
* Create a new AbstractConnectionFactory for the given target ConnectionFactory,
* with no publisher connection factory.
Expand All @@ -168,8 +168,21 @@ public AbstractConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitCon
this.rabbitConnectionFactory = rabbitConnectionFactory;
}

protected final void setPublisherConnectionFactory(
AbstractConnectionFactory publisherConnectionFactory) {
/**
* Set a custom publisher connection factory; the type does not need to be the same
* as this factory.
* @param publisherConnectionFactory the factory.
* @since 2.3.2
*/
public void setPublisherConnectionFactory(
@Nullable AbstractConnectionFactory publisherConnectionFactory) {

doSetPublisherConnectionFactory(publisherConnectionFactory);
}

protected final void doSetPublisherConnectionFactory(
@Nullable AbstractConnectionFactory publisherConnectionFactory) {

this.publisherConnectionFactory = publisherConnectionFactory;
}

Expand Down Expand Up @@ -472,6 +485,26 @@ public void setConnectionNameStrategy(ConnectionNameStrategy connectionNameStrat
}
}

/**
* Set the strategy for logging close exceptions; by default, if a channel is closed due to a failed
* passive queue declaration, it is logged at debug level. Normal channel closes (200 OK) are not
* logged. All others are logged at ERROR level (unless access is refused due to an exclusive consumer
* condition, in which case, it is logged at INFO level).
* @param closeExceptionLogger the {@link ConditionalExceptionLogger}.
* @since 1.5
*/
public void setCloseExceptionLogger(ConditionalExceptionLogger closeExceptionLogger) {
Assert.notNull(closeExceptionLogger, "'closeExceptionLogger' cannot be null");
this.closeExceptionLogger = closeExceptionLogger;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setCloseExceptionLogger(closeExceptionLogger);
}
}

protected ConnectionNameStrategy getConnectionNameStrategy() {
return this.connectionNameStrategy;
}

@Override
public void setBeanName(String name) {
this.beanName = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.support.ConditionalExceptionLogger;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
Expand Down Expand Up @@ -184,8 +183,6 @@ public enum ConfirmType {

private final AtomicInteger connectionHighWaterMark = new AtomicInteger();

private final CachingConnectionFactory publisherConnectionFactory;

/** Synchronization monitor for the shared Connection. */
private final Object connectionMonitor = new Object();

Expand All @@ -207,6 +204,8 @@ public enum ConfirmType {

private PublisherCallbackChannelFactory publisherChannelFactory = PublisherCallbackChannelImpl.factory();

private boolean defaultPublisherFactory = true;

private volatile boolean active = true;

private volatile boolean initialized;
Expand Down Expand Up @@ -257,8 +256,7 @@ public CachingConnectionFactory(@Nullable String hostNameArg, int port) {
}
setHost(hostname);
setPort(port);
this.publisherConnectionFactory = new CachingConnectionFactory(getRabbitConnectionFactory(), true);
setPublisherConnectionFactory(this.publisherConnectionFactory);
doSetPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
}

/**
Expand All @@ -269,8 +267,7 @@ public CachingConnectionFactory(@Nullable String hostNameArg, int port) {
public CachingConnectionFactory(URI uri) {
super(newRabbitConnectionFactory());
setUri(uri);
this.publisherConnectionFactory = new CachingConnectionFactory(getRabbitConnectionFactory(), true);
setPublisherConnectionFactory(this.publisherConnectionFactory);
doSetPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
}

/**
Expand All @@ -288,6 +285,7 @@ public CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConn
*/
private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
boolean isPublisherFactory) {

super(rabbitConnectionFactory);
if (!isPublisherFactory) {
if (rabbitConnectionFactory.isAutomaticRecoveryEnabled()) {
Expand All @@ -301,11 +299,11 @@ private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitCon
+ "'getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true)',\n"
+ "but this is discouraged.");
}
this.publisherConnectionFactory = new CachingConnectionFactory(getRabbitConnectionFactory(), true);
setPublisherConnectionFactory(this.publisherConnectionFactory);
super.setPublisherConnectionFactory(new CachingConnectionFactory(getRabbitConnectionFactory(), true));
}
else {
this.publisherConnectionFactory = null;
super.setPublisherConnectionFactory(null);
this.defaultPublisherFactory = false;
}
}

Expand All @@ -315,6 +313,12 @@ private static com.rabbitmq.client.ConnectionFactory newRabbitConnectionFactory(
return connectionFactory;
}

@Override
public void setPublisherConnectionFactory(@Nullable AbstractConnectionFactory publisherConnectionFactory) {
super.setPublisherConnectionFactory(publisherConnectionFactory);
this.defaultPublisherFactory = false;
}

/**
* The number of channels to maintain in the cache. By default, channels are allocated on
* demand (unbounded) and this represents the maximum cache size. To limit the available
Expand All @@ -325,8 +329,8 @@ private static com.rabbitmq.client.ConnectionFactory newRabbitConnectionFactory(
public void setChannelCacheSize(int sessionCacheSize) {
Assert.isTrue(sessionCacheSize >= 1, "Channel cache size must be 1 or higher");
this.channelCacheSize = sessionCacheSize;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setChannelCacheSize(sessionCacheSize);
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).setChannelCacheSize(sessionCacheSize);
}
}

Expand All @@ -342,8 +346,8 @@ public void setCacheMode(CacheMode cacheMode) {
Assert.isTrue(!this.initialized, "'cacheMode' cannot be changed after initialization.");
Assert.notNull(cacheMode, "'cacheMode' must not be null.");
this.cacheMode = cacheMode;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setCacheMode(cacheMode);
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).setCacheMode(cacheMode);
}
}

Expand All @@ -354,8 +358,8 @@ public int getConnectionCacheSize() {
public void setConnectionCacheSize(int connectionCacheSize) {
Assert.isTrue(connectionCacheSize >= 1, "Connection cache size must be 1 or higher.");
this.connectionCacheSize = connectionCacheSize;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setConnectionCacheSize(connectionCacheSize);
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).setConnectionCacheSize(connectionCacheSize);
}
}

Expand All @@ -370,8 +374,8 @@ public void setConnectionCacheSize(int connectionCacheSize) {
public void setConnectionLimit(int connectionLimit) {
Assert.isTrue(connectionLimit >= 1, "Connection limit must be 1 or higher.");
this.connectionLimit = connectionLimit;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setConnectionLimit(connectionLimit);
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).setConnectionLimit(connectionLimit);
}
}

Expand All @@ -387,8 +391,8 @@ public boolean isPublisherReturns() {

public void setPublisherReturns(boolean publisherReturns) {
this.publisherReturns = publisherReturns;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setPublisherReturns(publisherReturns);
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).setPublisherReturns(publisherReturns);
}
}

Expand Down Expand Up @@ -444,8 +448,8 @@ public boolean isSimplePublisherConfirms() {
public void setPublisherConfirmType(ConfirmType confirmType) {
Assert.notNull(confirmType, "'confirmType' cannot be null");
this.confirmType = confirmType;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setPublisherConfirmType(confirmType);
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).setPublisherConfirmType(confirmType);
}
}

Expand All @@ -463,24 +467,9 @@ public void setPublisherConfirmType(ConfirmType confirmType) {
*/
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
this.channelCheckoutTimeout = channelCheckoutTimeout;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setChannelCheckoutTimeout(channelCheckoutTimeout);
}
}

/**
* Set the strategy for logging close exceptions; by default, if a channel is closed due to a failed
* passive queue declaration, it is logged at debug level. Normal channel closes (200 OK) are not
* logged. All others are logged at ERROR level (unless access is refused due to an exclusive consumer
* condition, in which case, it is logged at INFO level).
* @param closeExceptionLogger the {@link ConditionalExceptionLogger}.
* @since 1.5
*/
public void setCloseExceptionLogger(ConditionalExceptionLogger closeExceptionLogger) {
Assert.notNull(closeExceptionLogger, "'closeExceptionLogger' cannot be null");
this.closeExceptionLogger = closeExceptionLogger;
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setCloseExceptionLogger(closeExceptionLogger);
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory())
.setChannelCheckoutTimeout(channelCheckoutTimeout);
}
}

Expand All @@ -502,8 +491,8 @@ public void afterPropertiesSet() {
"When the cache mode is 'CHANNEL', the connection cache size cannot be configured.");
}
initCacheWaterMarks();
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.afterPropertiesSet();
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).afterPropertiesSet();
}
}

Expand Down Expand Up @@ -901,8 +890,8 @@ public void resetConnection() {
this.channelHighWaterMarks.values().forEach(count -> count.set(0));
this.connectionHighWaterMark.set(0);
}
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.resetConnection();
if (this.defaultPublisherFactory) {
((CachingConnectionFactory) getPublisherConnectionFactory()).resetConnection();
}
}

Expand Down Expand Up @@ -995,8 +984,8 @@ public Properties getCacheProperties() {
*/
@ManagedAttribute
public Properties getPublisherConnectionFactoryCacheProperties() {
if (this.publisherConnectionFactory != null) {
return this.publisherConnectionFactory.getCacheProperties();
if (this.defaultPublisherFactory) {
return ((CachingConnectionFactory) getPublisherConnectionFactory()).getCacheProperties();
}
return new Properties();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import com.rabbitmq.client.Channel;
Expand All @@ -60,6 +61,8 @@ public class PooledChannelConnectionFactory extends AbstractConnectionFactory im

private BiConsumer<GenericObjectPool<Channel>, Boolean> poolConfigurer = (pool, tx) -> { };

private boolean defaultPublisherFactory = true;

/**
* Construct an instance.
* @param rabbitConnectionFactory the rabbitmq connection factory.
Expand All @@ -78,6 +81,15 @@ private PooledChannelConnectionFactory(ConnectionFactory rabbitConnectionFactory
if (!isPublisher) {
setPublisherConnectionFactory(new PooledChannelConnectionFactory(rabbitConnectionFactory, true));
}
else {
this.defaultPublisherFactory = false;
}
}

@Override
public void setPublisherConnectionFactory(@Nullable AbstractConnectionFactory publisherConnectionFactory) {
super.setPublisherConnectionFactory(publisherConnectionFactory);
this.defaultPublisherFactory = false;
}

/**
Expand All @@ -88,6 +100,9 @@ private PooledChannelConnectionFactory(ConnectionFactory rabbitConnectionFactory
public void setPoolConfigurer(BiConsumer<GenericObjectPool<Channel>, Boolean> poolConfigurer) {
Assert.notNull(poolConfigurer, "'poolConfigurer' cannot be null");
this.poolConfigurer = poolConfigurer; // NOSONAR - sync inconsistency
if (this.defaultPublisherFactory) {
((PooledChannelConnectionFactory) getPublisherConnectionFactory()).setPoolConfigurer(poolConfigurer);
}
}

@Override
Expand All @@ -101,6 +116,10 @@ public boolean isSimplePublisherConfirms() {
*/
public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
this.simplePublisherConfirms = simplePublisherConfirms;
if (this.defaultPublisherFactory) {
((ThreadChannelConnectionFactory) getPublisherConnectionFactory())
.setSimplePublisherConfirms(simplePublisherConfirms);
}
}

@Override
Expand All @@ -117,7 +136,7 @@ public synchronized Connection createConnection() throws AmqpException {
if (this.connection == null || !this.connection.isOpen()) {
Connection bareConnection = createBareConnection(); // NOSONAR - see destroy()
this.connection = new ConnectionWrapper(bareConnection.getDelegate(), getCloseTimeout(), // NOSONAR
this.simplePublisherConfirms, this.poolConfigurer, getChannelListener());
this.simplePublisherConfirms, this.poolConfigurer, getChannelListener()); // NOSONAR
getConnectionListener().onCreate(this.connection);
}
return this.connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
import org.springframework.lang.Nullable;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
Expand All @@ -47,6 +48,8 @@ public class ThreadChannelConnectionFactory extends AbstractConnectionFactory im

private boolean simplePublisherConfirms;

private boolean defaultPublisherFactory = true;

/**
* Construct an instance.
* @param rabbitConnectionFactory the rabbitmq connection factory.
Expand All @@ -65,6 +68,15 @@ private ThreadChannelConnectionFactory(ConnectionFactory rabbitConnectionFactory
if (!isPublisher) {
setPublisherConnectionFactory(new ThreadChannelConnectionFactory(rabbitConnectionFactory, true));
}
else {
this.defaultPublisherFactory = false;
}
}

@Override
public void setPublisherConnectionFactory(@Nullable AbstractConnectionFactory publisherConnectionFactory) {
super.setPublisherConnectionFactory(publisherConnectionFactory);
this.defaultPublisherFactory = false;
}

@Override
Expand All @@ -78,6 +90,10 @@ public boolean isSimplePublisherConfirms() {
*/
public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
this.simplePublisherConfirms = simplePublisherConfirms;
if (this.defaultPublisherFactory) {
((ThreadChannelConnectionFactory) getPublisherConnectionFactory())
.setSimplePublisherConfirms(simplePublisherConfirms);
}
}

@Override
Expand Down
Loading

0 comments on commit 99ccb1b

Please sign in to comment.