Skip to content

Commit

Permalink
Add ListChangeStreamPartitions callable
Browse files Browse the repository at this point in the history
  • Loading branch information
Teng Zhong committed Jul 15, 2022
1 parent 6a4444f commit feaec40
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
Expand Down Expand Up @@ -1489,6 +1490,141 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return stub.readModifyWriteRowCallable();
}

/**
* Convenience method for synchronously streaming the partitions of a table. The returned
* ServerStream instance is not threadsafe, it can only be used from single thread.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* try {
* ServerStream<ByteStringRange> stream = bigtableDataClient.listChangeStreamPartitions(tableId);
* int count = 0;
*
* // Iterator style
* for (ByteStringRange partition : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
* }
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
public ServerStream<ByteStringRange> listChangeStreamPartitions(String tableId) {
return listChangeStreamPartitionsCallable().call(tableId);
}

/**
* Convenience method for asynchronously streaming the partitions of a table.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* bigtableDataClient.listChangeStreamPartitionsAsync(tableId, new ResponseObserver<ByteStringRange>() {
* StreamController controller;
* int count = 0;
*
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(ByteStringRange partition) {
* if (++count > 10) {
* controller.cancel();
* return;
* }
* // Do something with partition
* }
* public void onError(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onComplete() {
* // Handle stream completion
* }
* });
* }
* }</pre>
*/
public void listChangeStreamPartitionsAsync(
String tableId, ResponseObserver<ByteStringRange> observer) {
listChangeStreamPartitionsCallable().call(tableId, observer);
}

/**
* Streams back the results of the query. The returned callable object allows for customization of
* api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* // Iterator style
* try {
* for(ByteStringRange partition : bigtableDataClient.listChangeStreamPartitionsCallable().call(tableId)) {
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Sync style
* try {
* List<ByteStringRange> partitions = bigtableDataClient.listChangeStreamPartitionsCallable().all().call(tableId);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<ByteStringRange> partitionFuture =
* bigtableDataClient.listChangeStreamPartitionsCallable().first().futureCall(tableId);
*
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<ByteStringRange>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(ByteStringRange result) {
* System.out.println("Got partition: " + result);
* }
* }, MoreExecutors.directExecutor());
*
* // etc
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
public ServerStreamingCallable<String, ByteStringRange> listChangeStreamPartitionsCallable() {
return stub.listChangeStreamPartitionsCallable();
}

/** Close the clients and releases all associated resources. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.CheckAndMutateRowRequest;
import com.google.bigtable.v2.CheckAndMutateRowResponse;
import com.google.bigtable.v2.ListChangeStreamPartitionsRequest;
import com.google.bigtable.v2.ListChangeStreamPartitionsResponse;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.MutateRowsRequest;
Expand All @@ -65,11 +67,13 @@
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.changestream.ListChangeStreamPartitionsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
Expand Down Expand Up @@ -142,6 +146,8 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;

private final ServerStreamingCallable<String, ByteStringRange> listChangeStreamPartitionsCallable;

public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
throws IOException {
settings = finalizeSettings(settings, Tags.getTagger(), Stats.getStatsRecorder());
Expand Down Expand Up @@ -284,6 +290,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext
bulkMutateRowsCallable = createBulkMutateRowsCallable();
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
listChangeStreamPartitionsCallable = createListChangeStreamPartitionsCallable();
}

// <editor-fold desc="Callable creators">
Expand Down Expand Up @@ -798,6 +805,71 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
}

/**
* Creates a callable chain to handle streaming ListChangeStreamPartitions RPCs. The chain will:
*
* <ul>
* <li>Convert a String format tableId into a {@link com.google.bigtable.v2.ReadChangeStreamRequest} and
* dispatch the RPC.
* <li>Upon receiving the response stream, it will convert the {@link
* com.google.bigtable.v2.ListChangeStreamPartitionsResponse}s into {@link ByteStringRange}.
* </ul>
*/
private ServerStreamingCallable<String, ByteStringRange>
createListChangeStreamPartitionsCallable() {
ServerStreamingCallable<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings
.<ListChangeStreamPartitionsRequest, ListChangeStreamPartitionsResponse>
newBuilder()
.setMethodDescriptor(BigtableGrpc.getListChangeStreamPartitionsMethod())
.setParamsExtractor(
new RequestParamsExtractor<ListChangeStreamPartitionsRequest>() {
@Override
public Map<String, String> extract(
ListChangeStreamPartitionsRequest listChangeStreamPartitionsRequest) {
return ImmutableMap.of(
"table_name",
listChangeStreamPartitionsRequest.getTableName(),
"app_profile_id",
listChangeStreamPartitionsRequest.getAppProfileId());
}
})
.build(),
settings.listChangeStreamPartitionsSettings().getRetryableCodes());

ServerStreamingCallable<String, ByteStringRange> userCallable =
new ListChangeStreamPartitionsUserCallable(base, requestContext);

ServerStreamingCallable<String, ByteStringRange> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(userCallable);

// Copy settings for the middle String -> ByteStringRange callable (as opposed to the inner
// ListChangeStreamPartitionsRequest -> ListChangeStreamPartitionsResponse callable).
ServerStreamingCallSettings<String, ByteStringRange> innerSettings =
ServerStreamingCallSettings.<String, ByteStringRange>newBuilder()
.setRetryableCodes(settings.listChangeStreamPartitionsSettings().getRetryableCodes())
.setRetrySettings(settings.listChangeStreamPartitionsSettings().getRetrySettings())
.setIdleTimeout(settings.listChangeStreamPartitionsSettings().getIdleTimeout())
.build();

