Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Cloud Bigtable Changestream #1569

Merged
merged 31 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6a4444f
feat: copy preview Change Streams API (#1309)
tonytanger Jul 14, 2022
c529f19
feat: Add ListChangeStreamPartitions callable (#1312)
tengzhonger Jul 20, 2022
39a7b58
feat: Create ReadChangeStreamQuery and ChangeStreamRecode::Heartbeat/…
tengzhonger Jul 26, 2022
53dd0f0
feat: Add ChangeStreamMutation which is a ChangeStreamRecord (#1324)
tengzhonger Aug 1, 2022
cb7b455
feat: Add ChangeStreamRecordAdapter and ChangeStreamStateMachine (#1334)
tengzhonger Aug 3, 2022
c612cf6
feat: Add readChangeStream callables (#1338)
tengzhonger Aug 8, 2022
9b30758
feat: Expose some package-private methods to be used by CDC beam code…
tengzhonger Aug 9, 2022
f1176ae
feat: Implement ReadChangeStreamResumptionStrategy (#1344)
tengzhonger Aug 10, 2022
bb5c0c0
feat: Add toByteString/fromByteString for ChangeStreamContinuationTok…
tengzhonger Aug 11, 2022
2a4e786
feat!: rename ListChangeStreamPartitions to GenerateInitialChangeStre…
tonytanger Aug 12, 2022
c3086d9
feat: Change CDC related APIs to return ByteStringRange instead of Ro…
tengzhonger Aug 15, 2022
351a151
feat: Return MutationType and bigtable.common.Status instead of raw p…
tengzhonger Aug 16, 2022
c631fa4
feat: Expose CDC data API settings in EnhancedBigtableStubSettings (#…
tengzhonger Sep 1, 2022
9e5994f
chore: pull in changes from main branch (#1379)
tengzhonger Sep 7, 2022
e0f5d84
chore: pull in changes from main branch (#1544)
jackdingilian Dec 13, 2022
053cba6
fix: resolve merge conflict in samples/native-image-sample/pom.xml (#…
jackdingilian Dec 13, 2022
7f0e9df
feat: Cdc rebase (#1566)
tengzhonger Jan 9, 2023
1ead0cd
Merge remote-tracking branch 'origin/main' into cdc
Jan 9, 2023
0a12471
Address comments
Jan 10, 2023
46193f9
fix test
Jan 10, 2023
613cfec
Merge branch 'main' into cdc
Jan 10, 2023
6108c4e
Correct the grpc & proto version
Jan 10, 2023
aff6021
Address comments
Jan 12, 2023
7ad8ac4
Address comments
Feb 7, 2023
9a278d0
Merge remote-tracking branch 'origin/main' into cdc
Feb 7, 2023
ab311fa
Update grpc version
Feb 7, 2023
6907ff3
Update comment
Feb 7, 2023
dc16628
Address comments
Feb 8, 2023
8f7c6dd
Merge remote-tracking branch 'origin/main' into cdc
Feb 10, 2023
00421c7
Delete accidentally added dependency-reduced-pom.xml file
Feb 10, 2023
3cb1dfc
Fix test
Feb 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions google-cloud-bigtable-stats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
<version>2.17.2-SNAPSHOT</version><!-- {x-version-update:google-cloud-bigtable:current} -->
<description>Experimental project to shade OpenCensus dependencies.</description>

<properties>
<cloud.monitoring.version>3.4.1</cloud.monitoring.version>
</properties>

tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
<dependencyManagement>
<dependencies>
<dependency>
Expand Down
5 changes: 5 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracerFactory</className>
</difference>
<!-- InternalApi was renamed -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable</className>
</difference>
<!-- InternalApi that was removed -->
<difference>
<differenceType>8001</differenceType>
Expand Down
33 changes: 33 additions & 0 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
<!-- This is used by bigtable-prod-batch-it profile to ensure that tests work on the batch endpoint.
Also, this property will be augmented by `internal-bigtable-prod-batch-it-prop-helper` profile -->
<bigtable.cfe-data-batch-endpoint>batch-bigtable.googleapis.com:443</bigtable.cfe-data-batch-endpoint>

<grpc.version>1.44.0</grpc.version>
<protobuf.version>3.19.3</protobuf.version>
<protoc.version>${protobuf.version}</protoc.version>
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -593,7 +597,36 @@
</profiles>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.0</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
Expand Down Expand Up @@ -1489,6 +1492,298 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
return stub.readModifyWriteRowCallable();
}

/**
* Convenience method for synchronously streaming the partitions of a table. The returned
* ServerStream instance is not threadsafe, it can only be used from single thread.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* try {
* ServerStream<ByteStringRange> stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId);
* int count = 0;
*
* // Iterator style
* for (ByteStringRange partition : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
* }
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
tengzhonger marked this conversation as resolved.
Show resolved Hide resolved
public ServerStream<ByteStringRange> generateInitialChangeStreamPartitions(String tableId) {
return generateInitialChangeStreamPartitionsCallable().call(tableId);
}

/**
* Convenience method for asynchronously streaming the partitions of a table.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* bigtableDataClient.generateInitialChangeStreamPartitionsAsync(tableId, new ResponseObserver<RowRange>() {
* StreamController controller;
* int count = 0;
*
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(ByteStringRange partition) {
* if (++count > 10) {
* controller.cancel();
* return;
* }
* // Do something with partition
* }
* public void onError(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onComplete() {
* // Handle stream completion
* }
* });
* }
* }</pre>
*/
@InternalApi("Used in Changestream beam pipeline.")
public void generateInitialChangeStreamPartitionsAsync(
String tableId, ResponseObserver<ByteStringRange> observer) {
generateInitialChangeStreamPartitionsCallable().call(tableId, observer);
}

/**
* Streams back the results of the query. The returned callable object allows for customization of
* api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* // Iterator style
* try {
* for(ByteStringRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) {
* // Do something with partition
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Sync style
* try {
* List<ByteStringRange> partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<ByteStringRange> partitionFuture =
* bigtableDataClient.generateInitialChangeStreamPartitionsCallable().first().futureCall(tableId);
*
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<ByteStringRange>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(RowRange result) {
* System.out.println("Got partition: " + result);
* }
* }, MoreExecutors.directExecutor());
*
* // etc
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable() {
return stub.generateInitialChangeStreamPartitionsCallable();
}

/**
* Convenience method for synchronously streaming the results of a {@link ReadChangeStreamQuery}.
* The returned ServerStream instance is not threadsafe, it can only be used from single thread.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* ReadChangeStreamQuery query = ReadChangeStreamQuery.create(tableId)
* .streamPartition("START_KEY", "END_KEY")
* .startTime(Timestamp.newBuilder().setSeconds(100).build());
*
* try {
* ServerStream<ChangeStreamRecord> stream = bigtableDataClient.readChangeStream(query);
* int count = 0;
*
* // Iterator style
* for (ChangeStreamRecord record : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
* }
* // Do something with the change stream record.
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
* @see ReadChangeStreamQuery For query options.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStream<ChangeStreamRecord> readChangeStream(ReadChangeStreamQuery query) {
return readChangeStreamCallable().call(query);
}

/**
* Convenience method for asynchronously streaming the results of a {@link ReadChangeStreamQuery}.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* ReadChangeStreamQuery query = ReadChangeStreamQuery.create(tableId)
* .streamPartition("START_KEY", "END_KEY")
* .startTime(Timestamp.newBuilder().setSeconds(100).build());
*
* bigtableDataClient.readChangeStreamAsync(query, new ResponseObserver<ChangeStreamRecord>() {
* StreamController controller;
* int count = 0;
*
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(ChangeStreamRecord record) {
* if (++count > 10) {
* controller.cancel();
* return;
* }
* // Do something with the change stream record.
* }
* public void onError(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onComplete() {
* // Handle stream completion
* }
* });
* }
* }</pre>
*/
@InternalApi("Used in Changestream beam pipeline.")
public void readChangeStreamAsync(
ReadChangeStreamQuery query, ResponseObserver<ChangeStreamRecord> observer) {
readChangeStreamCallable().call(query, observer);
}

/**
* Streams back the results of the query. The returned callable object allows for customization of
* api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create("[PROJECT]", "[INSTANCE]")) {
* String tableId = "[TABLE]";
*
* ReadChangeStreamQuery query = ReadChangeStreamQuery.create(tableId)
* .streamPartition("START_KEY", "END_KEY")
* .startTime(Timestamp.newBuilder().setSeconds(100).build());
*
* // Iterator style
* try {
* for(ChangeStreamRecord record : bigtableDataClient.readChangeStreamCallable().call(query)) {
* // Do something with record
* }
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Sync style
* try {
* List<ChangeStreamRecord> records = bigtableDataClient.readChangeStreamCallable().all().call(query);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<ChangeStreamRecord> recordFuture =
* bigtableDataClient.readChangeStreamCallable().first().futureCall(query);
*
* ApiFutures.addCallback(recordFuture, new ApiFutureCallback<ChangeStreamRecord>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(ChangeStreamRecord result) {
* System.out.println("Got record: " + result);
* }
* }, MoreExecutors.directExecutor());
*
* // etc
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
* @see ReadChangeStreamQuery For query options.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecord>
readChangeStreamCallable() {
return stub.readChangeStreamCallable();
}

/** Close the clients and releases all associated resources. */
@Override
public void close() {
Expand Down
Loading