Skip to content

Commit

Permalink
feat: add gRPC-GCP channel pool as an option (#1227)
Browse files Browse the repository at this point in the history
* add grpc-gcp extensions

* remove local files

* update read apiconfig

* update read apiconfig

* Add custom pool size for GCP extension

* add one more entry in aip config file

* change gcp package name

* Adjust grpc-gcp package name and group

* Set grpc-gcp low streams watermark to 1

This allows sessions to be spread across channels when the Spanner client starts up.

* Add gRPC-GCP extension options to SpannerOptions

* feat: add gRPC-GCP channel pool as an option

This enables a user to opt-in for using the gRPC-GCP extension channel pool and configure its options.

* Addressed comments.

* Fixed linting issues.

* Add integration test with enabled gRPC-GCP extension

* Remake gRPC-GCP extension related SpannerOptions

* Add ChannelUsageTest

Co-authored-by: Zhenli(Jenny) Jiang <jennyjiang@google.com>
Co-authored-by: Knut Olav Løite <koloite@gmail.com>
  • Loading branch information
3 people authored Jun 29, 2021
1 parent b2a56c6 commit 1fa95a9
Show file tree
Hide file tree
Showing 6 changed files with 569 additions and 2 deletions.
6 changes: 6 additions & 0 deletions google-cloud-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@
</build>

<dependencies>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>grpc-gcp</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.InstanceAdminSettings;
Expand Down Expand Up @@ -103,6 +102,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final InstanceAdminStubSettings instanceAdminStubSettings;
private final DatabaseAdminStubSettings databaseAdminStubSettings;
private final Duration partitionedDmlTimeout;
private final boolean grpcGcpExtensionEnabled;
private final GcpManagedChannelOptions grpcGcpOptions;
private final boolean autoThrottleAdministrativeRequests;
private final RetrySettings retryAdministrativeRequestsSettings;
private final boolean trackTransactionStarter;
Expand Down Expand Up @@ -554,6 +555,8 @@ private SpannerOptions(Builder builder) {
throw SpannerExceptionFactory.newSpannerException(e);
}
partitionedDmlTimeout = builder.partitionedDmlTimeout;
grpcGcpExtensionEnabled = builder.grpcGcpExtensionEnabled;
grpcGcpOptions = builder.grpcGcpOptions;
autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests;
retryAdministrativeRequestsSettings = builder.retryAdministrativeRequestsSettings;
trackTransactionStarter = builder.trackTransactionStarter;
Expand Down Expand Up @@ -658,6 +661,8 @@ public static class Builder
private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder =
DatabaseAdminStubSettings.newBuilder();
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
private boolean grpcGcpExtensionEnabled = false;
private GcpManagedChannelOptions grpcGcpOptions;
private RetrySettings retryAdministrativeRequestsSettings =
DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS;
private boolean autoThrottleAdministrativeRequests = false;
Expand Down Expand Up @@ -707,6 +712,8 @@ private Builder() {
this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder();
this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder();
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
this.grpcGcpExtensionEnabled = options.grpcGcpExtensionEnabled;
this.grpcGcpOptions = options.grpcGcpOptions;
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.retryAdministrativeRequestsSettings = options.retryAdministrativeRequestsSettings;
this.trackTransactionStarter = options.trackTransactionStarter;
Expand Down Expand Up @@ -1035,6 +1042,28 @@ public Builder setHost(String host) {
return this;
}

/** Enables gRPC-GCP extension with the default settings. */
public Builder enableGrpcGcpExtension() {
this.grpcGcpExtensionEnabled = true;
return this;
}

/**
* Enables gRPC-GCP extension and uses provided options for configuration. The metric registry
* and default Spanner metric labels will be added automatically.
*/
public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) {
this.grpcGcpExtensionEnabled = true;
this.grpcGcpOptions = options;
return this;
}

/** Disables gRPC-GCP extension. */
public Builder disableGrpcGcpExtension() {
this.grpcGcpExtensionEnabled = false;
return this;
}

/**
* Sets the host of an emulator to use. By default the value is read from an environment
* variable. If the environment variable is not set, this will be <code>null</code>.
Expand Down Expand Up @@ -1128,6 +1157,14 @@ public Duration getPartitionedDmlTimeout() {
return partitionedDmlTimeout;
}

public boolean isGrpcGcpExtensionEnabled() {
return grpcGcpExtensionEnabled;
}

public GcpManagedChannelOptions getGrpcGcpOptions() {
return grpcGcpOptions;
}

public boolean isAutoThrottleAdministrativeRequests() {
return autoThrottleAdministrativeRequests;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
Expand Down Expand Up @@ -54,6 +55,9 @@
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.grpc.GcpManagedChannelBuilder;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException;
import com.google.cloud.spanner.ErrorCode;
Expand All @@ -80,6 +84,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.Resources;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.iam.v1.GetIamPolicyRequest;
Expand Down Expand Up @@ -156,10 +161,13 @@
import com.google.spanner.v1.Transaction;
import io.grpc.CallCredentials;
import io.grpc.Context;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.opencensus.metrics.Metrics;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -249,6 +257,7 @@ private void awaitTermination() throws InterruptedException {
private static final String CLIENT_LIBRARY_LANGUAGE = "spanner-java";
public static final String DEFAULT_USER_AGENT =
CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class);
private static final String API_FILE = "grpc-gcp-apiconfig.json";

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
Expand Down Expand Up @@ -368,6 +377,9 @@ public GapicSpannerRpc(final SpannerOptions options) {
// whether the attempt is allowed is totally controlled by service owner.
.setAttemptDirectPath(true);

// If it is enabled in options uses the channel pool provided by the gRPC-GCP extension.
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);

TransportChannelProvider channelProvider =
MoreObjects.firstNonNull(
options.getChannelProvider(), defaultChannelProviderBuilder.build());
Expand Down Expand Up @@ -509,6 +521,62 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
}
}

private static String parseGrpcGcpApiConfig() {
try {
return Resources.toString(
GapicSpannerRpc.class.getResource(API_FILE), Charset.forName("UTF8"));
} catch (IOException e) {
throw newSpannerException(e);
}
}

// Enhance metric options for gRPC-GCP extension. Adds metric registry if not specified.
private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) {
GcpManagedChannelOptions grpcGcpOptions =
MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions());
GcpMetricsOptions metricsOptions =
MoreObjects.firstNonNull(
grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build());
GcpMetricsOptions.Builder metricsOptionsBuilder = GcpMetricsOptions.newBuilder(metricsOptions);
if (metricsOptions.getMetricRegistry() == null) {
metricsOptionsBuilder.withMetricRegistry(Metrics.getMetricRegistry());
}
// TODO: Add default labels with values: client_id, database, instance_id.
if (metricsOptions.getNamePrefix().equals("")) {
metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/");
}
return GcpManagedChannelOptions.newBuilder(grpcGcpOptions)
.withMetricsOptions(metricsOptionsBuilder.build())
.build();
}

@SuppressWarnings("rawtypes")
private static void maybeEnableGrpcGcpExtension(
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder,
final SpannerOptions options) {
if (!options.isGrpcGcpExtensionEnabled()) {
return;
}

final String jsonApiConfig = parseGrpcGcpApiConfig();
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetrics(options);

ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
channelBuilder -> {
if (options.getChannelConfigurator() != null) {
channelBuilder = options.getChannelConfigurator().apply(channelBuilder);
}
return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(grpcGcpOptions)
.setPoolSize(options.getNumChannels());
};

// Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
// Enable gRPC-GCP channel pool via the channel configurator.
defaultChannelProviderBuilder.setPoolSize(1).setChannelConfigurator(apiFunction);
}

private static HeaderProvider headerProviderWithUserAgentFrom(HeaderProvider headerProvider) {
final Map<String, String> headersWithUserAgent = new HashMap<>(headerProvider.getHeaders());
String userAgent = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"channelPool": {
"maxSize": 3,
"maxConcurrentStreamsLowWatermark": 0
},
"method": [
{
"name": ["google.spanner.v1.Spanner/CreateSession"],
"affinity" : {
"command": "BIND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/BatchCreateSessions"],
"affinity" : {
"command": "BIND",
"affinityKey": "session.name"
}
},
{
"name": ["google.spanner.v1.Spanner/GetSession"],
"affinity": {
"command": "BOUND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/DeleteSession"],
"affinity": {
"command": "UNBIND",
"affinityKey": "name"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteSql"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteBatchDml"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/ExecuteStreamingSql"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Read"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/StreamingRead"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/BeginTransaction"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Commit"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/PartitionRead"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/PartitionQuery"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
},
{
"name": ["google.spanner.v1.Spanner/Rollback"],
"affinity": {
"command": "BOUND",
"affinityKey": "session"
}
}
]
}
Loading

0 comments on commit 1fa95a9

Please sign in to comment.