feat: Change CDC related APIs to return ByteStringRange instead of RowRange (googleapis#1355)

* feat: Change CDC related APIs to return ByteStringRange instead of RowRange

1. GenerateInitialChangeStreamPartitions
2. ChangeStreamContinuationToken::GetRowRange

* fix: Fix tests

* fix: Address comments

Co-authored-by: Teng Zhong <tengzhong@google.com>
tengzhonger and Teng Zhong committed Sep 2, 2022
1 parent 76d9c45 commit fd1804f
Showing 13 changed files with 134 additions and 91 deletions.
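
For orientation, here is a minimal caller-side sketch of the new surface after this change. It is illustrative only: the project, instance, and table ids are placeholders, and generateInitialChangeStreamPartitions remains an @InternalApi method intended for the change stream Beam pipeline.

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;

public class ListChangeStreamPartitions {
  public static void main(String[] args) throws Exception {
    try (BigtableDataClient client = BigtableDataClient.create("my-project", "my-instance")) {
      // Each partition is now a ByteStringRange [startClosed, endOpen) instead of a RowRange proto.
      ServerStream<ByteStringRange> partitions =
          client.generateInitialChangeStreamPartitions("my-table");
      for (ByteStringRange partition : partitions) {
        System.out.println(
            partition.getStart().toStringUtf8() + " .. " + partition.getEnd().toStringUtf8());
      }
    }
  }
}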
@@ -29,14 +29,14 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -1503,11 +1503,11 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
* String tableId = "[TABLE]";
*
* try {
* ServerStream<RowRange> stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId);
* ServerStream<ByteStringRange> stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId);
* int count = 0;
*
* // Iterator style
* for (RowRange partition : stream) {
* for (ByteStringRange partition : stream) {
* if (++count > 10) {
* stream.cancel();
* break;
@@ -1525,7 +1525,7 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStream<RowRange> generateInitialChangeStreamPartitions(String tableId) {
public ServerStream<ByteStringRange> generateInitialChangeStreamPartitions(String tableId) {
return generateInitialChangeStreamPartitionsCallable().call(tableId);
}

@@ -1545,7 +1545,7 @@ public ServerStream<RowRange> generateInitialChangeStreamPartitions(String table
* public void onStart(StreamController controller) {
* this.controller = controller;
* }
* public void onResponse(RowRange partition) {
* public void onResponse(ByteStringRange partition) {
* if (++count > 10) {
* controller.cancel();
* return;
@@ -1568,7 +1568,7 @@ public ServerStream<RowRange> generateInitialChangeStreamPartitions(String table
*/
@InternalApi("Used in Changestream beam pipeline.")
public void generateInitialChangeStreamPartitionsAsync(
String tableId, ResponseObserver<RowRange> observer) {
String tableId, ResponseObserver<ByteStringRange> observer) {
generateInitialChangeStreamPartitionsCallable().call(tableId, observer);
}

@@ -1584,7 +1584,7 @@ public void generateInitialChangeStreamPartitionsAsync(
*
* // Iterator style
* try {
* for(RowRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) {
* for(ByteStringRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) {
* // Do something with partition
* }
* } catch (NotFoundException e) {
@@ -1595,18 +1595,18 @@ public void generateInitialChangeStreamPartitionsAsync(
*
* // Sync style
* try {
* List<RowRange> partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
* List<ByteStringRange> partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId);
* } catch (NotFoundException e) {
* System.out.println("Tried to read a non-existent table");
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Point look up
* ApiFuture<RowRange> partitionFuture =
* ApiFuture<ByteStringRange> partitionFuture =
* bigtableDataClient.generateInitialChangeStreamPartitionsCallable().first().futureCall(tableId);
*
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<RowRange>() {
* ApiFutures.addCallback(partitionFuture, new ApiFutureCallback<ByteStringRange>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
@@ -1626,7 +1626,8 @@ public void generateInitialChangeStreamPartitionsAsync(
* @see ServerStreamingCallable For call styles.
*/
@InternalApi("Used in Changestream beam pipeline.")
public ServerStreamingCallable<String, RowRange> generateInitialChangeStreamPartitionsCallable() {
public ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable() {
return stub.generateInitialChangeStreamPartitionsCallable();
}

@@ -54,9 +54,13 @@ public ChangeStreamContinuationToken(
.build();
}

// TODO: Change this to return ByteStringRange.
public RowRange getRowRange() {
return this.tokenProto.getPartition().getRowRange();
/**
* Get the partition of the current continuation token, represented by a {@link ByteStringRange}.
*/
public ByteStringRange getPartition() {
return ByteStringRange.create(
this.tokenProto.getPartition().getRowRange().getStartKeyClosed(),
this.tokenProto.getPartition().getRowRange().getEndKeyOpen());
}

public String getToken() {
@@ -95,19 +99,19 @@ public boolean equals(Object o) {
return false;
}
ChangeStreamContinuationToken otherToken = (ChangeStreamContinuationToken) o;
return Objects.equal(getRowRange(), otherToken.getRowRange())
return Objects.equal(getPartition(), otherToken.getPartition())
&& Objects.equal(getToken(), otherToken.getToken());
}

@Override
public int hashCode() {
return Objects.hashCode(getRowRange(), getToken());
return Objects.hashCode(getPartition(), getToken());
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("rowRange", getRowRange())
.add("partition", getPartition())
.add("token", getToken())
.toString();
}
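
A hedged sketch of how a consumer such as the change stream Beam connector might read the renamed accessor; the token is assumed to arrive from a change stream response elsewhere, and only the accessors visible in this diff are used.

import com.google.cloud.bigtable.data.v2.models.ChangeStreamContinuationToken;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;

class ContinuationTokenInspector {
  static String describe(ChangeStreamContinuationToken token) {
    // getPartition() replaces getRowRange(): the partition is now a ByteStringRange.
    ByteStringRange partition = token.getPartition();
    return "["
        + partition.getStart().toStringUtf8()
        + ", "
        + partition.getEnd().toStringUtf8()
        + ") token="
        + token.getToken();
  }
}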
@@ -15,10 +15,13 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.bigtable.v2.RowRange;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -395,6 +398,22 @@ private void writeObject(ObjectOutputStream output) throws IOException {
output.defaultWriteObject();
}

@InternalApi("Used in Changestream beam pipeline.")
public static ByteString toByteString(ByteStringRange byteStringRange) {
return RowRange.newBuilder()
.setStartKeyClosed(byteStringRange.getStart())
.setEndKeyOpen(byteStringRange.getEnd())
.build()
.toByteString();
}

@InternalApi("Used in Changestream beam pipeline.")
public static ByteStringRange toByteStringRange(ByteString byteString)
throws InvalidProtocolBufferException {
RowRange rowRange = RowRange.newBuilder().mergeFrom(byteString).build();
return ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen());
}

@Override
public boolean equals(Object o) {
if (this == o) {
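
The two @InternalApi helpers added above serialize a ByteStringRange through the RowRange proto. A minimal round-trip sketch, with arbitrary example keys:

import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

class ByteStringRangeRoundTrip {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    ByteStringRange original =
        ByteStringRange.create(ByteString.copyFromUtf8("a"), ByteString.copyFromUtf8("z"));
    // Encode as a RowRange proto (e.g. for checkpointing), then decode it back.
    ByteString encoded = ByteStringRange.toByteString(original);
    ByteStringRange decoded = ByteStringRange.toByteStringRange(encoded);
    System.out.println(original.equals(decoded)); // expected: true
  }
}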
@@ -76,6 +76,7 @@
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -160,7 +161,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;

private final ServerStreamingCallable<String, RowRange>
private final ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable;

private final ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecord>
@@ -851,7 +852,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
* RowRange}.
* </ul>
*/
private ServerStreamingCallable<String, RowRange>
private ServerStreamingCallable<String, ByteStringRange>
createGenerateInitialChangeStreamPartitionsCallable() {
ServerStreamingCallable<
GenerateInitialChangeStreamPartitionsRequest,
@@ -880,22 +881,22 @@ public Map<String, String> extract(
.build(),
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes());

ServerStreamingCallable<String, RowRange> userCallable =
ServerStreamingCallable<String, ByteStringRange> userCallable =
new GenerateInitialChangeStreamPartitionsUserCallable(base, requestContext);

ServerStreamingCallable<String, RowRange> withStatsHeaders =
ServerStreamingCallable<String, ByteStringRange> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(userCallable);

// Sometimes GenerateInitialChangeStreamPartitions connections are disconnected via an RST
// frame. This error is transient and should be treated similar to UNAVAILABLE. However, this
// exception has an INTERNAL error code which by default is not retryable. Convert the exception
// so it can be retried in the client.
ServerStreamingCallable<String, RowRange> convertException =
ServerStreamingCallable<String, ByteStringRange> convertException =
new ConvertStreamExceptionCallable<>(withStatsHeaders);

// Copy idle timeout settings for watchdog.
ServerStreamingCallSettings<String, RowRange> innerSettings =
ServerStreamingCallSettings.<String, RowRange>newBuilder()
ServerStreamingCallSettings<String, ByteStringRange> innerSettings =
ServerStreamingCallSettings.<String, ByteStringRange>newBuilder()
.setRetryableCodes(
settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes())
.setRetrySettings(
@@ -904,17 +905,17 @@ public Map<String, String> extract(
settings.generateInitialChangeStreamPartitionsSettings().getIdleTimeout())
.build();

ServerStreamingCallable<String, RowRange> watched =
ServerStreamingCallable<String, ByteStringRange> watched =
Callables.watched(convertException, innerSettings, clientContext);

ServerStreamingCallable<String, RowRange> withBigtableTracer =
ServerStreamingCallable<String, ByteStringRange> withBigtableTracer =
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, RowRange> retrying =
ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);

SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
ServerStreamingCallable<String, RowRange> traced =
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
@@ -1080,7 +1081,8 @@ UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
}

/** Returns a streaming generate initial change stream partitions callable */
public ServerStreamingCallable<String, RowRange> generateInitialChangeStreamPartitionsCallable() {
public ServerStreamingCallable<String, ByteStringRange>
generateInitialChangeStreamPartitionsCallable() {
return generateInitialChangeStreamPartitionsCallable;
}

@@ -34,12 +34,12 @@
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.auth.Credentials;
import com.google.bigtable.v2.PingAndWarmRequest;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.Version;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -218,7 +218,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final UnaryCallSettings<ReadModifyWriteRow, Row> readModifyWriteRowSettings;
private final UnaryCallSettings<PingAndWarmRequest, Void> pingAndWarmSettings;

private final ServerStreamingCallSettings<String, RowRange>
private final ServerStreamingCallSettings<String, ByteStringRange>
generateInitialChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings<ReadChangeStreamQuery, ChangeStreamRecord>
readChangeStreamSettings;
@@ -556,7 +556,7 @@ UnaryCallSettings<PingAndWarmRequest, Void> pingAndWarmSettings() {
return pingAndWarmSettings;
}

public ServerStreamingCallSettings<String, RowRange>
public ServerStreamingCallSettings<String, ByteStringRange>
generateInitialChangeStreamPartitionsSettings() {
return generateInitialChangeStreamPartitionsSettings;
}
@@ -592,7 +592,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private final UnaryCallSettings.Builder<ReadModifyWriteRow, Row> readModifyWriteRowSettings;
private final UnaryCallSettings.Builder<PingAndWarmRequest, Void> pingAndWarmSettings;

private final ServerStreamingCallSettings.Builder<String, RowRange>
private final ServerStreamingCallSettings.Builder<String, ByteStringRange>
generateInitialChangeStreamPartitionsSettings;
private final ServerStreamingCallSettings.Builder<ReadChangeStreamQuery, ChangeStreamRecord>
readChangeStreamSettings;
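
If the new per-method settings need tuning, something along these lines should work. The builder accessor generateInitialChangeStreamPartitionsSettings() mirrors the field above, but its getter is outside this diff, so treat it as an assumption.

import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import org.threeten.bp.Duration;

class PartitionStreamSettingsExample {
  static BigtableDataSettings build() {
    BigtableDataSettings.Builder builder =
        BigtableDataSettings.newBuilder()
            .setProjectId("my-project") // placeholder
            .setInstanceId("my-instance"); // placeholder
    // Assumed accessor returning a ServerStreamingCallSettings.Builder<String, ByteStringRange>.
    builder
        .stubSettings()
        .generateInitialChangeStreamPartitionsSettings()
        .setIdleTimeout(Duration.ofMinutes(5));
    return builder.build();
  }
}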
@@ -21,16 +21,16 @@
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest;
import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;

/**
* Simple wrapper for GenerateInitialChangeStreamPartitions to wrap the request and response
* protobufs.
*/
public class GenerateInitialChangeStreamPartitionsUserCallable
extends ServerStreamingCallable<String, RowRange> {
extends ServerStreamingCallable<String, ByteStringRange> {
private final RequestContext requestContext;
private final ServerStreamingCallable<
GenerateInitialChangeStreamPartitionsRequest,
@@ -49,7 +49,7 @@ public GenerateInitialChangeStreamPartitionsUserCallable(

@Override
public void call(
String tableId, ResponseObserver<RowRange> responseObserver, ApiCallContext context) {
String tableId, ResponseObserver<ByteStringRange> responseObserver, ApiCallContext context) {
String tableName =
NameUtil.formatTableName(
requestContext.getProjectId(), requestContext.getInstanceId(), tableId);
@@ -62,12 +62,12 @@ public void call(
inner.call(request, new ConvertPartitionToRangeObserver(responseObserver), context);
}

private class ConvertPartitionToRangeObserver
private static class ConvertPartitionToRangeObserver
implements ResponseObserver<GenerateInitialChangeStreamPartitionsResponse> {

private final ResponseObserver<RowRange> outerObserver;
private final ResponseObserver<ByteStringRange> outerObserver;

ConvertPartitionToRangeObserver(ResponseObserver<RowRange> observer) {
ConvertPartitionToRangeObserver(ResponseObserver<ByteStringRange> observer) {
this.outerObserver = observer;
}

@@ -78,12 +78,11 @@ public void onStart(final StreamController controller) {

@Override
public void onResponse(GenerateInitialChangeStreamPartitionsResponse response) {
RowRange rowRange =
RowRange.newBuilder()
.setStartKeyClosed(response.getPartition().getRowRange().getStartKeyClosed())
.setEndKeyOpen(response.getPartition().getRowRange().getEndKeyOpen())
.build();
outerObserver.onResponse(rowRange);
ByteStringRange byteStringRange =
ByteStringRange.create(
response.getPartition().getRowRange().getStartKeyClosed(),
response.getPartition().getRowRange().getEndKeyOpen());
outerObserver.onResponse(byteStringRange);
}

@Override
@@ -24,14 +24,14 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.RowRange;
import com.google.cloud.bigtable.data.v2.models.BulkMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.Filters.Filter;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
import com.google.cloud.bigtable.data.v2.models.Mutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery;
import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow;
import com.google.cloud.bigtable.data.v2.models.Row;
@@ -83,7 +83,7 @@ public class BigtableDataClientTests {
@Mock private Batcher<ByteString, Row> mockBulkReadRowsBatcher;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ServerStreamingCallable<String, RowRange>
private ServerStreamingCallable<String, ByteStringRange>
mockGenerateInitialChangeStreamPartitionsCallable;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
@@ -342,7 +342,7 @@ public void proxyGenerateInitialChangeStreamPartitionsAsyncTest() {
.thenReturn(mockGenerateInitialChangeStreamPartitionsCallable);

@SuppressWarnings("unchecked")
ResponseObserver<RowRange> mockObserver = Mockito.mock(ResponseObserver.class);
ResponseObserver<ByteStringRange> mockObserver = Mockito.mock(ResponseObserver.class);
bigtableDataClient.generateInitialChangeStreamPartitionsAsync("fake-table", mockObserver);

Mockito.verify(mockGenerateInitialChangeStreamPartitionsCallable)