Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a query paginator #1530

Merged
merged 9 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.1.4')
implementation platform('com.google.cloud:libraries-bom:26.1.5')

implementation 'com.google.cloud:google-cloud-bigtable'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowFilter;
Expand Down Expand Up @@ -248,6 +249,29 @@ public List<Query> shard(SortedSet<ByteString> splitPoints) {
return shards;
}

/**
* Create a query paginator that'll split the query into smaller chunks.
*
* <p>Example usage:
*
* <pre>{@code
* Query query = Query.create(...).range("a", "z");
* Query.QueryPaginator paginator = query.createQueryPaginator(100);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
* ByteString lastSeenRowKey = ByteString.EMPTY;
* do {
* List<Row> rows = client.readRowsCallable().all().call(paginator.getNextQuery());
* for (Row row : rows) {
* // do some processing
* lastSeenRow = row;
* }
* } while (paginator.advance(lastSeenRowKey));
* }</pre>
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public QueryPaginator createPaginator(int pageSize) {
return new QueryPaginator(this, pageSize);
}

/** Get the minimal range that encloses all of the row keys and ranges in this Query. */
public ByteStringRange getBound() {
return RowSetUtil.getBound(builder.getRows());
Expand Down Expand Up @@ -297,6 +321,73 @@ private static ByteString wrapKey(String key) {
return ByteString.copyFromUtf8(key);
}

/**
* A Query Paginator that will split a query into small chunks. See {@link
* Query#createPaginator(int)} for example usage.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public static class QueryPaginator {

private final boolean hasOverallLimit;
private long remainingRows;
private Query query;
private final int pageSize;
private ByteString prevSplitPoint;

QueryPaginator(@Nonnull Query query, int pageSize) {
this.hasOverallLimit = query.builder.getRowsLimit() > 0;
this.remainingRows = query.builder.getRowsLimit();
this.query = query.limit(pageSize);
if (hasOverallLimit) {
remainingRows -= pageSize;
}
this.pageSize = pageSize;
this.prevSplitPoint = ByteString.EMPTY;
}

/** Return the next query. */
public Query getNextQuery() {
return query;
}

/**
* Construct the next query. Return true if there are more queries to return. False if we've
* read everything.
*/
public boolean advance(@Nonnull ByteString lastSeenRowKey) {
Preconditions.checkNotNull(
lastSeenRowKey, "lastSeenRowKey cannot be null, use ByteString.EMPTY instead.");
// Full table scans don't have ranges or limits. Running the query again will return an empty
// list when we reach the end of the table. lastSeenRowKey won't be updated in this case, and
// we can break out of the loop.
if (lastSeenRowKey.equals(prevSplitPoint)) {
return false;
}
this.prevSplitPoint = lastSeenRowKey;

// Set the query limit. If the original limit is set, return false if the new
// limit is <= 0 to avoid returning more rows than intended.
if (hasOverallLimit && remainingRows <= 0) {
return false;
}
if (hasOverallLimit) {
query.limit(Math.min(this.pageSize, remainingRows));
remainingRows -= pageSize;
} else {
query.limit(pageSize);
}

// Split the row ranges / row keys. Return false if there's nothing
// left on the right of the split point.
RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey);
if (split.getRight() == null) {
return false;
}
query.builder.setRows(split.getRight());
return true;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,182 @@ public void testClone() {
assertThat(clonedReq).isEqualTo(query);
assertThat(clonedReq.toProto(requestContext)).isEqualTo(request);
}

@Test
public void testQueryPaginatorRangeLimitReached() {
int chunkSize = 10, limit = 15;
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
int expectedLimit = limit - chunkSize;
nextQuery = paginator.getNextQuery();
expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(expectedLimit);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
}

@Test
public void testQueryPaginatorRangeLimitMultiplyOfChunkSize() {
int chunkSize = 10, limit = 20;
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
int expectedLimit = limit - chunkSize;
nextQuery = paginator.getNextQuery();
expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(expectedLimit);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
}

@Test
public void testQueryPaginatorRagneNoLimit() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID).range("a", "z");
Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
nextQuery = paginator.getNextQuery();
expectedProto
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("z"))).isFalse();
}

@Test
public void testQueryPaginatorRowsNoLimit() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID).rowKey("a").rowKey("b").rowKey("c");

Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

ReadRowsRequest.Builder expectedProto = expectedProtoBuilder();
expectedProto
.getRowsBuilder()
.addRowKeys(ByteString.copyFromUtf8("a"))
.addRowKeys(ByteString.copyFromUtf8("b"))
.addRowKeys(ByteString.copyFromUtf8("c"));
expectedProto.setRowsLimit(chunkSize);

assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

paginator.advance(ByteString.copyFromUtf8("b"));
nextQuery = paginator.getNextQuery();
expectedProto = expectedProtoBuilder();
expectedProto.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8("c"));
expectedProto.setRowsLimit(chunkSize);

assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isFalse();
}

@Test
public void testQueryPaginatorFullTableScan() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID);
Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize);

ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isTrue();
expectedProto
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("a")).build()))
.setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isFalse();
}

@Test
public void testQueryPaginatorEmptyTable() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID);
Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize);

ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.EMPTY)).isFalse();
}
}