diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index 7a00d5154f4..8045edd6d3a 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -117,6 +117,12 @@ + + + com.google.cloud + grpc-gcp + 1.0.0 + io.grpc grpc-api diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 3274048c61a..a58d93720ae 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -29,10 +29,9 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceRpc; import com.google.cloud.TransportOptions; +import com.google.cloud.grpc.GcpManagedChannelOptions; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.Options.QueryOption; -import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; -import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator; import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings; import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; import com.google.cloud.spanner.admin.instance.v1.InstanceAdminSettings; @@ -103,6 +102,8 @@ public class SpannerOptions extends ServiceOptions { private final InstanceAdminStubSettings instanceAdminStubSettings; private final DatabaseAdminStubSettings databaseAdminStubSettings; private final Duration partitionedDmlTimeout; + private final boolean grpcGcpExtensionEnabled; + private final GcpManagedChannelOptions grpcGcpOptions; private final boolean autoThrottleAdministrativeRequests; private final RetrySettings retryAdministrativeRequestsSettings; private final boolean trackTransactionStarter; @@ -554,6 +555,8 @@ private SpannerOptions(Builder builder) { throw SpannerExceptionFactory.newSpannerException(e); } partitionedDmlTimeout = builder.partitionedDmlTimeout; + grpcGcpExtensionEnabled = builder.grpcGcpExtensionEnabled; + grpcGcpOptions = builder.grpcGcpOptions; autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests; retryAdministrativeRequestsSettings = builder.retryAdministrativeRequestsSettings; trackTransactionStarter = builder.trackTransactionStarter; @@ -658,6 +661,8 @@ public static class Builder private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder = DatabaseAdminStubSettings.newBuilder(); private Duration partitionedDmlTimeout = Duration.ofHours(2L); + private boolean grpcGcpExtensionEnabled = false; + private GcpManagedChannelOptions grpcGcpOptions; private RetrySettings retryAdministrativeRequestsSettings = DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS; private boolean autoThrottleAdministrativeRequests = false; @@ -707,6 +712,8 @@ private Builder() { this.instanceAdminStubSettingsBuilder = options.instanceAdminStubSettings.toBuilder(); this.databaseAdminStubSettingsBuilder = options.databaseAdminStubSettings.toBuilder(); this.partitionedDmlTimeout = options.partitionedDmlTimeout; + this.grpcGcpExtensionEnabled = options.grpcGcpExtensionEnabled; + this.grpcGcpOptions = options.grpcGcpOptions; this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests; this.retryAdministrativeRequestsSettings = options.retryAdministrativeRequestsSettings; this.trackTransactionStarter = options.trackTransactionStarter; @@ -1035,6 +1042,28 @@ public Builder setHost(String host) { return this; } + /** Enables gRPC-GCP extension with the default settings. */ + public Builder enableGrpcGcpExtension() { + this.grpcGcpExtensionEnabled = true; + return this; + } + + /** + * Enables gRPC-GCP extension and uses provided options for configuration. The metric registry + * and default Spanner metric labels will be added automatically. + */ + public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) { + this.grpcGcpExtensionEnabled = true; + this.grpcGcpOptions = options; + return this; + } + + /** Disables gRPC-GCP extension. */ + public Builder disableGrpcGcpExtension() { + this.grpcGcpExtensionEnabled = false; + return this; + } + /** * Sets the host of an emulator to use. By default the value is read from an environment * variable. If the environment variable is not set, this will be null. @@ -1128,6 +1157,14 @@ public Duration getPartitionedDmlTimeout() { return partitionedDmlTimeout; } + public boolean isGrpcGcpExtensionEnabled() { + return grpcGcpExtensionEnabled; + } + + public GcpManagedChannelOptions getGrpcGcpOptions() { + return grpcGcpOptions; + } + public boolean isAutoThrottleAdministrativeRequests() { return autoThrottleAdministrativeRequests; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index ef7966beed1..0f97898a40a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; import com.google.api.core.NanoClock; @@ -54,6 +55,9 @@ import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; +import com.google.cloud.grpc.GcpManagedChannelBuilder; +import com.google.cloud.grpc.GcpManagedChannelOptions; +import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.AdminRequestsPerMinuteExceededException; import com.google.cloud.spanner.ErrorCode; @@ -80,6 +84,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.io.Resources; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.iam.v1.GetIamPolicyRequest; @@ -156,10 +161,13 @@ import com.google.spanner.v1.Transaction; import io.grpc.CallCredentials; import io.grpc.Context; +import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; +import io.opencensus.metrics.Metrics; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.HashMap; @@ -249,6 +257,7 @@ private void awaitTermination() throws InterruptedException { private static final String CLIENT_LIBRARY_LANGUAGE = "spanner-java"; public static final String DEFAULT_USER_AGENT = CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class); + private static final String API_FILE = "grpc-gcp-apiconfig.json"; private final ManagedInstantiatingExecutorProvider executorProvider; private boolean rpcIsClosed; @@ -368,6 +377,9 @@ public GapicSpannerRpc(final SpannerOptions options) { // whether the attempt is allowed is totally controlled by service owner. .setAttemptDirectPath(true); + // If it is enabled in options uses the channel pool provided by the gRPC-GCP extension. + maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options); + TransportChannelProvider channelProvider = MoreObjects.firstNonNull( options.getChannelProvider(), defaultChannelProviderBuilder.build()); @@ -509,6 +521,62 @@ public UnaryCallable createUnaryCalla } } + private static String parseGrpcGcpApiConfig() { + try { + return Resources.toString( + GapicSpannerRpc.class.getResource(API_FILE), Charset.forName("UTF8")); + } catch (IOException e) { + throw newSpannerException(e); + } + } + + // Enhance metric options for gRPC-GCP extension. Adds metric registry if not specified. + private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) { + GcpManagedChannelOptions grpcGcpOptions = + MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions()); + GcpMetricsOptions metricsOptions = + MoreObjects.firstNonNull( + grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build()); + GcpMetricsOptions.Builder metricsOptionsBuilder = GcpMetricsOptions.newBuilder(metricsOptions); + if (metricsOptions.getMetricRegistry() == null) { + metricsOptionsBuilder.withMetricRegistry(Metrics.getMetricRegistry()); + } + // TODO: Add default labels with values: client_id, database, instance_id. + if (metricsOptions.getNamePrefix().equals("")) { + metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/"); + } + return GcpManagedChannelOptions.newBuilder(grpcGcpOptions) + .withMetricsOptions(metricsOptionsBuilder.build()) + .build(); + } + + @SuppressWarnings("rawtypes") + private static void maybeEnableGrpcGcpExtension( + InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder, + final SpannerOptions options) { + if (!options.isGrpcGcpExtensionEnabled()) { + return; + } + + final String jsonApiConfig = parseGrpcGcpApiConfig(); + final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetrics(options); + + ApiFunction apiFunction = + channelBuilder -> { + if (options.getChannelConfigurator() != null) { + channelBuilder = options.getChannelConfigurator().apply(channelBuilder); + } + return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder) + .withApiConfigJsonString(jsonApiConfig) + .withOptions(grpcGcpOptions) + .setPoolSize(options.getNumChannels()); + }; + + // Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1. + // Enable gRPC-GCP channel pool via the channel configurator. + defaultChannelProviderBuilder.setPoolSize(1).setChannelConfigurator(apiFunction); + } + private static HeaderProvider headerProviderWithUserAgentFrom(HeaderProvider headerProvider) { final Map headersWithUserAgent = new HashMap<>(headerProvider.getHeaders()); String userAgent = null; diff --git a/google-cloud-spanner/src/main/resources/com/google/cloud/spanner/spi/v1/grpc-gcp-apiconfig.json b/google-cloud-spanner/src/main/resources/com/google/cloud/spanner/spi/v1/grpc-gcp-apiconfig.json new file mode 100644 index 00000000000..1761bd2d382 --- /dev/null +++ b/google-cloud-spanner/src/main/resources/com/google/cloud/spanner/spi/v1/grpc-gcp-apiconfig.json @@ -0,0 +1,106 @@ +{ + "channelPool": { + "maxSize": 3, + "maxConcurrentStreamsLowWatermark": 0 + }, + "method": [ + { + "name": ["google.spanner.v1.Spanner/CreateSession"], + "affinity" : { + "command": "BIND", + "affinityKey": "name" + } + }, + { + "name": ["google.spanner.v1.Spanner/BatchCreateSessions"], + "affinity" : { + "command": "BIND", + "affinityKey": "session.name" + } + }, + { + "name": ["google.spanner.v1.Spanner/GetSession"], + "affinity": { + "command": "BOUND", + "affinityKey": "name" + } + }, + { + "name": ["google.spanner.v1.Spanner/DeleteSession"], + "affinity": { + "command": "UNBIND", + "affinityKey": "name" + } + }, + { + "name": ["google.spanner.v1.Spanner/ExecuteSql"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/ExecuteBatchDml"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/ExecuteStreamingSql"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/Read"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/StreamingRead"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/BeginTransaction"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/Commit"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/PartitionRead"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/PartitionQuery"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + }, + { + "name": ["google.spanner.v1.Spanner/Rollback"], + "affinity": { + "command": "BOUND", + "affinityKey": "session" + } + } + ] +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java new file mode 100644 index 00000000000..e390e581206 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java @@ -0,0 +1,235 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ListValue; +import com.google.spanner.v1.ResultSetMetadata; +import com.google.spanner.v1.SpannerGrpc; +import com.google.spanner.v1.StructType; +import com.google.spanner.v1.StructType.Field; +import com.google.spanner.v1.TypeCode; +import io.grpc.Attributes; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests that the Spanner client opens multiple channels, and that each session is associated with + * one specific channel. + */ +@RunWith(Parameterized.class) +public class ChannelUsageTest { + + @Parameter(0) + public int numChannels; + + @Parameter(1) + public boolean enableGcpPool; + + @Parameters(name = "num channels = {0}, enable GCP pool = {1}") + public static Collection data() { + return Arrays.asList( + new Object[][] {{1, true}, {1, false}, {2, true}, {2, false}, {4, true}, {4, false}}); + } + + private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); + private static final ResultSetMetadata SELECT1_METADATA = + ResultSetMetadata.newBuilder() + .setRowType( + StructType.newBuilder() + .addFields( + Field.newBuilder() + .setName("COL1") + .setType( + com.google.spanner.v1.Type.newBuilder() + .setCode(TypeCode.INT64) + .build()) + .build()) + .build()) + .build(); + private static final com.google.spanner.v1.ResultSet SELECT1_RESULTSET = + com.google.spanner.v1.ResultSet.newBuilder() + .addRows( + ListValue.newBuilder() + .addValues(com.google.protobuf.Value.newBuilder().setStringValue("1").build()) + .build()) + .setMetadata(SELECT1_METADATA) + .build(); + + private static MockSpannerServiceImpl mockSpanner; + private static Server server; + private static InetSocketAddress address; + private static final Set batchCreateSessionLocalIps = + ConcurrentHashMap.newKeySet(); + private static final Set executeSqlLocalIps = ConcurrentHashMap.newKeySet(); + + @BeforeClass + public static void startServer() throws IOException { + mockSpanner = new MockSpannerServiceImpl(); + mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions. + mockSpanner.putStatementResult(StatementResult.query(SELECT1, SELECT1_RESULTSET)); + + address = new InetSocketAddress("localhost", 0); + server = + NettyServerBuilder.forAddress(address) + .addService(mockSpanner) + // Add a server interceptor to register the remote addresses that we are seeing. This + // indicates how many channels are used client side to communicate with the server. + .intercept( + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + Metadata headers, + ServerCallHandler next) { + Attributes attributes = call.getAttributes(); + @SuppressWarnings({"unchecked", "deprecation"}) + Attributes.Key key = + (Attributes.Key) + attributes.keys().stream() + .filter(k -> k.toString().equals("remote-addr")) + .findFirst() + .orElse(null); + if (key != null) { + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { + batchCreateSessionLocalIps.add(attributes.get(key)); + } + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { + executeSqlLocalIps.add(attributes.get(key)); + } + } + return Contexts.interceptCall(Context.current(), call, headers, next); + } + }) + .build() + .start(); + } + + @AfterClass + public static void stopServer() throws InterruptedException { + server.shutdown(); + server.awaitTermination(); + } + + @After + public void reset() { + mockSpanner.reset(); + batchCreateSessionLocalIps.clear(); + executeSqlLocalIps.clear(); + } + + private SpannerOptions createSpannerOptions() { + String endpoint = address.getHostString() + ":" + server.getPort(); + SpannerOptions.Builder builder = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelConfigurator( + input -> { + input.usePlaintext(); + return input; + }) + .setNumChannels(numChannels) + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setMinSessions(numChannels * 2) + .setMaxSessions(numChannels * 2) + .build()) + .setHost("http://" + endpoint) + .setCredentials(NoCredentials.getInstance()); + if (enableGcpPool) { + builder.enableGrpcGcpExtension(); + } + + return builder.build(); + } + + @Test + public void testCreatesNumChannels() { + try (Spanner spanner = createSpannerOptions().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + } + assertEquals(numChannels, batchCreateSessionLocalIps.size()); + } + + @Test + public void testUsesAllChannels() throws InterruptedException, ExecutionException { + try (Spanner spanner = createSpannerOptions().getService()) { + DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + ListeningExecutorService executor = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numChannels * 2)); + CountDownLatch latch = new CountDownLatch(numChannels * 2); + List> futures = new ArrayList<>(numChannels * 2); + for (int run = 0; run < numChannels * 2; run++) { + futures.add( + executor.submit( + () -> { + try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + latch.countDown(); + try { + return latch.await(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.asSpannerException(e); + } + } + } + })); + } + assertEquals(numChannels * 2, Futures.allAsList(futures).get().size()); + } + assertEquals(numChannels, executeSqlLocalIps.size()); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWithGrpcGcpTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWithGrpcGcpTest.java new file mode 100644 index 00000000000..8fdf687e444 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWithGrpcGcpTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.it; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ParallelIntegrationTest; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TimestampBound; +import com.google.cloud.spanner.testing.RemoteSpannerHelper; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for using gRPC-GCP extension. */ +@Category(ParallelIntegrationTest.class) +@RunWith(JUnit4.class) +public class ITWithGrpcGcpTest { + + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); + private static final String TABLE_NAME = "TestTable"; + private static final List ALL_COLUMNS = Arrays.asList("Key", "StringValue"); + + private static RemoteSpannerHelper testHelper; + private static Database db; + private static DatabaseClient client; + + @BeforeClass + public static void setUpDatabase() { + // Get default spanner options for an integration test. + SpannerOptions.Builder builder = env.getTestHelper().getOptions().toBuilder(); + builder.enableGrpcGcpExtension(); + + // Create a new testHelper with the gRPC-GCP extension enabled. + testHelper = RemoteSpannerHelper.create(builder.build(), env.getTestHelper().getInstanceId()); + + db = + env.getTestHelper() + .createTestDatabase( + "CREATE TABLE " + + TABLE_NAME + + " (" + + " Key STRING(MAX) NOT NULL," + + " StringValue STRING(MAX)," + + ") PRIMARY KEY (Key)"); + client = testHelper.getDatabaseClient(db); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 3; ++i) { + mutations.add( + Mutation.newInsertOrUpdateBuilder(TABLE_NAME) + .set("Key") + .to("k" + i) + .set("StringValue") + .to("v" + i) + .build()); + } + client.write(mutations); + } + + @Test + public void singleRead() { + Struct row = + client.singleUse(TimestampBound.strong()).readRow(TABLE_NAME, Key.of("k1"), ALL_COLUMNS); + assertThat(row).isNotNull(); + assertThat(row.getString(0)).isEqualTo("k1"); + assertThat(row.getString(1)).isEqualTo("v1"); + } + + @Test + public void usingTransaction() { + final Long updatedCount = + client + .readWriteTransaction() + .run( + transaction -> + transaction.executeUpdate( + Statement.of( + "UPDATE " + TABLE_NAME + " SET StringValue='v2upd' WHERE Key='k2'"))); + assertThat(updatedCount).isEqualTo(1L); + + Struct row = + client.singleUse(TimestampBound.strong()).readRow(TABLE_NAME, Key.of("k2"), ALL_COLUMNS); + assertThat(row).isNotNull(); + assertThat(row.getString(0)).isEqualTo("k2"); + assertThat(row.getString(1)).isEqualTo("v2upd"); + } +}