Skip to content

Commit

Permalink
feat: Add ListChangeStreamPartitions callable (googleapis#1312)
Browse files Browse the repository at this point in the history
* feat: Add ListChangeStreamPartitions callable

* feat: Change return type of ListChangeStreamPartitions to RowRange

* feat: Fix format for ListChangeStreamPartitions

* fix: Address comments for ListChangeStreamPartitionsCallable

* feat: Add comments for IntervalApi for ListChangeStreamPartitions

* feat: Ignore renaming of ReadRowsConvertExceptionCallable

Co-authored-by: Teng Zhong <tengzhong@google.com>
  • Loading branch information
tengzhonger and Teng Zhong committed Sep 2, 2022
1 parent d02f0e5 commit 5021a5e
Show file tree
Hide file tree
Showing 10 changed files with 640 additions and 2 deletions.
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory</className>
</difference>
<!-- InternalApi was renamed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable</className>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
Expand Down Expand Up @@ -1489,6 +1490,143 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return stub.readModifyWriteRowCallable();
}

/**
* Convenience method for synchronously streaming the partitions of a table. The returned
* ServerStream instance is not threadsafe, it can only be used from single thread.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* try {
* ServerStream<RowRange> stream = bigtableDataClient.listChangeStreamPartitions(tableId);
* int count = 0;
*
* // Iterator style
* for (RowRange partition : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
* }
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStream<RowRange> listChangeStreamPartitions(String tableId) {
return listChangeStreamPartitionsCallable().call(tableId);
}

/**
* Convenience method for asynchronously streaming the partitions of a table.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* bigtableDataClient.listChangeStreamPartitionsAsync(tableId, new ResponseObserver<RowRange>() {
* StreamController controller;
* int count = 0;
*
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(RowRange partition) {
* if (++count > 10) {
* controller.cancel();
* return;
* }
* // Do something with partition
* }
* public void onError(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onComplete() {
* // Handle stream completion
* }
* });
* }
* }</pre>
*/
@InternalApi("Used in Changestream beam pipeline.")
public void listChangeStreamPartitionsAsync(String tableId, ResponseObserver<RowRange> observer) {
listChangeStreamPartitionsCallable().call(tableId, observer);
}

/**
* Streams back the results of the query. The returned callable object allows for customization of
* api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* // Iterator style
* try {
* for(RowRange partition : bigtableDataClient.listChangeStreamPartitionsCallable().call(tableId)) {
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Sync style
* try {
* List<RowRange> partitions = bigtableDataClient.listChangeStreamPartitionsCallable().all().call(tableId);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<RowRange> partitionFuture =
* bigtableDataClient.listChangeStreamPartitionsCallable().first().futureCall(tableId);
*
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<RowRange>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(RowRange result) {
* System.out.println("Got partition: " + result);
* }
* }, MoreExecutors.directExecutor());
*
* // etc
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStreamingCallable<String, RowRange> listChangeStreamPartitionsCallable() {
return stub.listChangeStreamPartitionsCallable();
}

/** Close the clients and releases all associated resources. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;

/**
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
*/
final class ConvertStreamExceptionCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<RequestT, ResponseT> innerCallable;

public ConvertStreamExceptionCallable(
ServerStreamingCallable<RequestT, ResponseT> innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public void call(
RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
ConvertStreamExceptionResponseObserver<ResponseT> observer =
new ConvertStreamExceptionResponseObserver<>(responseObserver);
innerCallable.call(request, observer, context);
}

private class ConvertStreamExceptionResponseObserver<ResponseT>
implements ResponseObserver<ResponseT> {

private final ResponseObserver<ResponseT> outerObserver;

ConvertStreamExceptionResponseObserver(ResponseObserver<ResponseT> outerObserver) {
this.outerObserver = outerObserver;
}

@Override
public void onStart(StreamController controller) {
outerObserver.onStart(controller);
}

@Override
public void onResponse(ResponseT response) {
outerObserver.onResponse(response);
}

@Override
public void onError(Throwable t) {
outerObserver.onError(convertException(t));
}

@Override
public void onComplete() {
outerObserver.onComplete();
}
}

private Throwable convertException(Throwable t) {
// Long lived connections sometimes are disconnected via an RST frame. This error is
// transient and should be retried.
if (t instanceof InternalException && t.getMessage() != null) {
String error = t.getMessage().toLowerCase();
if (error.contains("rst_stream") || error.contains("rst stream")) {
return new InternalException(t, ((InternalException) t).getStatusCode(), true);
}
}
return t;
}
}
Loading

0 comments on commit 5021a5e

Please sign in to comment.