Skip to content

Commit

Permalink
Change each StreamingSubscriberConnection to have its own executor by…
Browse files Browse the repository at this point in the history
… default. (#4622)

* Change each StreamingSubscriberConnection to have its own executor by default.

This increases throughput by reducing contention on the executor queue mutex and makes the Subscriber implementation more accurately reflect the users intent when an InstantiatingExecutorProvider is passed.

* Add a comment for executorProvider and alarmsExecutor.
  • Loading branch information
dpcollins-google authored and sduskis committed Mar 12, 2019
1 parent 257aa8e commit 67bf8fc
Showing 1 changed file with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration maxAckExtensionPeriod;
private final ScheduledExecutorService executor;
// The ExecutorProvider used to generate executors for processing messages.
private final ExecutorProvider executorProvider;
// An instantiation of the SystemExecutorProvider used for processing acks
// and other system actions.
@Nullable private final ScheduledExecutorService alarmsExecutor;
private final Distribution ackLatencyDistribution =
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
Expand Down Expand Up @@ -132,16 +135,7 @@ private Subscriber(Builder builder) {

this.numPullers = builder.parallelPullCount;

executor = builder.executorProvider.getExecutor();
if (builder.executorProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() throws IOException {
executor.shutdown();
}
});
}
executorProvider = builder.executorProvider;

ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
if (systemExecutorProvider == null) {
Expand Down Expand Up @@ -322,6 +316,17 @@ public void run() {
private void startStreamingConnections() {
synchronized (streamingSubscriberConnections) {
for (int i = 0; i < numPullers; i++) {
final ScheduledExecutorService executor = executorProvider.getExecutor();
if (executorProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() {
executor.shutdown();
}
});
}

streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
subscriptionName,
Expand Down Expand Up @@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
private void startConnections(
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
for (ApiService subscriber : connections) {
subscriber.addListener(connectionsListener, executor);
subscriber.addListener(connectionsListener, alarmsExecutor);
subscriber.startAsync();
}
for (ApiService subscriber : connections) {
Expand Down Expand Up @@ -398,8 +403,7 @@ public static final class Builder {

static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
.setExecutorThreadCount(THREADS_PER_CHANNEL)
.build();

String subscriptionName;
Expand Down Expand Up @@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
return this;
}

/** Gives the ability to set a custom executor. */
/**
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
* called {@link Builder#parallelPullCount} times.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
return this;
Expand Down

0 comments on commit 67bf8fc

Please sign in to comment.