Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ListChangeStreamPartitions callable #1312

Merged
merged 6 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory</className>
</difference>
<!-- InternalApi was renamed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable</className>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
Expand Down Expand Up @@ -1489,6 +1490,143 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return stub.readModifyWriteRowCallable();
}

/**
* Convenience method for synchronously streaming the partitions of a table. The returned
* ServerStream instance is not threadsafe, it can only be used from single thread.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* try {
* ServerStream<RowRange> stream = bigtableDataClient.listChangeStreamPartitions(tableId);
* int count = 0;
*
* // Iterator style
* for (RowRange partition : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
* }
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStream<RowRange> listChangeStreamPartitions(String tableId) {
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
return listChangeStreamPartitionsCallable().call(tableId);
}

/**
* Convenience method for asynchronously streaming the partitions of a table.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* bigtableDataClient.listChangeStreamPartitionsAsync(tableId, new ResponseObserver<RowRange>() {
* StreamController controller;
* int count = 0;
*
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(RowRange partition) {
* if (++count > 10) {
* controller.cancel();
* return;
* }
* // Do something with partition
* }
* public void onError(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onComplete() {
* // Handle stream completion
* }
* });
* }
* }</pre>
*/
@InternalApi("Used in Changestream beam pipeline.")
public void listChangeStreamPartitionsAsync(String tableId, ResponseObserver<RowRange> observer) {
listChangeStreamPartitionsCallable().call(tableId, observer);
}

