From 7bf27de1528ab7520f7303099aae078f4dbe194b Mon Sep 17 00:00:00 2001 From: Will Droste Date: Thu, 12 Jul 2018 13:44:12 -0500 Subject: [PATCH] AMQP-824: Name for deferredCloseExec thread pool JIRA https://jira.spring.io/browse/AMQP-824 Taking the comments into account Fix build * Polishing for code style **Cherry-pick to 2.0.x & 2.1.x** --- .../connection/AbstractConnectionFactory.java | 13 ++- .../connection/CachingConnectionFactory.java | 87 ++++++++++++++----- ...stenerContainerPlaceholderParserTests.java | 14 ++- 3 files changed, 85 insertions(+), 29 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java index 066bef4aac..38d4124cdf 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,6 +51,7 @@ * @author Gary Russell * @author Steve Powell * @author Artem Bilan + * @author Will Droste * */ public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware { @@ -343,6 +344,16 @@ public void setBeanName(String name) { this.beanName = name; } + /** + * Return a bean name of the component or null if not a bean. + * @return the bean name or null. + * @since 1.7.9 + */ + protected String getBeanName() { + return this.beanName; + } + + protected final Connection createBareConnection() { try { String connectionName = this.connectionNameStrategy == null ? null diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java index fa9ff653cc..54a5e8c002 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,6 +57,7 @@ import org.springframework.context.event.ContextClosedEvent; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedResource; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -92,14 +94,22 @@ * @author Gary Russell * @author Artem Bilan * @author Steve Powell + * @author Will Droste */ @ManagedResource public class CachingConnectionFactory extends AbstractConnectionFactory implements InitializingBean, ShutdownListener, ApplicationContextAware, ApplicationListener, - PublisherCallbackChannelConnectionFactory { + PublisherCallbackChannelConnectionFactory { private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25; + private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-"; + + /** + * Create a unique ID for the pool + */ + private static final AtomicInteger threadPoolId = new AtomicInteger(); + private static final Set txStarts = new HashSet(Arrays.asList("basicPublish", "basicAck", "basicNack", "basicReject")); @@ -122,10 +132,10 @@ public enum CacheMode { new HashSet(); private final Map> - allocatedConnectionNonTransactionalChannels = new HashMap>(); + allocatedConnectionNonTransactionalChannels = new HashMap>(); private final Map> - allocatedConnectionTransactionalChannels = new HashMap>(); + allocatedConnectionTransactionalChannels = new HashMap>(); private final BlockingDeque idleConnections = new LinkedBlockingDeque(); @@ -166,12 +176,15 @@ public enum CacheMode { private volatile ConditionalExceptionLogger closeExceptionLogger = new DefaultChannelCloseLogger(); - /** Synchronization monitor for the shared Connection */ + /** + * Synchronization monitor for the shared Connection + */ private final Object connectionMonitor = new Object(); - /** Executor used for deferred close if no explicit executor set. */ - private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool(); - + /** + * Executor used for deferred close if no explicit executor set. + */ + private ExecutorService deferredCloseExecutor; /** * Create a new CachingConnectionFactory initializing the hostname to be the value returned from @@ -406,7 +419,7 @@ private Channel getChannel(ChannelCachingConnectionProxy connection, boolean tra } if (logger.isDebugEnabled()) { logger.debug( - "Acquired permit for " + connection + ", remaining:" + checkoutPermits.availablePermits()); + "Acquired permit for " + connection + ", remaining:" + checkoutPermits.availablePermits()); } } catch (InterruptedException e) { @@ -486,7 +499,7 @@ private Channel getChannel(ChannelCachingConnectionProxy connection, boolean tra checkoutPermits.release(); if (logger.isDebugEnabled()) { logger.debug("Could not get channel; released permit for " + connection + ", remaining:" - + checkoutPermits.availablePermits()); + + checkoutPermits.availablePermits()); } } throw e; @@ -692,7 +705,9 @@ public final void destroy() { resetConnection(); if (this.contextStopped) { this.stopped = true; - this.deferredCloseExecutor.shutdownNow(); + if (this.deferredCloseExecutor != null) { + this.deferredCloseExecutor.shutdownNow(); + } } } @@ -820,12 +835,12 @@ public Properties getCacheProperties() { props.setProperty("connectionCacheSize", Integer.toString(this.connectionCacheSize)); props.setProperty("openConnections", Integer.toString(countOpenConnections())); props.setProperty("idleConnections", Integer.toString(this.idleConnections.size())); - props.setProperty("idleConnectionsHighWater", Integer.toString(this.connectionHighWaterMark.get())); + props.setProperty("idleConnectionsHighWater", Integer.toString(this.connectionHighWaterMark.get())); for (ChannelCachingConnectionProxy proxy : this.allocatedConnections) { putConnectionName(props, proxy, ":" + proxy.getLocalPort()); } for (Entry> entry : - this.allocatedConnectionTransactionalChannels.entrySet()) { + this.allocatedConnectionTransactionalChannels.entrySet()) { int port = entry.getKey().getLocalPort(); if (port > 0 && entry.getKey().isOpen()) { LinkedList channelList = entry.getValue(); @@ -835,7 +850,7 @@ public Properties getCacheProperties() { } } for (Entry> entry : - this.allocatedConnectionNonTransactionalChannels.entrySet()) { + this.allocatedConnectionNonTransactionalChannels.entrySet()) { int port = entry.getKey().getLocalPort(); if (port > 0 && entry.getKey().isOpen()) { LinkedList channelList = entry.getValue(); @@ -880,6 +895,28 @@ private int countOpenConnections() { return n; } + /** + * Determine the executor service used to close connections. + * @return specified executor service otherwise the default one is created and returned. + * @since 1.7.9 + */ + protected ExecutorService getDeferredCloseExecutor() { + if (getExecutorService() != null) { + return getExecutorService(); + } + synchronized (this.connectionMonitor) { + if (this.deferredCloseExecutor == null) { + final String threadPrefix = + getBeanName() == null + ? DEFAULT_DEFERRED_POOL_PREFIX + threadPoolId.incrementAndGet() + : getBeanName(); + ThreadFactory threadPoolFactory = new CustomizableThreadFactory(threadPrefix); + this.deferredCloseExecutor = Executors.newCachedThreadPool(threadPoolFactory); + } + } + return this.deferredCloseExecutor; + } + @Override public String toString() { return "CachingConnectionFactory [channelCacheSize=" + this.channelCacheSize + ", host=" + getHost() @@ -1033,7 +1070,7 @@ private void releasePermitIfNecessary(Object proxy) { checkoutPermits.release(); if (logger.isDebugEnabled()) { logger.debug("Released permit for '" + this.theConnection + "', remaining: " - + checkoutPermits.availablePermits()); + + checkoutPermits.availablePermits()); } } else { @@ -1097,9 +1134,7 @@ private void physicalClose() throws Exception { if (CachingConnectionFactory.this.active && (CachingConnectionFactory.this.publisherConfirms || CachingConnectionFactory.this.publisherReturns)) { - ExecutorService executorService = (getExecutorService() != null - ? getExecutorService() - : CachingConnectionFactory.this.deferredCloseExecutor); + ExecutorService executorService = getDeferredCloseExecutor(); final Channel channel = CachedChannelInvocationHandler.this.target; executorService.execute(new Runnable() { @@ -1116,14 +1151,18 @@ public void run() { catch (InterruptedException e) { Thread.currentThread().interrupt(); } - catch (Exception e) { } + catch (Exception e) { + } finally { try { channel.close(); } - catch (IOException e) { } - catch (AlreadyClosedException e) { } - catch (TimeoutException e) { } + catch (IOException e) { + } + catch (AlreadyClosedException e) { + } + catch (TimeoutException e) { + } catch (ShutdownSignalException e) { if (!RabbitUtils.isNormalShutdown(e)) { logger.debug("Unexpected exception on deferred close", e); @@ -1253,8 +1292,8 @@ public int getLocalPort() { @Override public String toString() { return "Proxy@" + ObjectUtils.getIdentityHexString(this) + " " - + (CachingConnectionFactory.this.cacheMode == CacheMode.CHANNEL ? "Shared " : "Dedicated ") - + "Rabbit Connection: " + this.target; + + (CachingConnectionFactory.this.cacheMode == CacheMode.CHANNEL ? "Shared " : "Dedicated ") + + "Rabbit Connection: " + this.target; } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java index a754ba23d3..c59852a472 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerPlaceholderParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2016 the original author or authors. + * Copyright 2010-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.junit.After; @@ -42,6 +43,7 @@ /** * @author Dave Syer * @author Gary Russell + * @author Will Droste */ public final class ListenerContainerPlaceholderParserTests { @@ -58,14 +60,18 @@ public void closeBeanFactory() throws Exception { if (this.context != null) { CachingConnectionFactory cf = this.context.getBean(CachingConnectionFactory.class); this.context.close(); - assertTrue(TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class) - .isTerminated()); + ExecutorService es = TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class); + if (es != null) { + // if it gets started make sure its terminated.. + assertTrue(es.isTerminated()); + } } } @Test public void testParseWithQueueNames() throws Exception { - SimpleMessageListenerContainer container = this.context.getBean("testListener", SimpleMessageListenerContainer.class); + SimpleMessageListenerContainer container = + this.context.getBean("testListener", SimpleMessageListenerContainer.class); assertEquals(AcknowledgeMode.MANUAL, container.getAcknowledgeMode()); assertEquals(this.context.getBean(ConnectionFactory.class), container.getConnectionFactory()); assertEquals(MessageListenerAdapter.class, container.getMessageListener().getClass());