Skip to content

Commit

Permalink
Count optimization changes (#811)
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshrajamani authored Jan 19, 2024
1 parent 88023ab commit bbf46e6
Show file tree
Hide file tree
Showing 15 changed files with 1,173 additions and 123 deletions.
3 changes: 3 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Here are some Stargate-relevant property groups that are necessary for correct s
| `stargate.jsonapi.operations.lwt.retries` | `int` | `3` | The amount of client side retries in case of a LWT failure. |
| `stargate.jsonapi.operations.database-config.session-cache-ttl-seconds` | `int` | `300` | The amount of seconds that the cql session will be kept in memory after last access. |
| `stargate.jsonapi.operations.database-config.session-cache-max-size` | `int` | `50` | The maximum number of cql sessions that will be kept in memory. |
| `stargate.jsonapi.operations.default-count-page-size` | `int` | `100` | The default Cassandra page size used when reading keys for the count command. |
| `stargate.jsonapi.operations.max-count-limit` | `int` | `1000` | The maximum number of rows read for a count operation. Setting it to `-1` uses Cassandra's count function instead. |


## Jsonapi metering configuration
*Configuration for jsonapi metering, defined by [JsonApiMetricsConfig.java](io/stargate/sgv2/jsonapi/api/v1/metrics/JsonApiMetricsConfig.java).*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,23 @@ public interface OperationsConfig {
@WithDefault("1000")
int maxVectorSearchLimit();

/**
* Upper bound on the number of keys read from the database to compute a count.
*
* @return Maximum number of keys read from the database to return a count. Setting it to
* <code>-1</code> will use Cassandra's count function instead. Default is <code>1000</code>.
*/
@WithDefault("1000")
int maxCountLimit();

/**
* Page size used by the count operation.
*
* @return Default page size for the count operation. It is configured separately from
* <code>defaultPageSize</code> because count will read more keys per page. The value must be
* positive and at most <code>500</code>; it defaults to <code>100</code>.
*/
@Max(500)
@Positive
@WithDefault("100")
int defaultCountPageSize();

@NotNull
@Valid
LwtConfig lwt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/** ErrorCode is our internal enum that provides codes and a default message for that error code. */
public enum ErrorCode {
/** Command error codes. */
COUNT_READ_FAILED("Unable to count documents"),
COMMAND_NOT_IMPLEMENTED("The provided command is not implemented."),

COMMAND_ACCEPTS_NO_OPTIONS("Command accepts no options"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,13 +67,12 @@ public Uni<AsyncResultSet> executeRead(
* query must have keyspace prefixed.
* @return AsyncResultSet
*/
public Uni<AsyncResultSet> executeCount(SimpleStatement simpleStatement) {
public CompletionStage<AsyncResultSet> executeCount(SimpleStatement simpleStatement) {
// Apply the "count" execution profile and the configured read consistency level; the
// statement returned by the setters is the one that gets executed.
simpleStatement =
simpleStatement
.setExecutionProfileName("count")
.setConsistencyLevel(operationsConfig.queriesConfig().consistency().reads());
return Uni.createFrom()
.completionStage(cqlSessionCache.getSession().executeAsync(simpleStatement));
// Hand the asynchronous result of the session call straight back to the caller.
return cqlSessionCache.getSession().executeAsync(simpleStatement);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,29 @@
* Operation that returns count of documents based on the filter condition. Written with the
* assumption that all variables to be indexed.
*/
public record CountOperation(CommandContext commandContext, LogicalExpression logicalExpression)
public record CountOperation(
CommandContext commandContext, LogicalExpression logicalExpression, int pageSize, int limit)
implements ReadOperation {

/**
* Builds the count statement, runs it, and wraps the outcome in a {@link CountOperationPage}.
*
* @param queryExecutor executor used to run the count statement
* @return Uni emitting the supplier of the command result
*/
@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
SimpleStatement simpleStatement = buildSelectQuery();
return countDocuments(queryExecutor, simpleStatement)
// A limit of -1 counts through countDocuments; any other limit counts through
// countDocumentsByKey.
Uni<CountResponse> countResponse = null;
if (limit == -1) countResponse = countDocuments(queryExecutor, simpleStatement);
else countResponse = countDocumentsByKey(queryExecutor, simpleStatement);

return countResponse
.onItem()
.transform(docs -> new CountOperationPage(docs.count()));
.transform(
docs -> {
if (limit == -1) {
return new CountOperationPage(docs.count(), false);
} else {
// A count above the limit is flagged as moreData and reported reduced by one.
boolean moreData = docs.count() > limit();
return new CountOperationPage(
docs.count() > limit() ? docs.count() - 1 : docs.count(), moreData);
}
});
}

private SimpleStatement buildSelectQuery() {
Expand All @@ -38,16 +52,29 @@ private SimpleStatement buildSelectQuery() {
if (expressions != null && !expressions.isEmpty() && expressions.get(0) != null) {
collect = ExpressionBuilder.getExpressionValuesInOrder(expressions.get(0));
}
final QueryOuterClass.Query query =
new QueryBuilder()
.select()
.count()
.as("count")
.from(commandContext.namespace(), commandContext.collection())
.where(expressions.get(0)) // TODO count will assume no id filter query split?
.build();
QueryOuterClass.Query query = null;
if (limit == -1) {
query =
new QueryBuilder()
.select()
.count()
.as("count")
.from(commandContext.namespace(), commandContext.collection())
.where(expressions.get(0))
.build();
} else {
query =
new QueryBuilder()
.select()
.column("key")
.from(commandContext.namespace(), commandContext.collection())
.where(expressions.get(0))
.limit(limit + 1)
.build();
}

final SimpleStatement simpleStatement = SimpleStatement.newInstance(query.getCql());
simpleStatement.setPageSize(pageSize());
return simpleStatement.setPositionalValues(collect);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -343,6 +345,34 @@ private String extractPageStateFromResultSet(AsyncResultSet rSet) {
}
return null;
}
/**
 * Default implementation to run the count query and parse the result set; this approach counts
 * by key field, summing the rows of every page of the result.
 *
 * <p>Page fetches are chained, so the returned {@link Uni} emits only after the last page has
 * been counted. A failure while executing the statement or fetching any page fails the returned
 * Uni with the underlying error.
 *
 * @param queryExecutor executor used to run the statement
 * @param simpleStatement statement whose result rows are counted
 * @return Uni emitting a {@link CountResponse} holding the number of rows across all pages
 */
default Uni<CountResponse> countDocumentsByKey(
    QueryExecutor queryExecutor, SimpleStatement simpleStatement) {
  // thenCompose (rather than a detached callback) makes the total available only once every
  // page has been fetched and added.
  final CompletionStage<Long> totalRows =
      queryExecutor
          .executeCount(simpleStatement)
          .thenCompose(firstPage -> countRowsInAllPages(firstPage, 0L));

  return Uni.createFrom()
      .completionStage(totalRows)
      .onItem()
      .transform(total -> new CountResponse(total));
}

/**
 * Adds the rows of {@code page} to {@code rowsSoFar} and continues with the next page, if any.
 *
 * @param page current page of the result set
 * @param rowsSoFar rows counted on the pages before {@code page}
 * @return stage completing with the total row count once no more pages remain
 */
private CompletionStage<Long> countRowsInAllPages(AsyncResultSet page, long rowsSoFar) {
  final long rows = rowsSoFar + page.remaining();
  if (!page.hasMorePages()) {
    return java.util.concurrent.CompletableFuture.completedFuture(rows);
  }
  return page.fetchNextPage().thenCompose(nextPage -> countRowsInAllPages(nextPage, rows));
}

/**
* Default implementation to run count query and parse the result set
*
Expand All @@ -352,8 +382,8 @@ private String extractPageStateFromResultSet(AsyncResultSet rSet) {
*/
default Uni<CountResponse> countDocuments(
QueryExecutor queryExecutor, SimpleStatement simpleStatement) {
return queryExecutor
.executeCount(simpleStatement)
return Uni.createFrom()
.completionStage(queryExecutor.executeCount(simpleStatement))
.onItem()
.transform(
rSet -> {
Expand All @@ -363,6 +393,17 @@ default Uni<CountResponse> countDocuments(
});
}

/**
 * Adds the rows remaining in the given page to {@code counter} and, when more pages exist,
 * requests the next page with this method registered as its completion callback.
 *
 * <p>The stage produced by registering that callback is not retained, so this method returns
 * without waiting for later pages to be counted.
 *
 * @param rs page to count; only read when {@code error} is null
 * @param error failure reported for the page, or null when the page was read
 * @param counter running total shared across pages
 */
private void getCount(AsyncResultSet rs, Throwable error, AtomicLong counter) {
  if (error != null) {
    throw new JsonApiException(ErrorCode.COUNT_READ_FAILED);
  }

  counter.addAndGet(rs.remaining());
  if (!rs.hasMorePages()) {
    return;
  }

  rs.fetchNextPage()
      .whenComplete((nextPage, nextError) -> getCount(nextPage, nextError, counter));
}

record FindResponse(List<ReadDocument> docs, String pageState) {}

record CountResponse(long count) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
import java.util.Map;
import java.util.function.Supplier;

public record CountOperationPage(long count) implements Supplier<CommandResult> {
public record CountOperationPage(long count, boolean moreData) implements Supplier<CommandResult> {
/**
 * Builds the command result for the count.
 *
 * @return result whose status holds the counted documents and, only when {@code moreData} is
 *     set, the more-data flag
 */
@Override
public CommandResult get() {
  if (!moreData) {
    return new CommandResult(Map.of(CommandStatus.COUNTED_DOCUMENT, count()));
  }
  return new CommandResult(
      Map.of(CommandStatus.COUNTED_DOCUMENT, count(), CommandStatus.MORE_DATA, true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.clause.filter.LogicalExpression;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CountDocumentsCommand;
import io.stargate.sgv2.jsonapi.config.OperationsConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.CountOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver;
Expand All @@ -14,9 +15,13 @@
@ApplicationScoped
public class CountDocumentsCommandResolver extends FilterableResolver<CountDocumentsCommand>
implements CommandResolver<CountDocumentsCommand> {

private final OperationsConfig operationsConfig;

/**
* Creates the resolver.
*
* @param operationsConfig operations configuration kept for use when resolving commands
*/
@Inject
public CountDocumentsCommandResolver() {
public CountDocumentsCommandResolver(OperationsConfig operationsConfig) {
super();
this.operationsConfig = operationsConfig;
}

@Override
Expand All @@ -27,6 +32,10 @@ public Class<CountDocumentsCommand> getCommandClass() {
/**
* Resolves a count command into a {@link CountOperation}.
*
* @param ctx command context passed on to the operation
* @param command count command whose filter is resolved into a logical expression
* @return the count operation to execute
*/
@Override
public Operation resolveCommand(CommandContext ctx, CountDocumentsCommand command) {
LogicalExpression logicalExpression = resolve(ctx, command);
return new CountOperation(ctx, logicalExpression);
// Page size and limit for the count are taken from the operations configuration.
return new CountOperation(
ctx,
logicalExpression,
operationsConfig.defaultCountPageSize(),
operationsConfig.maxCountLimit());
}
}
Loading

0 comments on commit bbf46e6

Please sign in to comment.