diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 487ee0ccb4cb..70d17a4c9884 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService { private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration maxAckExtensionPeriod; - private final ScheduledExecutorService executor; + // The ExecutorProvider used to generate executors for processing messages. + private final ExecutorProvider executorProvider; + // An instantiation of the SystemExecutorProvider used for processing acks + // and other system actions. @Nullable private final ScheduledExecutorService alarmsExecutor; private final Distribution ackLatencyDistribution = new Distribution(MAX_ACK_DEADLINE_SECONDS + 1); @@ -132,16 +135,7 @@ private Subscriber(Builder builder) { this.numPullers = builder.parallelPullCount; - executor = builder.executorProvider.getExecutor(); - if (builder.executorProvider.shouldAutoClose()) { - closeables.add( - new AutoCloseable() { - @Override - public void close() throws IOException { - executor.shutdown(); - } - }); - } + executorProvider = builder.executorProvider; ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider; if (systemExecutorProvider == null) { @@ -322,6 +316,17 @@ public void run() { private void startStreamingConnections() { synchronized (streamingSubscriberConnections) { for (int i = 0; i < numPullers; i++) { + final ScheduledExecutorService executor = executorProvider.getExecutor(); + if (executorProvider.shouldAutoClose()) { + closeables.add( + new AutoCloseable() { + @Override + public void close() { + executor.shutdown(); + } + }); + } + streamingSubscriberConnections.add( new StreamingSubscriberConnection( subscriptionName, @@ -364,7 +369,7 @@ private void stopAllStreamingConnections() { private void startConnections( List connections, final ApiService.Listener connectionsListener) { for (ApiService subscriber : connections) { - subscriber.addListener(connectionsListener, executor); + subscriber.addListener(connectionsListener, alarmsExecutor); subscriber.startAsync(); } for (ApiService subscriber : connections) { @@ -398,8 +403,7 @@ public static final class Builder { static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() - .setExecutorThreadCount( - THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors()) + .setExecutorThreadCount(THREADS_PER_CHANNEL) .build(); String subscriptionName; @@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { return this; } - /** Gives the ability to set a custom executor. */ + /** + * Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be + * called {@link Builder#parallelPullCount} times. + */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); return this;