Count optimization changes #811

Merged
merged 6 commits into from
Jan 19, 2024
Changes from 5 commits
3 changes: 3 additions & 0 deletions CONFIGURATION.md
@@ -52,6 +52,9 @@ Here are some Stargate-relevant property groups that are necessary for correct s
| `stargate.jsonapi.operations.lwt.retries` | `int` | `3` | The amount of client side retries in case of a LWT failure. |
| `stargate.jsonapi.operations.database-config.session-cache-ttl-seconds` | `int` | `300` | The amount of seconds that the cql session will be kept in memory after last access. |
| `stargate.jsonapi.operations.database-config.session-cache-max-size` | `int` | `50` | The maximum number of cql sessions that will be kept in memory. |
| `stargate.jsonapi.operations.default-count-page-size` | `int` | `100` | The default Cassandra page size used for read queries that count documents by key. |
| `stargate.jsonapi.operations.max-count-limit` | `int` | `1000` | The default maximum count returned when counting by key. Set to `-1` to use the Cassandra count function. |


## Jsonapi metering configuration
*Configuration for jsonapi metering, defined by [JsonApiMetricsConfig.java](io/stargate/sgv2/jsonapi/api/v1/metrics/JsonApiMetricsConfig.java).*
@@ -100,6 +100,23 @@ public interface OperationsConfig {
@WithDefault("1000")
int maxVectorSearchLimit();

/**
* @return Maximum size of keys read from database to return count, Setting it to -1 will use
Contributor
Maximum size -> Maximum number ?

* Cassandra's count function. Defaults to <code>1000</code>.
*/
@Max(10000)
@WithDefault("1000")
int maxCountLimit();

/**
* @return Defines the default page size for the count operation. A separate config is used because
* count reads more keys per page. Defaults to <code>100</code>.
*/
@Max(500)
@Positive
@WithDefault("100")
int defaultCountPageSize();

@NotNull
@Valid
LwtConfig lwt();
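
The two new settings can be read as a small decision rule: `-1` selects Cassandra's count function, and any other value caps a count-by-key read. Below is a minimal sketch of that rule in plain Java. `CountConfigSketch`, `Strategy`, `strategyFor` and `checkedPageSize` are hypothetical names that do not appear in the PR. Rejecting values below `-1` is an extra check made by this sketch; the diff itself only declares `@Max(10000)` on `maxCountLimit`.

```java
/** Illustration only: none of these names exist in the PR. */
final class CountConfigSketch {

  /** Sentinel from the diff: -1 means "use Cassandra's count function". */
  static final int USE_CQL_COUNT = -1;

  /** Mirrors @Max(10000) on maxCountLimit. */
  static final int MAX_COUNT_LIMIT_CEILING = 10_000;

  /** Mirrors @Max(500) on defaultCountPageSize. */
  static final int MAX_PAGE_SIZE = 500;

  enum Strategy {
    CQL_COUNT,
    COUNT_BY_KEY
  }

  /** Picks the counting strategy implied by the configured limit. */
  static Strategy strategyFor(int maxCountLimit) {
    // Assumption of this sketch: values below -1 are treated as invalid.
    if (maxCountLimit < USE_CQL_COUNT || maxCountLimit > MAX_COUNT_LIMIT_CEILING) {
      throw new IllegalArgumentException("max-count-limit out of range: " + maxCountLimit);
    }
    return maxCountLimit == USE_CQL_COUNT ? Strategy.CQL_COUNT : Strategy.COUNT_BY_KEY;
  }

  /** Mirrors @Positive and @Max(500) on the count page size. */
  static int checkedPageSize(int pageSize) {
    if (pageSize <= 0 || pageSize > MAX_PAGE_SIZE) {
      throw new IllegalArgumentException("default-count-page-size out of range: " + pageSize);
    }
    return pageSize;
  }
}
```

With the defaults from the diff, `strategyFor(1000)` selects the by-key path and `checkedPageSize(100)` passes validation.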
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -66,13 +67,12 @@ public Uni<AsyncResultSet> executeRead(
* query must have keyspace prefixed.
* @return AsyncResultSet
*/
public Uni<AsyncResultSet> executeCount(SimpleStatement simpleStatement) {
public CompletionStage<AsyncResultSet> executeCount(SimpleStatement simpleStatement) {
simpleStatement =
simpleStatement
.setExecutionProfileName("count")
.setConsistencyLevel(operationsConfig.queriesConfig().consistency().reads());
return Uni.createFrom()
.completionStage(cqlSessionCache.getSession().executeAsync(simpleStatement));
return cqlSessionCache.getSession().executeAsync(simpleStatement);
}

/**
@@ -20,15 +20,32 @@
* Operation that returns the count of documents matching the filter condition. Written with the
* assumption that all variables are indexed.
*/
public record CountOperation(CommandContext commandContext, LogicalExpression logicalExpression)
public record CountOperation(
CommandContext commandContext, LogicalExpression logicalExpression, int pageSize, int limit)
implements ReadOperation {

@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
SimpleStatement simpleStatement = buildSelectQuery();
return countDocuments(queryExecutor, simpleStatement)
Uni<CountResponse> countResponse = null;
if (limit == -1) countResponse = countDocuments(queryExecutor, simpleStatement);
else countResponse = countDocumentsByKey(queryExecutor, simpleStatement);

return countResponse
.onItem()
.transform(docs -> new CountOperationPage(docs.count()));
.transform(
docs -> {
if (limit == -1) {
return new CountOperationPage(docs.count(), false);
} else {
boolean moreData = docs.count() > limit();
return new CountOperationPage(
docs.count() == 0
? 0
: docs.count() > limit() ? docs.count() - 1 : docs.count(),
moreData);
}
});
}

private SimpleStatement buildSelectQuery() {
@@ -38,16 +55,29 @@ private SimpleStatement buildSelectQuery() {
if (expressions != null && !expressions.isEmpty() && expressions.get(0) != null) {
collect = ExpressionBuilder.getExpressionValuesInOrder(expressions.get(0));
Contributor
@tatu-at-datastax Jan 19, 2024

I know this is existing unchanged code, but can expressions ever have more than 1 element? It seems that'd not be supported. Would it make sense to assert (throw exception) if more than one exists?

Contributor Author

Not sure why this is done that way. I will create an issue to handle all expressions.

}
final QueryOuterClass.Query query =
new QueryBuilder()
.select()
.count()
.as("count")
.from(commandContext.namespace(), commandContext.collection())
.where(expressions.get(0)) // TODO count will assume no id filter query split?
.build();
QueryOuterClass.Query query = null;
if (limit == -1) {
query =
new QueryBuilder()
.select()
.count()
.as("count")
.from(commandContext.namespace(), commandContext.collection())
.where(expressions.get(0))
.build();
} else {
query =
new QueryBuilder()
.select()
.column("key")
.from(commandContext.namespace(), commandContext.collection())
.where(expressions.get(0))
.limit(limit + 1)
.build();
}

final SimpleStatement simpleStatement = SimpleStatement.newInstance(query.getCql());
simpleStatement.setPageSize(pageSize());
return simpleStatement.setPositionalValues(collect);
}
}
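
The by-key path above reads at most `limit + 1` keys, so one extra row signals that more documents exist than the configured cap. A minimal sketch of that trimming rule, isolated from the query code, follows. `CountTrimSketch`, `CountResult` and `toResult` are hypothetical names used only for illustration. Clamping to `limit` is equivalent to the diff's `docs.count() - 1` only under the assumption that the query never returns more than `limit + 1` rows.

```java
/** Illustration only: none of these names exist in the PR. */
final class CountTrimSketch {

  /** Hypothetical result holder: the reported count plus the "more data" flag. */
  record CountResult(long count, boolean moreData) {}

  /**
   * Maps the number of rows actually read to the reported count.
   *
   * @param rowsSeen rows returned by the query (at most limit + 1 on the by-key path)
   * @param limit configured max count, or -1 when Cassandra's count function was used
   */
  static CountResult toResult(long rowsSeen, int limit) {
    if (limit == -1) {
      // count() path: the value is exact, so there is never "more data"
      return new CountResult(rowsSeen, false);
    }
    boolean moreData = rowsSeen > limit;
    // Drop the sentinel row: never report more than the configured limit
    long count = Math.min(rowsSeen, limit);
    return new CountResult(count, moreData);
  }
}
```

For example, with `limit = 1000`, reading 1001 keys yields a count of 1000 with `moreData` set, while reading exactly 1000 keys yields 1000 without it. The explicit `docs.count() == 0 ? 0 : ...` branch in the diff is covered by the same expression, since zero rows never exceed a non-negative limit.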
@@ -29,7 +29,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -343,6 +345,34 @@ private String extractPageStateFromResultSet(AsyncResultSet rSet) {
}
return null;
}
/**
* Default implementation that runs the count query and parses the result set. This approach
* counts rows by reading the key field.
*
* @param queryExecutor
* @param simpleStatement
* @return
*/
default Uni<CountResponse> countDocumentsByKey(
QueryExecutor queryExecutor, SimpleStatement simpleStatement) {
AtomicLong counter = new AtomicLong();
final CompletionStage<AsyncResultSet> async =
queryExecutor
.executeCount(simpleStatement)
.whenComplete(
(rs, error) -> {
getCount(rs, error, counter);
});

return Uni.createFrom()
.completionStage(async)
.onItem()
.transform(
rs -> {
return new CountResponse(counter.get());
});
}

/**
* Default implementation to run count query and parse the result set
*
@@ -352,8 +382,8 @@ private String extractPageStateFromResultSet(AsyncResultSet rSet) {
*/
default Uni<CountResponse> countDocuments(
QueryExecutor queryExecutor, SimpleStatement simpleStatement) {
return queryExecutor
.executeCount(simpleStatement)
return Uni.createFrom()
.completionStage(queryExecutor.executeCount(simpleStatement))
.onItem()
.transform(
rSet -> {
@@ -363,6 +393,17 @@ default Uni<CountResponse> countDocuments(
});
}

private void getCount(AsyncResultSet rs, Throwable error, AtomicLong counter) {
if (error != null) {
throw new RuntimeException(error);
} else {
counter.addAndGet(rs.remaining());
if (rs.hasMorePages()) {
rs.fetchNextPage().whenComplete((nextRs, e) -> getCount(nextRs, e, counter));
}
}
}

record FindResponse(List<ReadDocument> docs, String pageState) {}

record CountResponse(long count) {}
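
One thing to watch in `getCount` as shown at this commit: the stage returned by `whenComplete` completes as soon as its callback returns, and the callback only starts `fetchNextPage()` without chaining it. The counter may therefore be read before later pages are added, and an exception thrown in a nested callback would not reach the caller. A minimal sketch of one alternative, chaining pages with `thenCompose`, is below. `PagedCountSketch`, `Page`, `countAll` and `pagesOf` are hypothetical names; `Page` is a stand-in that copies only the three `AsyncResultSet` methods used in the diff, so this is not the PR's final implementation.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Illustration only: none of these names exist in the PR. */
final class PagedCountSketch {

  /** Hypothetical stand-in for the driver's AsyncResultSet. */
  interface Page {
    int remaining();

    boolean hasMorePages();

    CompletionStage<Page> fetchNextPage();
  }

  /** Completes with the total row count only after the last page has been read. */
  static CompletionStage<Long> countAll(CompletionStage<Page> firstPage) {
    return firstPage.thenCompose(page -> countFrom(page, 0L));
  }

  private static CompletionStage<Long> countFrom(Page page, long soFar) {
    long total = soFar + page.remaining();
    if (!page.hasMorePages()) {
      return CompletableFuture.completedFuture(total);
    }
    // thenCompose links the next fetch into the returned stage, so errors
    // from any page propagate to the caller instead of being lost
    return page.fetchNextPage().thenCompose(next -> countFrom(next, total));
  }

  /** In-memory pages for demonstration; sizes must be non-empty. */
  static Page pagesOf(List<Integer> sizes) {
    return pagesOf(sizes, 0);
  }

  private static Page pagesOf(List<Integer> sizes, int index) {
    return new Page() {
      @Override
      public int remaining() {
        return sizes.get(index);
      }

      @Override
      public boolean hasMorePages() {
        return index + 1 < sizes.size();
      }

      @Override
      public CompletionStage<Page> fetchNextPage() {
        // Simulate an asynchronous fetch on another thread
        return CompletableFuture.supplyAsync(() -> pagesOf(sizes, index + 1));
      }
    };
  }
}
```

A caller could wrap the returned stage with `Uni.createFrom().completionStage(...)`, as the diff already does for `executeCount`, and map the total into a `CountResponse`.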
@@ -5,9 +5,13 @@
import java.util.Map;
import java.util.function.Supplier;

public record CountOperationPage(long count) implements Supplier<CommandResult> {
public record CountOperationPage(long count, boolean moreData) implements Supplier<CommandResult> {
Contributor

We can use moreData, but could also name it something like countIncomplete or hasMoreRows?

Contributor Author

Used it because DeleteMany and UpdateMany use this flag.

@Override
public CommandResult get() {
if (moreData) {
return new CommandResult(
Map.of(CommandStatus.COUNTED_DOCUMENT, count(), CommandStatus.MORE_DATA, true));
}
return new CommandResult(Map.of(CommandStatus.COUNTED_DOCUMENT, count()));
}
}
@@ -3,6 +3,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.clause.filter.LogicalExpression;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CountDocumentsCommand;
import io.stargate.sgv2.jsonapi.config.OperationsConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.CountOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver;
@@ -14,9 +15,13 @@
@ApplicationScoped
public class CountDocumentsCommandResolver extends FilterableResolver<CountDocumentsCommand>
implements CommandResolver<CountDocumentsCommand> {

private final OperationsConfig operationsConfig;

@Inject
public CountDocumentsCommandResolver() {
public CountDocumentsCommandResolver(OperationsConfig operationsConfig) {
super();
this.operationsConfig = operationsConfig;
}

@Override
@@ -27,6 +32,10 @@ public Class<CountDocumentsCommand> getCommandClass() {
@Override
public Operation resolveCommand(CommandContext ctx, CountDocumentsCommand command) {
LogicalExpression logicalExpression = resolve(ctx, command);
return new CountOperation(ctx, logicalExpression);
return new CountOperation(
ctx,
logicalExpression,
operationsConfig.defaultCountPageSize(),
operationsConfig.maxCountLimit());
}
}