From f9a9b1b094b3bf649684a35374f2de83b5e1cc8d Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 4 Mar 2019 15:54:18 -0500 Subject: [PATCH 1/2] Change each StreamingSubscriberConnection to have its own executor by default. This increases throughput by reducing contention on the executor queue mutex and makes the Subscriber implementation more accurately reflect the users intent when an InstantiatingExecutorProvider is passed. --- .../google/cloud/pubsub/v1/Subscriber.java | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 487ee0ccb4cb..3b5e413dc4c9 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -99,7 +99,7 @@ public class Subscriber extends AbstractApiService { private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration maxAckExtensionPeriod; - private final ScheduledExecutorService executor; + private final ExecutorProvider executorProvider; @Nullable private final ScheduledExecutorService alarmsExecutor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); @@ -132,16 +132,7 @@ private Subscriber(Builder builder) { this.numPullers = builder.parallelPullCount; - executor = builder.executorProvider.getExecutor(); - if (builder.executorProvider.shouldAutoClose()) { - closeables.add( - new AutoCloseable() { - @Override - public void close() throws IOException { - executor.shutdown(); - } - }); - } + executorProvider = builder.executorProvider; ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider; if (systemExecutorProvider == null) { @@ -322,6 +313,17 @@ public void run() { private void startStreamingConnections() { synchronized (streamingSubscriberConnections) { for (int i = 0; i < numPullers; i++) { + final ScheduledExecutorService executor = executorProvider.getExecutor(); + if (executorProvider.shouldAutoClose()) { + closeables.add( + new AutoCloseable() { + @Override + public void close() { + executor.shutdown(); + } + }); + } + streamingSubscriberConnections.add( new StreamingSubscriberConnection( subscriptionName, @@ -364,7 +366,7 @@ private void stopAllStreamingConnections() { private void startConnections( List connections, final ApiService.Listener connectionsListener) { for (ApiService subscriber : connections) { - subscriber.addListener(connectionsListener, executor); + subscriber.addListener(connectionsListener, alarmsExecutor); subscriber.startAsync(); } for (ApiService subscriber : connections) { @@ -398,8 +400,7 @@ public static final class Builder { static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() - .setExecutorThreadCount( - THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors()) + .setExecutorThreadCount(THREADS_PER_CHANNEL) .build(); String subscriptionName; @@ -502,7 +503,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { return this; } - /** Gives the ability to set a custom executor. */ + /** + * Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be + * called {@link Builder#parallelPullCount} times. + */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); return this; From 2629007045ad6f962ed25234850b4e75404be297 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 11 Mar 2019 10:38:25 -0400 Subject: [PATCH 2/2] Add a comment for executorProvider and alarmsExecutor. --- .../src/main/java/com/google/cloud/pubsub/v1/Subscriber.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 3b5e413dc4c9..70d17a4c9884 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService { private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration maxAckExtensionPeriod; + // The ExecutorProvider used to generate executors for processing messages. private final ExecutorProvider executorProvider; + // An instantiation of the SystemExecutorProvider used for processing acks + // and other system actions. @Nullable private final ScheduledExecutorService alarmsExecutor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);