From feaec4040da6a1a0f4a472dfcf87b0c55638eed2 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Fri, 15 Jul 2022 17:15:06 -0400 Subject: [PATCH] Add ListChangeStreamPartitions callable --- .../bigtable/data/v2/BigtableDataClient.java | 136 ++++++++++++++++++ .../data/v2/stub/EnhancedBigtableStub.java | 77 ++++++++++ .../v2/stub/EnhancedBigtableStubSettings.java | 36 +++++ ...istChangeStreamPartitionsUserCallable.java | 92 ++++++++++++ .../data/v2/BigtableDataClientTests.java | 34 +++++ ...hangeStreamPartitionsUserCallableTest.java | 83 +++++++++++ 6 files changed, 458 insertions(+) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallable.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallableTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java index ce9a57fa7e..424d6fe50f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java @@ -35,6 +35,7 @@ import com.google.cloud.bigtable.data.v2.models.Filters.Filter; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowAdapter; @@ -1489,6 +1490,141 @@ public UnaryCallable readModifyWriteRowCallable() { return stub.readModifyWriteRowCallable(); } + /** + * Convenience method for synchronously streaming the partitions of a table. The returned + * ServerStream instance is not threadsafe, it can only be used from single thread. + * + *

Sample code: + * + *

{@code
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
+   *   String tableId = "[TABLE]";
+   *
+   *   try {
+   *     ServerStream stream = bigtableDataClient.listChangeStreamPartitions(tableId);
+   *     int count = 0;
+   *
+   *     // Iterator style
+   *     for (ByteStringRange partition : stream) {
+   *       if (++count > 10) {
+   *         stream.cancel();
+   *         break;
+   *       }
+   *       // Do something with partition
+   *     }
+   *   } catch (NotFoundException e) {
+   *     System.out.println("Tried to read a non-existent table");
+   *   } catch (RuntimeException e) {
+   *     e.printStackTrace();
+   *   }
+   * }
+   * }
+ * + * @see ServerStreamingCallable For call styles. + */ + public ServerStream listChangeStreamPartitions(String tableId) { + return listChangeStreamPartitionsCallable().call(tableId); + } + + /** + * Convenience method for asynchronously streaming the partitions of a table. + * + *

Sample code: + * + *

{@code
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
+   *   String tableId = "[TABLE]";
+   *
+   *   bigtableDataClient.listChangeStreamPartitionsAsync(tableId, new ResponseObserver() {
+   *     StreamController controller;
+   *     int count = 0;
+   *
+   *     public void onStart(StreamController controller) {
+   *       this.controller = controller;
+   *     }
+   *     public void onResponse(ByteStringRange partition) {
+   *       if (++count > 10) {
+   *         controller.cancel();
+   *         return;
+   *       }
+   *       // Do something with partition
+   *     }
+   *     public void onError(Throwable t) {
+   *       if (t instanceof NotFoundException) {
+   *         System.out.println("Tried to read a non-existent table");
+   *       } else {
+   *         t.printStackTrace();
+   *       }
+   *     }
+   *     public void onComplete() {
+   *       // Handle stream completion
+   *     }
+   *   });
+   * }
+   * }
+ */ + public void listChangeStreamPartitionsAsync( + String tableId, ResponseObserver observer) { + listChangeStreamPartitionsCallable().call(tableId, observer); + } + + /** + * Streams back the results of the query. The returned callable object allows for customization of + * api invocation. + * + *

Sample code: + * + *

{@code
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
+   *   String tableId = "[TABLE]";
+   *
+   *   // Iterator style
+   *   try {
+   *     for(ByteStringRange partition : bigtableDataClient.listChangeStreamPartitionsCallable().call(tableId)) {
+   *       // Do something with partition
+   *     }
+   *   } catch (NotFoundException e) {
+   *     System.out.println("Tried to read a non-existent table");
+   *   } catch (RuntimeException e) {
+   *     e.printStackTrace();
+   *   }
+   *
+   *   // Sync style
+   *   try {
+   *     List partitions = bigtableDataClient.listChangeStreamPartitionsCallable().all().call(tableId);
+   *   } catch (NotFoundException e) {
+   *     System.out.println("Tried to read a non-existent table");
+   *   } catch (RuntimeException e) {
+   *     e.printStackTrace();
+   *   }
+   *
+   *   // Point look up
+   *   ApiFuture partitionFuture =
+   *     bigtableDataClient.listChangeStreamPartitionsCallable().first().futureCall(tableId);
+   *
+   *   ApiFutures.addCallback(partitionFuture, new ApiFutureCallback() {
+   *     public void onFailure(Throwable t) {
+   *       if (t instanceof NotFoundException) {
+   *         System.out.println("Tried to read a non-existent table");
+   *       } else {
+   *         t.printStackTrace();
+   *       }
+   *     }
+   *     public void onSuccess(ByteStringRange result) {
+   *       System.out.println("Got partition: " + result);
+   *     }
+   *   }, MoreExecutors.directExecutor());
+   *
+   *   // etc
+   * }
+   * }
+ * + * @see ServerStreamingCallable For call styles. + */ + public ServerStreamingCallable listChangeStreamPartitionsCallable() { + return stub.listChangeStreamPartitionsCallable(); + } + /** Close the clients and releases all associated resources. */ @Override public void close() { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index ec237aabf7..0c36ac8f5f 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -47,6 +47,8 @@ import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.CheckAndMutateRowRequest; import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.ListChangeStreamPartitionsRequest; +import com.google.bigtable.v2.ListChangeStreamPartitionsResponse; import com.google.bigtable.v2.MutateRowRequest; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.MutateRowsRequest; @@ -65,11 +67,13 @@ import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import com.google.cloud.bigtable.data.v2.stub.changestream.ListChangeStreamPartitionsUserCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory; @@ -142,6 +146,8 @@ public class EnhancedBigtableStub implements AutoCloseable { private final UnaryCallable checkAndMutateRowCallable; private final UnaryCallable readModifyWriteRowCallable; + private final ServerStreamingCallable listChangeStreamPartitionsCallable; + public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings) throws IOException { settings = finalizeSettings(settings, Tags.getTagger(), Stats.getStatsRecorder()); @@ -284,6 +290,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext bulkMutateRowsCallable = createBulkMutateRowsCallable(); checkAndMutateRowCallable = createCheckAndMutateRowCallable(); readModifyWriteRowCallable = createReadModifyWriteRowCallable(); + listChangeStreamPartitionsCallable = createListChangeStreamPartitionsCallable(); } // @@ -798,6 +805,71 @@ public Map extract(ReadModifyWriteRowRequest request) { methodName, new ReadModifyWriteRowCallable(retrying, requestContext)); } + /** + * Creates a callable chain to handle streaming ListChangeStreamPartitions RPCs. The chain will: + * + *
    + *
  • Convert a String format tableId into a {@link com.google.bigtable.v2.ReadChangeStreamRequest} and + * dispatch the RPC. + *
  • Upon receiving the response stream, it will convert the {@link + * com.google.bigtable.v2.ListChangeStreamPartitionsResponse}s into {@link ByteStringRange}. + *
+ */ + private ServerStreamingCallable + createListChangeStreamPartitionsCallable() { + ServerStreamingCallable + base = + GrpcRawCallableFactory.createServerStreamingCallable( + GrpcCallSettings + . + newBuilder() + .setMethodDescriptor(BigtableGrpc.getListChangeStreamPartitionsMethod()) + .setParamsExtractor( + new RequestParamsExtractor() { + @Override + public Map extract( + ListChangeStreamPartitionsRequest listChangeStreamPartitionsRequest) { + return ImmutableMap.of( + "table_name", + listChangeStreamPartitionsRequest.getTableName(), + "app_profile_id", + listChangeStreamPartitionsRequest.getAppProfileId()); + } + }) + .build(), + settings.listChangeStreamPartitionsSettings().getRetryableCodes()); + + ServerStreamingCallable userCallable = + new ListChangeStreamPartitionsUserCallable(base, requestContext); + + ServerStreamingCallable withStatsHeaders = + new StatsHeadersServerStreamingCallable<>(userCallable); + + // Copy settings for the middle String -> ByteStringRange callable (as opposed to the inner + // ListChangeStreamPartitionsRequest -> ListChangeStreamPartitionsResponse callable). + ServerStreamingCallSettings innerSettings = + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.listChangeStreamPartitionsSettings().getRetryableCodes()) + .setRetrySettings(settings.listChangeStreamPartitionsSettings().getRetrySettings()) + .setIdleTimeout(settings.listChangeStreamPartitionsSettings().getIdleTimeout()) + .build(); + + ServerStreamingCallable watched = + Callables.watched(withStatsHeaders, innerSettings, clientContext); + + ServerStreamingCallable withBigtableTracer = + new BigtableTracerStreamingCallable<>(watched); + + ServerStreamingCallable retrying = + Callables.retrying(withBigtableTracer, innerSettings, clientContext); + + SpanName span = getSpanName("ListChangeStreamPartitions"); + ServerStreamingCallable traced = + new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span); + + return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + } + /** * Wraps a callable chain in a user presentable callable that will inject the default call context * and trace the call. @@ -854,6 +926,11 @@ public UnaryCallable checkAndMutateRowCallable( public UnaryCallable readModifyWriteRowCallable() { return readModifyWriteRowCallable; } + + /** Returns a streaming list change stream partitions callable */ + public ServerStreamingCallable listChangeStreamPartitionsCallable() { + return listChangeStreamPartitionsCallable; + } //
private SpanName getSpanName(String methodName) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 395ba52b08..e46061dde0 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -37,6 +37,7 @@ import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; @@ -137,6 +138,22 @@ public class EnhancedBigtableStubSettings extends StubSettings LIST_CHANGE_STREAM_PARTITIONS_RETRY_CODES = + ImmutableSet.builder().addAll(IDEMPOTENT_RETRY_CODES).add(Code.ABORTED).build(); + + private static final RetrySettings LIST_CHANGE_STREAM_PARTITIONS_RETRY_SETTINGS = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(10)) + .setRetryDelayMultiplier(2.0) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .setMaxAttempts(10) + .setJittered(true) + .setInitialRpcTimeout(Duration.ofMinutes(5)) + .setRpcTimeoutMultiplier(2.0) + .setMaxRpcTimeout(Duration.ofMinutes(5)) + .setTotalTimeout(Duration.ofHours(12)) + .build(); + /** * Scopes that are equivalent to JWT's audience. * @@ -174,6 +191,9 @@ public class EnhancedBigtableStubSettings extends StubSettings checkAndMutateRowSettings; private final UnaryCallSettings readModifyWriteRowSettings; + private final ServerStreamingCallSettings + listChangeStreamPartitionsSettings; + private EnhancedBigtableStubSettings(Builder builder) { super(builder); @@ -208,6 +228,7 @@ private EnhancedBigtableStubSettings(Builder builder) { bulkReadRowsSettings = builder.bulkReadRowsSettings.build(); checkAndMutateRowSettings = builder.checkAndMutateRowSettings.build(); readModifyWriteRowSettings = builder.readModifyWriteRowSettings.build(); + listChangeStreamPartitionsSettings = builder.listChangeStreamPartitionsSettings.build(); } /** Create a new builder. */ @@ -491,6 +512,10 @@ public UnaryCallSettings readModifyWriteRowSettings() { return readModifyWriteRowSettings; } + public ServerStreamingCallSettings listChangeStreamPartitionsSettings() { + return listChangeStreamPartitionsSettings; + } + /** Returns a builder containing all the values of this settings class. */ public Builder toBuilder() { return new Builder(this); @@ -516,6 +541,9 @@ public static class Builder extends StubSettings.Builder readModifyWriteRowSettings; + private final ServerStreamingCallSettings.Builder + listChangeStreamPartitionsSettings; + /** * Initializes a new Builder with sane defaults for all settings. * @@ -626,6 +654,12 @@ private Builder() { readModifyWriteRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); copyRetrySettings(baseDefaults.readModifyWriteRowSettings(), readModifyWriteRowSettings); + + listChangeStreamPartitionsSettings = ServerStreamingCallSettings.newBuilder(); + listChangeStreamPartitionsSettings + .setRetryableCodes(LIST_CHANGE_STREAM_PARTITIONS_RETRY_CODES) + .setRetrySettings(LIST_CHANGE_STREAM_PARTITIONS_RETRY_SETTINGS) + .setIdleTimeout(Duration.ofMinutes(5)); } private Builder(EnhancedBigtableStubSettings settings) { @@ -646,6 +680,7 @@ private Builder(EnhancedBigtableStubSettings settings) { bulkReadRowsSettings = settings.bulkReadRowsSettings.toBuilder(); checkAndMutateRowSettings = settings.checkAndMutateRowSettings.toBuilder(); readModifyWriteRowSettings = settings.readModifyWriteRowSettings.toBuilder(); + listChangeStreamPartitionsSettings = settings.listChangeStreamPartitionsSettings.toBuilder(); } // @@ -857,6 +892,7 @@ public String toString() { .add("bulkReadRowsSettings", bulkReadRowsSettings) .add("checkAndMutateRowSettings", checkAndMutateRowSettings) .add("readModifyWriteRowSettings", readModifyWriteRowSettings) + .add("listChangeStreamPartitionsSettings", listChangeStreamPartitionsSettings) .add("parent", super.toString()) .toString(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallable.java new file mode 100644 index 0000000000..b9c51b5635 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallable.java @@ -0,0 +1,92 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.bigtable.v2.*; +import com.google.cloud.bigtable.data.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.internal.RequestContext; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; + +/** Simple wrapper for ListChangeStreamPartitions to wrap the request and response protobufs. */ +public class ListChangeStreamPartitionsUserCallable + extends ServerStreamingCallable { + private final RequestContext requestContext; + private final ServerStreamingCallable< + ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse> + inner; + + public ListChangeStreamPartitionsUserCallable( + ServerStreamingCallable + inner, + RequestContext requestContext) { + this.requestContext = requestContext; + this.inner = inner; + } + + @Override + public void call( + String tableId, ResponseObserver responseObserver, ApiCallContext context) { + String tableName = + NameUtil.formatTableName( + requestContext.getProjectId(), requestContext.getInstanceId(), tableId); + ListChangeStreamPartitionsRequest request = + ListChangeStreamPartitionsRequest.newBuilder() + .setTableName(tableName) + .setAppProfileId(requestContext.getAppProfileId()) + .build(); + + inner.call(request, new ConvertPartitionToRangeObserver(responseObserver), context); + } + + private class ConvertPartitionToRangeObserver + implements ResponseObserver { + + private final ResponseObserver outerObserver; + + ConvertPartitionToRangeObserver(ResponseObserver observer) { + this.outerObserver = observer; + } + + @Override + public void onStart(final StreamController controller) { + outerObserver.onStart(controller); + } + + @Override + public void onResponse(ListChangeStreamPartitionsResponse response) { + ByteStringRange range = + ByteStringRange.unbounded() + .of( + response.getPartition().getRowRange().getStartKeyClosed(), + response.getPartition().getRowRange().getEndKeyOpen()); + outerObserver.onResponse(range); + } + + @Override + public void onError(Throwable t) { + outerObserver.onError(t); + } + + @Override + public void onComplete() { + outerObserver.onComplete(); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java index 34c9a29d71..603e7e8b67 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java @@ -30,6 +30,7 @@ import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Mutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowCell; @@ -79,6 +80,9 @@ public class BigtableDataClientTests { @Mock private Batcher mockBulkMutationBatcher; @Mock private Batcher mockBulkReadRowsBatcher; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ServerStreamingCallable mockListChangeStreamPartitionsCallable; + private BigtableDataClient bigtableDataClient; @Before @@ -153,6 +157,14 @@ public void proxyReadRowsCallableTest() { assertThat(bigtableDataClient.readRowsCallable()).isSameInstanceAs(mockReadRowsCallable); } + @Test + public void proxyListChangeStreamPartitionsCallableTest() { + Mockito.when(mockStub.listChangeStreamPartitionsCallable()) + .thenReturn(mockListChangeStreamPartitionsCallable); + assertThat(bigtableDataClient.listChangeStreamPartitionsCallable()) + .isSameInstanceAs(mockListChangeStreamPartitionsCallable); + } + @Test public void proxyReadRowAsyncTest() { Mockito.when(mockStub.readRowCallable()).thenReturn(mockReadRowCallable); @@ -300,6 +312,28 @@ public void proxyReadRowsAsyncTest() { Mockito.verify(mockReadRowsCallable).call(query, mockObserver); } + @Test + public void proxyListChangeStreamPartitionsSyncTest() { + Mockito.when(mockStub.listChangeStreamPartitionsCallable()) + .thenReturn(mockListChangeStreamPartitionsCallable); + + bigtableDataClient.listChangeStreamPartitions("fake-table"); + + Mockito.verify(mockListChangeStreamPartitionsCallable).call("fake-table"); + } + + @Test + public void proxyListChangeStreamPartitionsAsyncTest() { + Mockito.when(mockStub.listChangeStreamPartitionsCallable()) + .thenReturn(mockListChangeStreamPartitionsCallable); + + @SuppressWarnings("unchecked") + ResponseObserver mockObserver = Mockito.mock(ResponseObserver.class); + bigtableDataClient.listChangeStreamPartitionsAsync("fake-table", mockObserver); + + Mockito.verify(mockListChangeStreamPartitionsCallable).call("fake-table", mockObserver); + } + @Test public void proxySampleRowKeysCallableTest() { Mockito.when(mockStub.sampleRowKeysCallable()).thenReturn(mockSampleRowKeysCallable); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallableTest.java new file mode 100644 index 0000000000..34ec1e98aa --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ListChangeStreamPartitionsUserCallableTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.ListChangeStreamPartitionsRequest; +import com.google.bigtable.v2.ListChangeStreamPartitionsResponse; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.internal.RequestContext; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi; +import com.google.common.collect.Lists; +import com.google.common.truth.Truth; +import com.google.protobuf.ByteString; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ListChangeStreamPartitionsUserCallableTest { + private final RequestContext requestContext = + RequestContext.create("my-project", "my-instance", "my-profile"); + + @Test + public void requestIsCorrect() { + FakeStreamingApi.ServerStreamingStashCallable< + ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse> + inner = new FakeStreamingApi.ServerStreamingStashCallable<>(Lists.newArrayList()); + ListChangeStreamPartitionsUserCallable listChangeStreamPartitionsUserCallable = + new ListChangeStreamPartitionsUserCallable(inner, requestContext); + + listChangeStreamPartitionsUserCallable.all().call("my-table"); + assertThat(inner.getActualRequest()) + .isEqualTo( + ListChangeStreamPartitionsRequest.newBuilder() + .setTableName( + NameUtil.formatTableName( + requestContext.getProjectId(), requestContext.getInstanceId(), "my-table")) + .setAppProfileId(requestContext.getAppProfileId()) + .build()); + } + + @Test + public void responseIsConverted() { + FakeStreamingApi.ServerStreamingStashCallable< + ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse> + inner = + new FakeStreamingApi.ServerStreamingStashCallable<>( + Lists.newArrayList( + ListChangeStreamPartitionsResponse.newBuilder() + .setPartition( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("apple")) + .setEndKeyOpen(ByteString.copyFromUtf8("banana")) + .build()) + .build()) + .build())); + ListChangeStreamPartitionsUserCallable listChangeStreamPartitionsUserCallable = + new ListChangeStreamPartitionsUserCallable(inner, requestContext); + + List results = listChangeStreamPartitionsUserCallable.all().call("my-table"); + Truth.assertThat(results).containsExactly(ByteStringRange.create("apple", "banana")); + } +}