Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change each StreamingSubscriberConnection to have its own executor by default. #4622

Merged
merged 2 commits into from
Mar 12, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration maxAckExtensionPeriod;
private final ScheduledExecutorService executor;
// The ExecutorProvider used to generate executors for processing messages.
private final ExecutorProvider executorProvider;
// An instantiation of the SystemExecutorProvider used for processing acks
// and other system actions.
@Nullable private final ScheduledExecutorService alarmsExecutor;
private final Distribution ackLatencyDistribution =
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
Expand Down Expand Up @@ -132,16 +135,7 @@ private Subscriber(Builder builder) {

this.numPullers = builder.parallelPullCount;

executor = builder.executorProvider.getExecutor();
if (builder.executorProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() throws IOException {
executor.shutdown();
}
});
}
executorProvider = builder.executorProvider;

ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
if (systemExecutorProvider == null) {
Expand Down Expand Up @@ -322,6 +316,17 @@ public void run() {
private void startStreamingConnections() {
synchronized (streamingSubscriberConnections) {
for (int i = 0; i < numPullers; i++) {
final ScheduledExecutorService executor = executorProvider.getExecutor();
if (executorProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() {
executor.shutdown();
}
});
}

streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
subscriptionName,
Expand Down Expand Up @@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
private void startConnections(
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
for (ApiService subscriber : connections) {
subscriber.addListener(connectionsListener, executor);
subscriber.addListener(connectionsListener, alarmsExecutor);
subscriber.startAsync();
}
for (ApiService subscriber : connections) {
Expand Down Expand Up @@ -398,8 +403,7 @@ public static final class Builder {

static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
.setExecutorThreadCount(THREADS_PER_CHANNEL)
.build();

String subscriptionName;
Expand Down Expand Up @@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
return this;
}

/** Gives the ability to set a custom executor. */
/**
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
* called {@link Builder#parallelPullCount} times.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
return this;
Expand Down