From e3bf030cdf96952a48c25ba93af95f9a795061d6 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Tue, 6 Oct 2020 15:17:43 +0800 Subject: [PATCH 1/4] use ThreadPoolExecutor --- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 6 ++---- .../org/apache/pulsar/client/impl/PulsarClientImpl.java | 9 ++++++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index a61952c893ec7..f85cb34f0ecd0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -230,9 +230,7 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { } else { // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid // recursion and stack overflow - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); + client.getIoExecutorService().execute(() -> receiveMessageFromConsumer(consumer)); } } finally { lock.writeLock().unlock(); @@ -314,7 +312,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { } // if messages are readily available on consumer we will attempt to writeLock on the same thread - client.eventLoopGroup().execute(() -> { + client.getIoExecutorService().execute(() -> { receiveMessageFromConsumer(consumer); }); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index cbdcb27eacb6e..32e66de90ac19 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -41,8 +41,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -65,7 +67,6 @@ import org.apache.pulsar.client.api.schema.SchemaInfoProvider; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.transaction.TransactionBuilder; -import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -110,6 +111,7 @@ public enum State { private final AtomicLong requestIdGenerator = new AtomicLong(); private final EventLoopGroup eventLoopGroup; + private final ExecutorService ioExecutorService; private final LoadingCache schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { @@ -145,6 +147,8 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr conf.getAuthentication().start(); this.cnxPool = cnxPool; externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener")); + ioExecutorService = new ThreadPoolExecutor(conf.getNumIoThreads(), conf.getNumIoThreads(), 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), getThreadFactory("pulsar-io-thread")); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, eventLoopGroup); } else { @@ -815,6 +819,9 @@ protected CompletableFuture> preProcessSchemaBeforeSubscribe(Pulsa return CompletableFuture.completedFuture(schema); } + public ExecutorService getIoExecutorService() { + return ioExecutorService; + } // // Transaction related API // From 8612e46e11d617bddfe7268d57b5a3b3b9250aa4 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Tue, 6 Oct 2020 15:46:57 +0800 Subject: [PATCH 2/4] add shutdown --- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 32e66de90ac19..88821e12f8735 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -595,6 +595,7 @@ public void shutdown() throws PulsarClientException { cnxPool.close(); timer.stop(); externalExecutorProvider.shutdownNow(); + ioExecutorService.shutdownNow(); conf.getAuthentication().close(); } catch (Throwable t) { log.warn("Failed to shutdown Pulsar client", t); From b19536ba52067d72a2378d83fa3eb2a057c72932 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Thu, 8 Oct 2020 00:49:47 +0800 Subject: [PATCH 3/4] use ExecutorProvider instead --- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ++-- .../apache/pulsar/client/impl/PulsarClientImpl.java | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f85cb34f0ecd0..31400a7f3570d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -230,7 +230,7 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { } else { // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid // recursion and stack overflow - client.getIoExecutorService().execute(() -> receiveMessageFromConsumer(consumer)); + client.getInternalExecutorService().execute(() -> receiveMessageFromConsumer(consumer)); } } finally { lock.writeLock().unlock(); @@ -312,7 +312,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { } // if messages are readily available on consumer we will attempt to writeLock on the same thread - client.getIoExecutorService().execute(() -> { + client.getInternalExecutorService().execute(() -> { receiveMessageFromConsumer(consumer); }); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 88821e12f8735..3853c3185d69e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -97,6 +97,7 @@ public class PulsarClientImpl implements PulsarClient { private final ConnectionPool cnxPool; private final Timer timer; private final ExecutorProvider externalExecutorProvider; + private final ExecutorProvider internalExecutorService; public enum State { Open, Closing, Closed @@ -111,7 +112,6 @@ public enum State { private final AtomicLong requestIdGenerator = new AtomicLong(); private final EventLoopGroup eventLoopGroup; - private final ExecutorService ioExecutorService; private final LoadingCache schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000) .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader() { @@ -147,8 +147,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr conf.getAuthentication().start(); this.cnxPool = cnxPool; externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener")); - ioExecutorService = new ThreadPoolExecutor(conf.getNumIoThreads(), conf.getNumIoThreads(), 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), getThreadFactory("pulsar-io-thread")); + internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), getThreadFactory("pulsar-client-io")); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, eventLoopGroup); } else { @@ -595,7 +594,7 @@ public void shutdown() throws PulsarClientException { cnxPool.close(); timer.stop(); externalExecutorProvider.shutdownNow(); - ioExecutorService.shutdownNow(); + internalExecutorService.shutdownNow(); conf.getAuthentication().close(); } catch (Throwable t) { log.warn("Failed to shutdown Pulsar client", t); @@ -820,8 +819,8 @@ protected CompletableFuture> preProcessSchemaBeforeSubscribe(Pulsa return CompletableFuture.completedFuture(schema); } - public ExecutorService getIoExecutorService() { - return ioExecutorService; + public ExecutorService getInternalExecutorService() { + return internalExecutorService.getExecutor(); } // // Transaction related API From 64daa07e801579c6d83b5fcb0b2a596ba2faf198 Mon Sep 17 00:00:00 2001 From: feynmanlin <315157973@qq.com> Date: Sun, 11 Oct 2020 21:39:50 +0800 Subject: [PATCH 4/4] Update pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java Co-authored-by: lipenghui --- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 3853c3185d69e..64f1ef96de0c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -147,7 +147,7 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr conf.getAuthentication().start(); this.cnxPool = cnxPool; externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), getThreadFactory("pulsar-external-listener")); - internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), getThreadFactory("pulsar-client-io")); + internalExecutorService = new ExecutorProvider(conf.getNumIoThreads(), getThreadFactory("pulsar-client-internal")); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(conf, eventLoopGroup); } else {