ServerStreamingCallable<String, ByteStringRange> watched =
Callables.watched(withStatsHeaders, innerSettings, clientContext);

ServerStreamingCallable<String, ByteStringRange> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);

SpanName span = getSpanName("ListChangeStreamPartitions");
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Wraps a callable chain in a user presentable callable that will inject the default call context
* and trace the call.
Expand Down Expand Up @@ -854,6 +926,11 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return readModifyWriteRowCallable;
}

/** Returns a streaming list change stream partitions callable */
public ServerStreamingCallable<String, ByteStringRange> listChangeStreamPartitionsCallable() {
return listChangeStreamPartitionsCallable;
}
// </editor-fold>

private SpanName getSpanName(String methodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
Expand Down Expand Up @@ -137,6 +138,22 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
.setTotalTimeout(Duration.ofMinutes(10))
.build();

private static final Set<Code> LIST_CHANGE_STREAM_PARTITIONS_RETRY_CODES =
ImmutableSet.<Code>builder().addAll(IDEMPOTENT_RETRY_CODES).add(Code.ABORTED).build();

private static final RetrySettings LIST_CHANGE_STREAM_PARTITIONS_RETRY_SETTINGS =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(10))
.setRetryDelayMultiplier(2.0)
.setMaxRetryDelay(Duration.ofMinutes(1))
.setMaxAttempts(10)
.setJittered(true)
.setInitialRpcTimeout(Duration.ofMinutes(5))
.setRpcTimeoutMultiplier(2.0)
.setMaxRpcTimeout(Duration.ofMinutes(5))
.setTotalTimeout(Duration.ofHours(12))
.build();

/**
* Scopes that are equivalent to JWT's audience.
*
Expand Down Expand Up @@ -174,6 +191,9 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final UnaryCallSettings<ConditionalRowMutation, Boolean> checkAndMutateRowSettings;
private final UnaryCallSettings<ReadModifyWriteRow, Row> readModifyWriteRowSettings;

private final ServerStreamingCallSettings<String, ByteStringRange>
listChangeStreamPartitionsSettings;

private EnhancedBigtableStubSettings(Builder builder) {
super(builder);

Expand Down Expand Up @@ -208,6 +228,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
bulkReadRowsSettings = builder.bulkReadRowsSettings.build();
checkAndMutateRowSettings = builder.checkAndMutateRowSettings.build();
readModifyWriteRowSettings = builder.readModifyWriteRowSettings.build();
listChangeStreamPartitionsSettings = builder.listChangeStreamPartitionsSettings.build();
}

/** Create a new builder. */
Expand Down Expand Up @@ -491,6 +512,10 @@ public UnaryCallSettings<ReadModifyWriteRow, Row> readModifyWriteRowSettings() {
return readModifyWriteRowSettings;
}

public ServerStreamingCallSettings<String, ByteStringRange> listChangeStreamPartitionsSettings() {
return listChangeStreamPartitionsSettings;
}

/** Returns a builder containing all the values of this settings class. */
public Builder toBuilder() {
return new Builder(this);
Expand All @@ -516,6 +541,9 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
checkAndMutateRowSettings;
private final UnaryCallSettings.Builder<ReadModifyWriteRow, Row> readModifyWriteRowSettings;

private final ServerStreamingCallSettings.Builder<String, ByteStringRange>
listChangeStreamPartitionsSettings;

/**
* Initializes a new Builder with sane defaults for all settings.
*
Expand Down Expand Up @@ -626,6 +654,12 @@ private Builder() {

readModifyWriteRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
copyRetrySettings(baseDefaults.readModifyWriteRowSettings(), readModifyWriteRowSettings);

listChangeStreamPartitionsSettings = ServerStreamingCallSettings.newBuilder();
listChangeStreamPartitionsSettings
.setRetryableCodes(LIST_CHANGE_STREAM_PARTITIONS_RETRY_CODES)
.setRetrySettings(LIST_CHANGE_STREAM_PARTITIONS_RETRY_SETTINGS)
.setIdleTimeout(Duration.ofMinutes(5));
}

private Builder(EnhancedBigtableStubSettings settings) {
Expand All @@ -646,6 +680,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
bulkReadRowsSettings = settings.bulkReadRowsSettings.toBuilder();
checkAndMutateRowSettings = settings.checkAndMutateRowSettings.toBuilder();
readModifyWriteRowSettings = settings.readModifyWriteRowSettings.toBuilder();
listChangeStreamPartitionsSettings = settings.listChangeStreamPartitionsSettings.toBuilder();
}
// <editor-fold desc="Private Helpers">

Expand Down Expand Up @@ -857,6 +892,7 @@ public String toString() {
.add("bulkReadRowsSettings", bulkReadRowsSettings)
.add("checkAndMutateRowSettings", checkAndMutateRowSettings)
.add("readModifyWriteRowSettings", readModifyWriteRowSettings)
.add("listChangeStreamPartitionsSettings", listChangeStreamPartitionsSettings)
.add("parent", super.toString())
.toString();
}
Expand Down
Loading

0 comments on commit feaec40

Please sign in to comment.