Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

fix: stop overriding default grpc executor #1355

Merged
merged 10 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
: builder.directPathServiceConfig;
}

/**
* @deprecated If executor is not set, this channel provider will create channels with default
* grpc executor.
*/
@Deprecated
mutianf marked this conversation as resolved.
Show resolved Hide resolved
@Override
public boolean needsExecutor() {
return executor == null;
Expand Down Expand Up @@ -212,9 +217,7 @@ public TransportChannelProvider withCredentials(Credentials credentials) {

@Override
public TransportChannel getTransportChannel() throws IOException {
if (needsExecutor()) {
throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true");
} else if (needsHeaders()) {
if (needsHeaders()) {
throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true");
} else if (needsEndpoint()) {
throw new IllegalStateException("getTransportChannel() called when needsEndpoint() is true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public boolean shouldAutoClose() {
return true;
}

@Deprecated
@Override
public boolean needsExecutor() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ private InstantiatingHttpJsonChannelProvider(
this.mtlsProvider = mtlsProvider;
}

/**
* @deprecated If executor is not set, this channel provider will create channels with default
* executor defined in {@link ManagedHttpJsonChannel}.
*/
@Deprecated
@Override
public boolean needsExecutor() {
return executor == null;
Expand Down Expand Up @@ -149,9 +154,7 @@ public String getTransportName() {

@Override
public TransportChannel getTransportChannel() throws IOException {
if (needsExecutor()) {
throw new IllegalStateException("getTransportChannel() called when needsExecutor() is true");
} else if (needsHeaders()) {
if (needsHeaders()) {
throw new IllegalStateException("getTransportChannel() called when needsHeaders() is true");
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,23 @@
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/** Implementation of HttpJsonChannel which can issue http-json calls. */
@BetaApi
public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource {
private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance();
private static final ExecutorService DEFAULT_EXECUTOR =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do the same thing for gRPC InstantiatingChannelProvider? I.e. would Executors.newCachedThreadPool() instantiated by grpc be any different from Executors.newCachedThreadPool() instantiated in gax?

Also, I'm not sure that we need to touch HTTP stuff here at all, as it is not necessarily good to repeat in http world everything which is done in grpc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do something similar for grpc channel provider. I thought it might be better to use the default grpc one in case grpc update it to something more efficient in the future. And also since grpc is already providing default executor, we don't need to repeat it in gax?

I'm setting a default http executor here because I'm deprecating needsExecutor, and http channel provider doesn't have a default one. If we're not sure about setting it to Executors.newCachedThreadPool(), maybe we can use the default gax executor here? And I think it's better to separate channel executor and background executor?

InstantiatingExecutorProvider.newBuilder().build().getExecutor();

private final Executor executor;
private final String endpoint;
Expand Down Expand Up @@ -134,7 +138,9 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted
public void close() {}

public static Builder newBuilder() {
return new Builder().setHeaderEnhancers(new LinkedList<HttpJsonHeaderEnhancer>());
return new Builder()
.setHeaderEnhancers(new LinkedList<HttpJsonHeaderEnhancer>())
.setExecutor(DEFAULT_EXECUTOR);
}

public static class Builder {
Expand All @@ -147,7 +153,7 @@ public static class Builder {
private Builder() {}

public Builder setExecutor(Executor executor) {
this.executor = executor;
this.executor = Preconditions.checkNotNull(executor);
return this;
}

Expand All @@ -167,7 +173,6 @@ public Builder setHttpTransport(HttpTransport httpTransport) {
}

public ManagedHttpJsonChannel build() {
Preconditions.checkNotNull(executor);
Preconditions.checkNotNull(endpoint);
return new ManagedHttpJsonChannel(
executor, endpoint, jsonFactory, headerEnhancers, httpTransport);
Expand Down
28 changes: 19 additions & 9 deletions gax/src/main/java/com/google/api/gax/rpc/ClientContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
Expand All @@ -73,6 +72,10 @@ public abstract class ClientContext {
*/
public abstract List<BackgroundResource> getBackgroundResources();

/**
* Gets the executor to use for running scheduled API call logic (such as retries and long-running
* operations).
*/
public abstract ScheduledExecutorService getExecutor();

@Nullable
Expand Down Expand Up @@ -163,8 +166,8 @@ static String getEndpoint(
public static ClientContext create(StubSettings settings) throws IOException {
ApiClock clock = settings.getClock();

ExecutorProvider executorProvider = settings.getExecutorProvider();
final ScheduledExecutorService executor = executorProvider.getExecutor();
ExecutorProvider backgroundExecutorProvider = settings.getBackgroundExecutorProvider();
final ScheduledExecutorService backgroundExecutor = backgroundExecutorProvider.getExecutor();

Credentials credentials = settings.getCredentialsProvider().getCredentials();

Expand All @@ -177,8 +180,11 @@ public static ClientContext create(StubSettings settings) throws IOException {
}

TransportChannelProvider transportChannelProvider = settings.getTransportChannelProvider();
if (transportChannelProvider.needsExecutor()) {
transportChannelProvider = transportChannelProvider.withExecutor((Executor) executor);
// After needsExecutor and StubSettings#setExecutorProvider are deprecated, transport channel
// executor can only be set from TransportChannelProvider#withExecutor directly, and a provider
// will have a default executor if it needs one.
if (transportChannelProvider.needsExecutor() && settings.getExecutorProvider() != null) {
transportChannelProvider = transportChannelProvider.withExecutor(backgroundExecutor);
}
Map<String, String> headers = getHeadersFromSettings(settings);
if (transportChannelProvider.needsHeaders()) {
Expand Down Expand Up @@ -216,7 +222,7 @@ public static ClientContext create(StubSettings settings) throws IOException {
watchdogProvider = watchdogProvider.withClock(clock);
}
if (watchdogProvider.needsExecutor()) {
watchdogProvider = watchdogProvider.withExecutor(executor);
watchdogProvider = watchdogProvider.withExecutor(backgroundExecutor);
}
watchdog = watchdogProvider.getWatchdog();
}
Expand All @@ -226,16 +232,16 @@ public static ClientContext create(StubSettings settings) throws IOException {
if (transportChannelProvider.shouldAutoClose()) {
backgroundResources.add(transportChannel);
}
if (executorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(executor));
mutianf marked this conversation as resolved.
Show resolved Hide resolved
if (backgroundExecutorProvider.shouldAutoClose()) {
backgroundResources.add(new ExecutorAsBackgroundResource(backgroundExecutor));
}
if (watchdogProvider != null && watchdogProvider.shouldAutoClose()) {
backgroundResources.add(watchdog);
}

return newBuilder()
.setBackgroundResources(backgroundResources.build())
.setExecutor(executor)
.setExecutor(backgroundExecutor)
.setCredentials(credentials)
.setTransportChannel(transportChannel)
.setHeaders(ImmutableMap.copyOf(settings.getHeaderProvider().getHeaders()))
Expand Down Expand Up @@ -289,6 +295,10 @@ public abstract static class Builder {

public abstract Builder setBackgroundResources(List<BackgroundResource> backgroundResources);

/**
* Sets the executor to use for running scheduled API call logic (such as retries and
* long-running operations).
*/
public abstract Builder setExecutor(ScheduledExecutorService value);

public abstract Builder setCredentials(Credentials value);
Expand Down
47 changes: 46 additions & 1 deletion gax/src/main/java/com/google/api/gax/rpc/ClientSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.common.base.MoreObjects;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -63,10 +64,16 @@ public final StubSettings getStubSettings() {
return stubSettings;
}

/** @deprecated Please use {@link #getBackgroundExecutorProvider()} */
@Deprecated
public final ExecutorProvider getExecutorProvider() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return stubSettings.getExecutorProvider();
}

public final ExecutorProvider getBackgroundExecutorProvider() {
return stubSettings.getBackgroundExecutorProvider();
}

public final TransportChannelProvider getTransportChannelProvider() {
return stubSettings.getTransportChannelProvider();
}
Expand Down Expand Up @@ -112,6 +119,7 @@ public final Duration getWatchdogCheckInterval() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("executorProvider", getExecutorProvider())
.add("backgroundExecutorProvider", getBackgroundExecutorProvider())
.add("transportChannelProvider", getTransportChannelProvider())
.add("credentialsProvider", getCredentialsProvider())
.add("headerProvider", getHeaderProvider())
Expand Down Expand Up @@ -159,9 +167,27 @@ protected StubSettings.Builder getStubSettings() {
* call logic (such as retries and long-running operations), and also to pass to the transport
* settings if an executor is needed for the transport and it doesn't have its own executor
* provider.
*
* @deprecated Please use {@link #setBackgroundExecutorProvider(ExecutorProvider)} for setting
* executor to use for running scheduled API call logic. To set executor for {@link
* TransportChannelProvider}, please use {@link
* TransportChannelProvider#withExecutor(Executor)} instead.
*/
@Deprecated
public B setExecutorProvider(ExecutorProvider executorProvider) {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
stubSettings.setExecutorProvider(executorProvider);
stubSettings.setBackgroundExecutorProvider(executorProvider);
return self();
}

/**
* Sets the ExecutorProvider to use for getting the executor to use for running scheduled API
* call logic (such as retries and long-running operations). This will not set the executor in
* {@link TransportChannelProvider}. To set executor for {@link TransportChannelProvider},
* please use {@link TransportChannelProvider#withExecutor(Executor)}.
*/
public B setBackgroundExecutorProvider(ExecutorProvider executorProvider) {
stubSettings.setBackgroundExecutorProvider(executorProvider);
return self();
}

Expand Down Expand Up @@ -238,11 +264,29 @@ public B setWatchdogCheckInterval(@Nullable Duration checkInterval) {
return self();
}

/** Gets the ExecutorProvider that was previously set on this Builder. */
/**
* Gets the ExecutorProvider that was previously set on this Builder. This ExecutorProvider is
* to use for running asynchronous API call logic (such as retries and long-running operations),
* and also to pass to the transport settings if an executor is needed for the transport and it
* doesn't have its own executor provider.
*
* @deprecated Please use {@link #getBackgroundExecutorProvider()} for getting the executor
* provider that's used for running scheduled API call logic.
*/
@Deprecated
public ExecutorProvider getExecutorProvider() {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
return stubSettings.getExecutorProvider();
}

/**
* Gets the ExecutorProvider that was previously set on this Builder. This ExecutorProvider is
* to use for running asynchronous API call logic (such as retries and long-running operations).
* This ExecutorProvider is not used to set the executor in {@link TransportChannelProvider}.
*/
public ExecutorProvider getBackgroundExecutorProvider() {
return stubSettings.getBackgroundExecutorProvider();
}

/** Gets the TransportProvider that was previously set on this Builder. */
public TransportChannelProvider getTransportChannelProvider() {
return stubSettings.getTransportChannelProvider();
Expand Down Expand Up @@ -303,6 +347,7 @@ protected static void applyToAllUnaryMethods(
public String toString() {
return MoreObjects.toStringHelper(this)
.add("executorProvider", getExecutorProvider())
.add("backgroundExecutorProvider", getBackgroundExecutorProvider())
.add("transportChannelProvider", getTransportChannelProvider())
.add("credentialsProvider", getCredentialsProvider())
.add("headerProvider", getHeaderProvider())
Expand Down
Loading