/**
* Streams back the results of the query. The returned callable object allows for customization of
* api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* // Iterator style
* try {
* for(RowRange partition : bigtableDataClient.listChangeStreamPartitionsCallable().call(tableId)) {
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Sync style
* try {
* List<RowRange> partitions = bigtableDataClient.listChangeStreamPartitionsCallable().all().call(tableId);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<RowRange> partitionFuture =
* bigtableDataClient.listChangeStreamPartitionsCallable().first().futureCall(tableId);
*
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<RowRange>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(RowRange result) {
* System.out.println("Got partition: " + result);
* }
* }, MoreExecutors.directExecutor());
*
* // etc
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStreamingCallable<String, RowRange> listChangeStreamPartitionsCallable() {
return stub.listChangeStreamPartitionsCallable();
}

/** Close the clients and releases all associated resources. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
Expand All @@ -27,29 +27,30 @@
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
*/
@InternalApi
public final class ReadRowsConvertExceptionCallable<ReadRowsRequest, RowT>
extends ServerStreamingCallable<ReadRowsRequest, RowT> {
public final class ConvertStreamExceptionCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable;
private final ServerStreamingCallable<RequestT, ResponseT> innerCallable;

public ReadRowsConvertExceptionCallable(
ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable) {
public ConvertStreamExceptionCallable(
ServerStreamingCallable<RequestT, ResponseT> innerCallable) {
this.innerCallable = innerCallable;
}

@Override
public void call(
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
ReadRowsConvertExceptionResponseObserver<RowT> observer =
new ReadRowsConvertExceptionResponseObserver<>(responseObserver);
RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
ConvertStreamExceptionResponseObserver<ResponseT> observer =
new ConvertStreamExceptionResponseObserver<>(responseObserver);
innerCallable.call(request, observer, context);
}

private class ReadRowsConvertExceptionResponseObserver<RowT> implements ResponseObserver<RowT> {
private class ConvertStreamExceptionResponseObserver<ResponseT>
implements ResponseObserver<ResponseT> {

private final ResponseObserver<RowT> outerObserver;
private final ResponseObserver<ResponseT> outerObserver;

ReadRowsConvertExceptionResponseObserver(ResponseObserver<RowT> outerObserver) {
ConvertStreamExceptionResponseObserver(ResponseObserver<ResponseT> outerObserver) {
this.outerObserver = outerObserver;
}

Expand All @@ -59,7 +60,7 @@ public void onStart(StreamController controller) {
}

@Override
public void onResponse(RowT response) {
public void onResponse(ResponseT response) {
outerObserver.onResponse(response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
Expand All @@ -55,6 +57,7 @@
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.Version;
Expand All @@ -70,6 +73,7 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.changestream.ListChangeStreamPartitionsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
Expand All @@ -84,7 +88,6 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
Expand Down Expand Up @@ -142,6 +145,8 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;

private final ServerStreamingCallable<String, RowRange> listChangeStreamPartitionsCallable;

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
settings = finalizeSettings(settings, Tags.getTagger(), Stats.getStatsRecorder());
Expand Down Expand Up @@ -284,6 +289,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext
bulkMutateRowsCallable = createBulkMutateRowsCallable();
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
listChangeStreamPartitionsCallable = createListChangeStreamPartitionsCallable();
}

// <editor-fold desc="Callable creators">
Expand Down Expand Up @@ -410,7 +416,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> convertException =
new ReadRowsConvertExceptionCallable<>(withStatsHeaders);
new ConvertStreamExceptionCallable<>(withStatsHeaders);

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(convertException, rowAdapter);
Expand Down Expand Up @@ -798,6 +804,76 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
}

/**
* Creates a callable chain to handle streaming ListChangeStreamPartitions RPCs. The chain will:
*
* <ul>
* <li>Convert a String format tableId into a {@link
* com.google.bigtable.v2.ListChangeStreamPartitionsRequest} and dispatch the RPC.
* <li>Upon receiving the response stream, it will convert the {@link
* com.google.bigtable.v2.ListChangeStreamPartitionsResponse}s into {@link RowRange}.
* </ul>
*/
private ServerStreamingCallable<String, RowRange> createListChangeStreamPartitionsCallable() {
ServerStreamingCallable<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings
.<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
newBuilder()
.setMethodDescriptor(BigtableGrpc.getListChangeStreamPartitionsMethod())
.setParamsExtractor(
new RequestParamsExtractor<ListChangeStreamPartitionsRequest>() {
@Override
public Map<String, String> extract(
ListChangeStreamPartitionsRequest listChangeStreamPartitionsRequest) {
return ImmutableMap.of(
"table_name",
listChangeStreamPartitionsRequest.getTableName(),
"app_profile_id",
listChangeStreamPartitionsRequest.getAppProfileId());
}
})
.build(),
settings.listChangeStreamPartitionsSettings().getRetryableCodes());

ServerStreamingCallable<String, RowRange> userCallable =
new ListChangeStreamPartitionsUserCallable(base, requestContext);

tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
ServerStreamingCallable<String, RowRange> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(userCallable);

// Sometimes ListChangeStreamPartitions connections are disconnected via an RST frame. This
// error is transient and should be treated similar to UNAVAILABLE. However, this exception
// has an INTERNAL error code which by default is not retryable. Convert the exception so it
// can be retried in the client.
ServerStreamingCallable<String, RowRange> convertException =
new ConvertStreamExceptionCallable<>(withStatsHeaders);

// Copy idle timeout settings for watchdog.
ServerStreamingCallSettings<String, RowRange> innerSettings =
ServerStreamingCallSettings.<String, RowRange>newBuilder()
.setRetryableCodes(settings.listChangeStreamPartitionsSettings().getRetryableCodes())
.setRetrySettings(settings.listChangeStreamPartitionsSettings().getRetrySettings())
.setIdleTimeout(settings.listChangeStreamPartitionsSettings().getIdleTimeout())
.build();

ServerStreamingCallable<String, RowRange> watched =
Callables.watched(convertException, innerSettings, clientContext);

ServerStreamingCallable<String, RowRange> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, RowRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);

SpanName span = getSpanName("ListChangeStreamPartitions");
ServerStreamingCallable<String, RowRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Wraps a callable chain in a user presentable callable that will inject the default call context
* and trace the call.
Expand Down Expand Up @@ -854,6 +930,11 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return readModifyWriteRowCallable;
}

/** Returns a streaming list change stream partitions callable */
public ServerStreamingCallable<String, RowRange> listChangeStreamPartitionsCallable() {
return listChangeStreamPartitionsCallable;
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Loading