From eaa0914b6f91dba8111aab1fb831ff073ae7e647 Mon Sep 17 00:00:00 2001
From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com>
Date: Thu, 18 Jan 2024 22:02:34 -0500
Subject: [PATCH 1/6] Count optimization changes
---
.../sgv2/jsonapi/config/OperationsConfig.java | 18 +
.../cqldriver/executor/QueryExecutor.java | 6 +-
.../operation/model/CountOperation.java | 52 +-
.../operation/model/ReadOperation.java | 45 +-
.../model/impl/CountOperationPage.java | 2 +-
.../impl/CountDocumentsCommandResolver.java | 13 +-
.../v1/CountByCassandraIntegrationTest.java | 814 ++++++++++++++++++
.../model/impl/CountOperationTest.java | 211 ++++-
.../model/impl/CountCommandResolverTest.java | 91 --
.../CountDocumentsCommandResolverTest.java | 7 +
.../jsonapi/testresource/DseTestResource.java | 11 +
.../testresource/StargateTestResource.java | 9 +
12 files changed, 1158 insertions(+), 121 deletions(-)
create mode 100644 src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
delete mode 100644 src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountCommandResolverTest.java
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
index 866b9fb0f9..84f120fa8a 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
@@ -100,6 +100,24 @@ public interface OperationsConfig {
@WithDefault("1000")
int maxVectorSearchLimit();
+ /**
+ * @return Maximum size of keys read from database to return count, Setting it to -1 will use
+ * Cassandra's count function. Default is 1000 1000
command.
+ */
+ @Max(10000)
+ @Positive
+ @WithDefault("1000")
+ int maxCountLimit();
+
+ /**
+ * @return Defines the default page size for count operation, having separate config because count
+ * will more keys per page, defaults to 100
.
+ */
+ @Max(500)
+ @Positive
+ @WithDefault("100")
+ int defaultCountPageSize();
+
@NotNull
@Valid
LwtConfig lwt();
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java
index eff43db052..2bbf67ea00 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
+import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,13 +67,12 @@ public Uni executeRead(
* query must have keyspace prefixed.
* @return AsyncResultSet
*/
- public Uni executeCount(SimpleStatement simpleStatement) {
+ public CompletionStage executeCount(SimpleStatement simpleStatement) {
simpleStatement =
simpleStatement
.setExecutionProfileName("count")
.setConsistencyLevel(operationsConfig.queriesConfig().consistency().reads());
- return Uni.createFrom()
- .completionStage(cqlSessionCache.getSession().executeAsync(simpleStatement));
+ return cqlSessionCache.getSession().executeAsync(simpleStatement);
}
/**
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
index 10114b35b1..0dcd6c624f 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
@@ -20,15 +20,32 @@
* Operation that returns count of documents based on the filter condition. Written with the
* assumption that all variables to be indexed.
*/
-public record CountOperation(CommandContext commandContext, LogicalExpression logicalExpression)
+public record CountOperation(
+ CommandContext commandContext, LogicalExpression logicalExpression, int pageSize, int limit)
implements ReadOperation {
@Override
public Uni> execute(QueryExecutor queryExecutor) {
SimpleStatement simpleStatement = buildSelectQuery();
- return countDocuments(queryExecutor, simpleStatement)
+ Uni countResponse = null;
+ if (limit == -1) countResponse = countDocuments(queryExecutor, simpleStatement);
+ else countResponse = countDocumentsByKey(queryExecutor, simpleStatement);
+
+ return countResponse
.onItem()
- .transform(docs -> new CountOperationPage(docs.count()));
+ .transform(
+ docs -> {
+ if (limit == -1) {
+ return new CountOperationPage(docs.count(), false);
+ } else {
+ boolean moreData = docs.count() > limit();
+ return new CountOperationPage(
+ docs.count() == 0
+ ? 0
+ : docs.count() > limit() ? docs.count() - 1 : docs.count(),
+ moreData);
+ }
+ });
}
private SimpleStatement buildSelectQuery() {
@@ -38,16 +55,29 @@ private SimpleStatement buildSelectQuery() {
if (expressions != null && !expressions.isEmpty() && expressions.get(0) != null) {
collect = ExpressionBuilder.getExpressionValuesInOrder(expressions.get(0));
}
- final QueryOuterClass.Query query =
- new QueryBuilder()
- .select()
- .count()
- .as("count")
- .from(commandContext.namespace(), commandContext.collection())
- .where(expressions.get(0)) // TODO count will assume no id filter query split?
- .build();
+ QueryOuterClass.Query query = null;
+ if (limit == -1) {
+ query =
+ new QueryBuilder()
+ .select()
+ .count()
+ .as("count")
+ .from(commandContext.namespace(), commandContext.collection())
+ .where(expressions.get(0))
+ .build();
+ } else {
+ query =
+ new QueryBuilder()
+ .select()
+ .column("key")
+ .from(commandContext.namespace(), commandContext.collection())
+ .where(expressions.get(0))
+ .limit(limit + 1)
+ .build();
+ }
final SimpleStatement simpleStatement = SimpleStatement.newInstance(query.getCql());
+ simpleStatement.setPageSize(pageSize());
return simpleStatement.setPositionalValues(collect);
}
}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
index df89768d89..b80b0db60b 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
@@ -29,7 +29,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -343,6 +345,34 @@ private String extractPageStateFromResultSet(AsyncResultSet rSet) {
}
return null;
}
+ /**
+ * Default implementation to run count query and parse the result set, this approach counts by key
+ * field
+ *
+ * @param queryExecutor
+ * @param simpleStatement
+ * @return
+ */
+ default Uni countDocumentsByKey(
+ QueryExecutor queryExecutor, SimpleStatement simpleStatement) {
+ AtomicLong counter = new AtomicLong();
+ final CompletionStage async =
+ queryExecutor
+ .executeCount(simpleStatement)
+ .whenComplete(
+ (rs, error) -> {
+ getCount(rs, error, counter);
+ });
+
+ return Uni.createFrom()
+ .completionStage(async)
+ .onItem()
+ .transform(
+ rs -> {
+ return new CountResponse(counter.get());
+ });
+ }
+
/**
* Default implementation to run count query and parse the result set
*
@@ -352,8 +382,8 @@ private String extractPageStateFromResultSet(AsyncResultSet rSet) {
*/
default Uni countDocuments(
QueryExecutor queryExecutor, SimpleStatement simpleStatement) {
- return queryExecutor
- .executeCount(simpleStatement)
+ return Uni.createFrom()
+ .completionStage(queryExecutor.executeCount(simpleStatement))
.onItem()
.transform(
rSet -> {
@@ -363,6 +393,17 @@ default Uni countDocuments(
});
}
+ private void getCount(AsyncResultSet rs, Throwable error, AtomicLong counter) {
+ if (error != null) {
+ throw new RuntimeException(error);
+ } else {
+ counter.addAndGet(rs.remaining());
+ if (rs.hasMorePages()) {
+ rs.fetchNextPage().whenComplete((nextRs, e) -> getCount(nextRs, e, counter));
+ }
+ }
+ }
+
record FindResponse(List docs, String pageState) {}
record CountResponse(long count) {}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java
index f2ecf18dd5..0fc71bcec4 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java
@@ -5,7 +5,7 @@
import java.util.Map;
import java.util.function.Supplier;
-public record CountOperationPage(long count) implements Supplier {
+public record CountOperationPage(long count, boolean moreData) implements Supplier {
@Override
public CommandResult get() {
return new CommandResult(Map.of(CommandStatus.COUNTED_DOCUMENT, count()));
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolver.java
index 680358f888..43081bc60f 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolver.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolver.java
@@ -3,6 +3,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.clause.filter.LogicalExpression;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CountDocumentsCommand;
+import io.stargate.sgv2.jsonapi.config.OperationsConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.CountOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver;
@@ -14,9 +15,13 @@
@ApplicationScoped
public class CountDocumentsCommandResolver extends FilterableResolver
implements CommandResolver {
+
+ private final OperationsConfig operationsConfig;
+
@Inject
- public CountDocumentsCommandResolver() {
+ public CountDocumentsCommandResolver(OperationsConfig operationsConfig) {
super();
+ this.operationsConfig = operationsConfig;
}
@Override
@@ -27,6 +32,10 @@ public Class getCommandClass() {
@Override
public Operation resolveCommand(CommandContext ctx, CountDocumentsCommand command) {
LogicalExpression logicalExpression = resolve(ctx, command);
- return new CountOperation(ctx, logicalExpression);
+ return new CountOperation(
+ ctx,
+ logicalExpression,
+ operationsConfig.defaultCountPageSize(),
+ operationsConfig.maxCountLimit());
}
}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
new file mode 100644
index 0000000000..3edf664a4a
--- /dev/null
+++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
@@ -0,0 +1,814 @@
+package io.stargate.sgv2.jsonapi.api.v1;
+
+import static io.restassured.RestAssured.given;
+import static io.stargate.sgv2.common.IntegrationTestUtils.getAuthToken;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import io.restassured.http.ContentType;
+import io.stargate.sgv2.jsonapi.config.constants.HttpConstants;
+import io.stargate.sgv2.jsonapi.testresource.DseTestResource;
+import org.junit.jupiter.api.ClassOrderer;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestClassOrder;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@QuarkusIntegrationTest
+@QuarkusTestResource(
+ value = CountByCassandraIntegrationTest.CassandraCounterTestResource.class,
+ restrictToAnnotatedClass = true)
+@TestClassOrder(ClassOrderer.OrderAnnotation.class)
+public class CountByCassandraIntegrationTest extends AbstractCollectionIntegrationTestBase {
+
+ // Need to set max count limit to -1, so cassandra count(*) is used
+ // to trigger test failure for "not enough indexes"
+ public static class CassandraCounterTestResource extends DseTestResource {
+ public CassandraCounterTestResource() {}
+
+ @Override
+ public int getMaxCountLimit() {
+ return -1;
+ }
+ }
+
+ @Nested
+ @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+ @Order(1)
+ class Count {
+
+ @Test
+ @Order(1)
+ public void setUp() {
+ String json =
+ """
+ {
+ "insertOne": {
+ "document": {
+ "_id": "doc1",
+ "username": "user1",
+ "active_user" : true
+ }
+ }
+ }
+ """;
+ insert(json);
+
+ json =
+ """
+ {
+ "insertOne": {
+ "document": {
+ "_id": "doc2",
+ "username": "user2",
+ "subdoc" : {
+ "id" : "abc"
+ },
+ "array" : [
+ "value1"
+ ]
+ }
+ }
+ }
+ """;
+ insert(json);
+
+ json =
+ """
+ {
+ "insertOne": {
+ "document": {
+ "_id": "doc3",
+ "username": "user3",
+ "tags" : ["tag1", "tag2", "tag1234567890123456789012345", null, 1, true],
+ "nestedArray" : [["tag1", "tag2"], ["tag1234567890123456789012345", null]]
+ }
+ }
+ }
+ """;
+ insert(json);
+
+ json =
+ """
+ {
+ "insertOne": {
+ "document": {
+ "_id": "doc4",
+ "indexedObject" : { "0": "value_0", "1": "value_1" }
+ }
+ }
+ }
+ """;
+ insert(json);
+
+ json =
+ """
+ {
+ "insertOne": {
+ "document": {
+ "_id": "doc5",
+ "username": "user5",
+ "sub_doc" : { "a": 5, "b": { "c": "v1", "d": false } }
+ }
+ }
+ }
+ """;
+ insert(json);
+ }
+
+ private void insert(String json) {
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void noFilter() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(5))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void emptyOptionsAllowed() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "options": {}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(5))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void byColumn() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"username" : "user1"}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void countBySimpleOr() {
+ String json =
+ """
+{
+ "countDocuments": {
+ "filter": {
+ "$or": [
+ {
+ "username": "user1"
+ },
+ {
+ "username": "user2"
+ }
+ ]
+ }
+ }
+}
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(2))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqComparisonOperator() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"username" : {"$eq" : "user1"}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqSubDoc() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"subdoc.id" : {"$eq" : "abc"}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqSubDocWithIndex() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"indexedObject.1" : {"$eq" : "value_1"}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqArrayElement() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"array.0" : {"$eq" : "value1"}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withExistFalseOperator() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"active_user" : {"$exists" : false}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(4))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withExistOperator() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"active_user" : {"$exists" : true}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withAllOperator() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$all" : ["tag1", "tag2"]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withAllOperatorAnd() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter": {
+ "$and": [
+ {
+ "tags": {
+ "$all": [
+ "tag1",
+ "tag2"
+ ]
+ }
+ },
+ {
+ "active_user": {
+ "$exists": true
+ }
+ }
+ ]
+ }
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(0))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withAllOperatorLongerString() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$all" : ["tag1", "tag1234567890123456789012345"]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withAllOperatorMixedAFormatArray() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$all" : ["tag1", 1, true, null]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withAllOperatorNoMatch() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$all" : ["tag1", 2, true, null]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(0))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqSubDocumentShortcut() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"sub_doc" : { "a": 5, "b": { "c": "v1", "d": false } } }
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqSubDocument() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"sub_doc" : { "$eq" : { "a": 5, "b": { "c": "v1", "d": false } } } }
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqSubDocumentOrderChangeNoMatch() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"sub_doc" : { "$eq" : { "a": 5, "b": { "d": false, "c": "v1" } } } }
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(0))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqSubDocumentNoMatch() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"sub_doc" : { "$eq" : { "a": 5, "b": { "c": "v1", "d": true } } } }
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(0))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withSizeOperator() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$size" : 6}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withSizeOperatorNoMatch() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$size" : 1}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(0))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqOperatorArray() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$eq" : ["tag1", "tag2", "tag1234567890123456789012345", null, 1, true]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqOperatorNestedArray() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"nestedArray" : {"$eq" : [["tag1", "tag2"], ["tag1234567890123456789012345", null]]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqOperatorArrayNoMatch() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"tags" : {"$eq" : ["tag1", "tag2", "tag1234567890123456789012345", null, 1]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(0))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withEqOperatorNestedArrayNoMatch() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"nestedArray" : {"$eq" : [["tag1", "tag2"], ["tag1234567890123456789012345", null], ["abc"]]}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(0))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void withNEComparisonOperator() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"username" : {"$ne" : "user1"}}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(4))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+
+ @Test
+ public void byBooleanColumn() {
+ String json =
+ """
+ {
+ "countDocuments": {
+ "filter" : {"active_user" : true}
+ }
+ }
+ """;
+
+ given()
+ .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken())
+ .contentType(ContentType.JSON)
+ .body(json)
+ .when()
+ .post(CollectionResource.BASE_PATH, namespaceName, collectionName)
+ .then()
+ .statusCode(200)
+ .body("status.count", is(1))
+ .body("data", is(nullValue()))
+ .body("errors", is(nullValue()));
+ }
+ }
+
+ @Nested
+ @Order(2)
+ class Metrics {
+ @Test
+ public void checkMetrics() {
+ CountByCassandraIntegrationTest.super.checkMetrics("CountDocumentsCommand");
+ }
+ }
+}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationTest.java
index 3daf84721d..78943f440a 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationTest.java
@@ -11,7 +11,6 @@
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;
-import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
@@ -25,6 +24,7 @@
import io.stargate.sgv2.jsonapi.service.testutil.MockRow;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.junit.jupiter.api.Nested;
@@ -33,8 +33,15 @@
@QuarkusTest
@TestProfile(NoGlobalResourcesTestProfile.Impl.class)
public class CountOperationTest extends OperationTestBase {
+
+ private final ColumnDefinitions KEY_COLUMN = buildColumnDefs(TestColumn.keyColumn());
+
+ MockRow resultRow(int index, String key) {
+ return new MockRow(KEY_COLUMN, index, Arrays.asList(byteBufferForKey(key)));
+ }
+
@Nested
- class Execute {
+ class ExecuteCassandraCount {
private final ColumnDefinitions COUNT_RESULT_COLUMNS =
buildColumnDefs(TestColumn.ofLong("count"));
@@ -52,10 +59,10 @@ public void countWithNoFilter() {
.then(
invocation -> {
callCount.incrementAndGet();
- return Uni.createFrom().item(mockResults);
+ return CompletableFuture.supplyAsync(() -> mockResults).minimalCompletionStage();
});
- CountOperation countOperation = new CountOperation(CONTEXT, LogicalExpression.and());
+ CountOperation countOperation = new CountOperation(CONTEXT, LogicalExpression.and(), 100, -1);
Supplier execute =
countOperation
.execute(queryExecutor)
@@ -93,7 +100,7 @@ public void countWithDynamic() {
.then(
invocation -> {
callCount.incrementAndGet();
- return Uni.createFrom().item(mockResults);
+ return CompletableFuture.supplyAsync(() -> mockResults).minimalCompletionStage();
});
LogicalExpression implicitAnd = LogicalExpression.and();
@@ -105,7 +112,7 @@ public void countWithDynamic() {
List.of(
new DBFilterBase.TextFilter(
"username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")));
- CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd);
+ CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd, 100, -1);
Supplier execute =
countOperation
.execute(queryExecutor)
@@ -143,7 +150,7 @@ public void countWithDynamicNoMatch() {
.then(
invocation -> {
callCount.incrementAndGet();
- return Uni.createFrom().item(mockResults);
+ return CompletableFuture.supplyAsync(() -> mockResults).minimalCompletionStage();
});
LogicalExpression implicitAnd = LogicalExpression.and();
@@ -156,7 +163,7 @@ public void countWithDynamicNoMatch() {
new DBFilterBase.TextFilter(
"username", DBFilterBase.MapFilterBase.Operator.EQ, "user_all")));
- CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd);
+ CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd, 100, -1);
Supplier execute =
countOperation
.execute(queryExecutor)
@@ -181,7 +188,7 @@ public void countWithDynamicNoMatch() {
@Test
public void error() {
// failures are propagated down
- RuntimeException failure = new RuntimeException("Ivan fails the test.");
+ RuntimeException failure = new RuntimeException("Test failure message.");
String collectionReadCql =
"SELECT COUNT(1) AS count FROM \"%s\".\"%s\"".formatted(KEYSPACE_NAME, COLLECTION_NAME);
SimpleStatement stmt = SimpleStatement.newInstance(collectionReadCql);
@@ -191,11 +198,193 @@ public void error() {
.then(
invocation -> {
callCount.incrementAndGet();
- return Uni.createFrom().failure(failure);
+ return CompletableFuture.failedFuture(failure).minimalCompletionStage();
+ });
+
+ LogicalExpression implicitAnd = LogicalExpression.and();
+ CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd, 100, -1);
+ Throwable result =
+ countOperation
+ .execute(queryExecutor)
+ .subscribe()
+ .withSubscriber(UniAssertSubscriber.create())
+ .awaitFailure()
+ .getFailure();
+
+ // assert query execution
+ assertThat(callCount.get()).isEqualTo(1);
+
+ // then result
+ assertThat(result).isEqualTo(failure);
+ }
+ }
+
+ @Nested
+ class ExecuteByKey {
+ private final ColumnDefinitions COUNT_RESULT_COLUMNS =
+ buildColumnDefs(TestColumn.ofLong("count"));
+
+ @Test
+ public void countWithNoFilter() {
+ String collectionReadCql =
+ "SELECT key FROM \"%s\".\"%s\" LIMIT 11".formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ SimpleStatement stmt = SimpleStatement.newInstance(collectionReadCql);
+ List rows =
+ Arrays.asList(
+ resultRow(0, "key1"),
+ resultRow(1, "key2"),
+ resultRow(2, "key3"),
+ resultRow(3, "key4"),
+ resultRow(4, "key5"));
+ AsyncResultSet mockResults = new MockAsyncResultSet(COUNT_RESULT_COLUMNS, rows, null);
+ final AtomicInteger callCount = new AtomicInteger();
+ QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ when(queryExecutor.executeCount(eq(stmt)))
+ .then(
+ invocation -> {
+ callCount.incrementAndGet();
+ return CompletableFuture.supplyAsync(() -> mockResults).minimalCompletionStage();
+ });
+
+ CountOperation countOperation = new CountOperation(CONTEXT, LogicalExpression.and(), 100, 10);
+ Supplier execute =
+ countOperation
+ .execute(queryExecutor)
+ .subscribe()
+ .withSubscriber(UniAssertSubscriber.create())
+ .awaitItem()
+ .getItem();
+
+ // assert query execution
+ assertThat(callCount.get()).isEqualTo(1);
+
+ // then result
+ CommandResult result = execute.get();
+ assertThat(result)
+ .satisfies(
+ commandResult -> {
+ assertThat(result.status().get(CommandStatus.COUNTED_DOCUMENT)).isNotNull();
+ assertThat(result.status().get(CommandStatus.COUNTED_DOCUMENT)).isEqualTo(5L);
+ });
+ }
+
+ @Test
+ public void countWithDynamic() {
+ String collectionReadCql =
+ "SELECT key FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 11"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ final String filterValue = "username " + new DocValueHasher().getHash("user2").hash();
+ SimpleStatement stmt = SimpleStatement.newInstance(collectionReadCql, filterValue);
+ List rows = Arrays.asList(resultRow(0, "key1"), resultRow(1, "key2"));
+ AsyncResultSet mockResults = new MockAsyncResultSet(COUNT_RESULT_COLUMNS, rows, null);
+ final AtomicInteger callCount = new AtomicInteger();
+ QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ when(queryExecutor.executeCount(eq(stmt)))
+ .then(
+ invocation -> {
+ callCount.incrementAndGet();
+ return CompletableFuture.supplyAsync(() -> mockResults).minimalCompletionStage();
+ });
+
+ LogicalExpression implicitAnd = LogicalExpression.and();
+ implicitAnd.comparisonExpressions.add(new ComparisonExpression(null, null, null));
+ implicitAnd
+ .comparisonExpressions
+ .get(0)
+ .setDBFilters(
+ List.of(
+ new DBFilterBase.TextFilter(
+ "username", DBFilterBase.MapFilterBase.Operator.EQ, "user2")));
+ CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd, 100, 10);
+ Supplier execute =
+ countOperation
+ .execute(queryExecutor)
+ .subscribe()
+ .withSubscriber(UniAssertSubscriber.create())
+ .awaitItem()
+ .getItem();
+
+ // assert query execution
+ assertThat(callCount.get()).isEqualTo(1);
+
+ // then result
+ CommandResult result = execute.get();
+ assertThat(result)
+ .satisfies(
+ commandResult -> {
+ assertThat(result.status().get(CommandStatus.COUNTED_DOCUMENT)).isNotNull();
+ assertThat(result.status().get(CommandStatus.COUNTED_DOCUMENT)).isEqualTo(2L);
+ });
+ }
+
+ @Test
+ public void countWithDynamicNoMatch() {
+ String collectionReadCql =
+ "SELECT key FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 11"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ final String filterValue = "username " + new DocValueHasher().getHash("user_all").hash();
+ SimpleStatement stmt = SimpleStatement.newInstance(collectionReadCql, filterValue);
+ List rows = Arrays.asList();
+ AsyncResultSet mockResults = new MockAsyncResultSet(COUNT_RESULT_COLUMNS, rows, null);
+ final AtomicInteger callCount = new AtomicInteger();
+ QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ when(queryExecutor.executeCount(eq(stmt)))
+ .then(
+ invocation -> {
+ callCount.incrementAndGet();
+ return CompletableFuture.supplyAsync(() -> mockResults).minimalCompletionStage();
+ });
+
+ LogicalExpression implicitAnd = LogicalExpression.and();
+ implicitAnd.comparisonExpressions.add(new ComparisonExpression(null, null, null));
+ implicitAnd
+ .comparisonExpressions
+ .get(0)
+ .setDBFilters(
+ List.of(
+ new DBFilterBase.TextFilter(
+ "username", DBFilterBase.MapFilterBase.Operator.EQ, "user_all")));
+
+ CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd, 100, 10);
+ Supplier execute =
+ countOperation
+ .execute(queryExecutor)
+ .subscribe()
+ .withSubscriber(UniAssertSubscriber.create())
+ .awaitItem()
+ .getItem();
+
+ // assert query execution
+ assertThat(callCount.get()).isEqualTo(1);
+
+ // then result
+ CommandResult result = execute.get();
+ assertThat(result)
+ .satisfies(
+ commandResult -> {
+ assertThat(result.status().get(CommandStatus.COUNTED_DOCUMENT)).isNotNull();
+ assertThat(result.status().get(CommandStatus.COUNTED_DOCUMENT)).isEqualTo(0L);
+ });
+ }
+
+ @Test
+ public void error() {
+ // failures are propagated down
+ RuntimeException failure = new RuntimeException("Test failure message.");
+ String collectionReadCql =
+ "SELECT key FROM \"%s\".\"%s\" LIMIT 11".formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ SimpleStatement stmt = SimpleStatement.newInstance(collectionReadCql);
+ final AtomicInteger callCount = new AtomicInteger();
+ QueryExecutor queryExecutor = mock(QueryExecutor.class);
+ when(queryExecutor.executeCount(eq(stmt)))
+ .then(
+ invocation -> {
+ callCount.incrementAndGet();
+ return CompletableFuture.failedFuture(failure).minimalCompletionStage();
});
LogicalExpression implicitAnd = LogicalExpression.and();
- CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd);
+ CountOperation countOperation = new CountOperation(CONTEXT, implicitAnd, 100, 10);
Throwable result =
countOperation
.execute(queryExecutor)
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountCommandResolverTest.java
deleted file mode 100644
index 71c8d648ee..0000000000
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountCommandResolverTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package io.stargate.sgv2.jsonapi.service.resolver.model.impl;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.quarkus.test.junit.QuarkusTest;
-import io.quarkus.test.junit.TestProfile;
-import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile;
-import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
-import io.stargate.sgv2.jsonapi.api.model.command.clause.filter.ComparisonExpression;
-import io.stargate.sgv2.jsonapi.api.model.command.clause.filter.LogicalExpression;
-import io.stargate.sgv2.jsonapi.api.model.command.impl.CountDocumentsCommand;
-import io.stargate.sgv2.jsonapi.service.operation.model.CountOperation;
-import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
-import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase;
-import jakarta.inject.Inject;
-import java.util.List;
-import org.junit.jupiter.api.Test;
-
-@QuarkusTest
-@TestProfile(NoGlobalResourcesTestProfile.Impl.class)
-public class CountCommandResolverTest {
- @Inject ObjectMapper objectMapper;
- @Inject CountDocumentsCommandResolver countCommandResolver;
-
- @Test
- public void noFilterCondition() throws Exception {
- String json =
- """
- {
- "countDocuments": {
-
- }
- }
- """;
-
- CountDocumentsCommand countCommand = objectMapper.readValue(json, CountDocumentsCommand.class);
- final CommandContext commandContext = new CommandContext("namespace", "collection");
- final Operation operation = countCommandResolver.resolveCommand(commandContext, countCommand);
- LogicalExpression implicitAnd = LogicalExpression.and();
- CountOperation expected = new CountOperation(commandContext, implicitAnd);
- assertThat(operation)
- .isInstanceOf(CountOperation.class)
- .satisfies(
- op -> {
- CountOperation countOperation = (CountOperation) op;
- assertThat(countOperation.logicalExpression().getTotalComparisonExpressionCount())
- .isEqualTo(expected.logicalExpression().getTotalComparisonExpressionCount());
- });
- }
-
- @Test
- public void dynamicFilterCondition() throws Exception {
- String json =
- """
- {
- "countDocuments": {
- "filter" : {"col" : "val"}
- }
- }
- """;
-
- CountDocumentsCommand countCommand = objectMapper.readValue(json, CountDocumentsCommand.class);
- final CommandContext commandContext = new CommandContext("namespace", "collection");
- final Operation operation = countCommandResolver.resolveCommand(commandContext, countCommand);
-
- LogicalExpression implicitAnd = LogicalExpression.and();
- implicitAnd.comparisonExpressions.add(new ComparisonExpression(null, null, null));
- implicitAnd
- .comparisonExpressions
- .get(0)
- .setDBFilters(
- List.of(
- new DBFilterBase.TextFilter("col", DBFilterBase.MapFilterBase.Operator.EQ, "val")));
- CountOperation expected = new CountOperation(commandContext, implicitAnd);
- assertThat(operation)
- .isInstanceOf(CountOperation.class)
- .satisfies(
- op -> {
- CountOperation countOperation = (CountOperation) op;
- assertThat(
- countOperation
- .logicalExpression()
- .comparisonExpressions
- .get(0)
- .getDbFilters())
- .isEqualTo(
- expected.logicalExpression().comparisonExpressions.get(0).getDbFilters());
- });
- }
-}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolverTest.java
index faf8fe7f1b..f5eb148278 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolverTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/CountDocumentsCommandResolverTest.java
@@ -9,6 +9,7 @@
import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.impl.CountDocumentsCommand;
+import io.stargate.sgv2.jsonapi.config.OperationsConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.CountOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase;
@@ -24,6 +25,8 @@ class CountDocumentsCommandResolverTest {
@Inject ObjectMapper objectMapper;
@Inject CountDocumentsCommandResolver resolver;
+ @Inject OperationsConfig operationsConfig;
+
@Nested
class ResolveCommand {
@@ -48,6 +51,8 @@ public void noFilter() throws Exception {
op -> {
assertThat(op.commandContext()).isEqualTo(context);
assertThat(op.logicalExpression().comparisonExpressions).isEmpty();
+ assertThat(op.pageSize()).isEqualTo(operationsConfig.defaultCountPageSize());
+ assertThat(op.limit()).isEqualTo(operationsConfig.maxCountLimit());
});
}
@@ -78,6 +83,8 @@ public void withFilter() throws Exception {
assertThat(
op.logicalExpression().comparisonExpressions.get(0).getDbFilters().get(0))
.isEqualTo(expected);
+ assertThat(op.pageSize()).isEqualTo(operationsConfig.defaultCountPageSize());
+ assertThat(op.limit()).isEqualTo(operationsConfig.maxCountLimit());
});
}
}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java b/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
index 497feb05d3..b39132b7a4 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
@@ -49,6 +49,17 @@ public int getIndexesPerDBOverride() {
return 100;
}
+ // Test count with count limit as -1 to use cassandra
+ @Override
+ public int getMaxCountLimit() {
+ return 1000;
+ }
+
+ // Setting this to 2 so data read with Pagination
+ public int getCountPageSize() {
+ return 2;
+ }
+
@Override
public Map start() {
Map env = super.start();
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java b/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java
index 2a3c684261..2074c73638 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/testresource/StargateTestResource.java
@@ -79,6 +79,11 @@ public Map start() {
propsBuilder.put(
"stargate.database.limits.indexes-available-per-database",
String.valueOf(getIndexesPerDBOverride()));
+ propsBuilder.put(
+ "stargate.jsonapi.operations.max-count-limit", String.valueOf(getMaxCountLimit()));
+ propsBuilder.put(
+ "stargate.jsonapi.operations.default-count-page-size",
+ String.valueOf(getCountPageSize()));
ImmutableMap props = propsBuilder.build();
props.forEach(System::setProperty);
@@ -261,6 +266,10 @@ private String getAuthToken(String host, int authPort) {
public abstract int getIndexesPerDBOverride();
+ public abstract int getMaxCountLimit();
+
+ public abstract int getCountPageSize();
+
interface Defaults {
String CASSANDRA_IMAGE = "cassandra";
String CASSANDRA_IMAGE_TAG = "4.0.10";
From 1bd3154366e9b4d7999f890d1956cc85adfd3676 Mon Sep 17 00:00:00 2001
From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com>
Date: Thu, 18 Jan 2024 22:16:59 -0500
Subject: [PATCH 2/6] Added test case for count pagination and moreData
---
.../operation/model/impl/CountOperationPage.java | 4 ++++
.../jsonapi/api/v1/CountIntegrationTest.java | 16 ++++++++++++++--
.../jsonapi/testresource/DseTestResource.java | 4 ++--
3 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java
index 0fc71bcec4..47bc6f28c7 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/CountOperationPage.java
@@ -8,6 +8,10 @@
public record CountOperationPage(long count, boolean moreData) implements Supplier {
@Override
public CommandResult get() {
+ if (moreData) {
+ return new CommandResult(
+ Map.of(CommandStatus.COUNTED_DOCUMENT, count(), CommandStatus.MORE_DATA, true));
+ }
return new CommandResult(Map.of(CommandStatus.COUNTED_DOCUMENT, count()));
}
}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java
index 1b942f4bbd..cb62f6e2e9 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java
@@ -104,6 +104,16 @@ public void setUp() {
}
""";
insert(json);
+
+ json =
+ """
+ {
+ "insertOne": {
+ "document": {}
+ }
+ }
+ """;
+ insert(json);
}
private void insert(String json) {
@@ -137,6 +147,7 @@ public void noFilter() {
.then()
.statusCode(200)
.body("status.count", is(5))
+ .body("status.moreData", is(true))
.body("errors", is(nullValue()));
}
@@ -160,6 +171,7 @@ public void emptyOptionsAllowed() {
.then()
.statusCode(200)
.body("status.count", is(5))
+ .body("status.moreData", is(true))
.body("errors", is(nullValue()));
}
@@ -335,7 +347,7 @@ public void withExistFalseOperator() {
.post(CollectionResource.BASE_PATH, namespaceName, collectionName)
.then()
.statusCode(200)
- .body("status.count", is(4))
+ .body("status.count", is(5))
.body("data", is(nullValue()))
.body("errors", is(nullValue()));
}
@@ -759,7 +771,7 @@ public void withNEComparisonOperator() {
.post(CollectionResource.BASE_PATH, namespaceName, collectionName)
.then()
.statusCode(200)
- .body("status.count", is(4))
+ .body("status.count", is(5))
.body("data", is(nullValue()))
.body("errors", is(nullValue()));
}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java b/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
index b39132b7a4..a64e6f29d5 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/testresource/DseTestResource.java
@@ -49,10 +49,10 @@ public int getIndexesPerDBOverride() {
return 100;
}
- // Test count with count limit as -1 to use cassandra
+ // Test count with count limit as 5 so more data can be tested
@Override
public int getMaxCountLimit() {
- return 1000;
+ return 5;
}
// Setting this to 2 so data read with Pagination
From 4cbfefdb62e859276a2c2b0cfd4117f5912ef8d2 Mon Sep 17 00:00:00 2001
From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com>
Date: Thu, 18 Jan 2024 22:44:32 -0500
Subject: [PATCH 3/6] Added configuration
---
CONFIGURATION.md | 3 +++
.../java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java | 1 -
2 files changed, 3 insertions(+), 1 deletion(-)
diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index a521344d9e..e661ff327f 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -52,6 +52,9 @@ Here are some Stargate-relevant property groups that are necessary for correct s
| `stargate.jsonapi.operations.lwt.retries` | `int` | `3` | The amount of client side retries in case of a LWT failure. |
| `stargate.jsonapi.operations.database-config.session-cache-ttl-seconds` | `int` | `300` | The amount of seconds that the cql session will be kept in memory after last access. |
| `stargate.jsonapi.operations.database-config.session-cache-max-size` | `int` | `50` | The maximum number of cql sessions that will be kept in memory. |
+| `stargate.jsonapi.operations.default-count-page-size` | `int` | `100` | The default Cassandra page size used for read queries that are used for counting by key purposes. |
+| `stargate.jsonapi.operations.max-count-limit` | `int` | `1000` | The default max count response when resolved using keys. Set to -1 use cassandra count function. |
+
## Jsonapi metering configuration
*Configuration for jsonapi metering, defined by [JsonApiMetricsConfig.java](io/stargate/sgv2/jsonapi/api/v1/metrics/JsonApiMetricsConfig.java).*
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
index 84f120fa8a..e3d0bbed39 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
@@ -105,7 +105,6 @@ public interface OperationsConfig {
* Cassandra's count function. Default is 1000 1000
command.
*/
@Max(10000)
- @Positive
@WithDefault("1000")
int maxCountLimit();
From c7b1cd19da228895db2978259231cf9ddc6dbbaf Mon Sep 17 00:00:00 2001
From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com>
Date: Fri, 19 Jan 2024 09:52:31 -0500
Subject: [PATCH 4/6] Added create ns and collection
---
.../sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
index 3edf664a4a..bbbc850932 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
@@ -44,6 +44,8 @@ class Count {
@Test
@Order(1)
public void setUp() {
+ createNamespace();
+ createSimpleCollection();
String json =
"""
{
From 9f2b143e91c220f96ff96e0f095aff8acfa078b9 Mon Sep 17 00:00:00 2001
From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com>
Date: Fri, 19 Jan 2024 10:30:11 -0500
Subject: [PATCH 5/6] Added create ns and collection
---
.../jsonapi/api/v1/CountByCassandraIntegrationTest.java | 9 ---------
1 file changed, 9 deletions(-)
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
index bbbc850932..fe8020bde9 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountByCassandraIntegrationTest.java
@@ -804,13 +804,4 @@ public void byBooleanColumn() {
.body("errors", is(nullValue()));
}
}
-
- @Nested
- @Order(2)
- class Metrics {
- @Test
- public void checkMetrics() {
- CountByCassandraIntegrationTest.super.checkMetrics("CountDocumentsCommand");
- }
- }
}
From c39f3b92c70fb47f35134b1bf03aae1837bdf9f6 Mon Sep 17 00:00:00 2001
From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com>
Date: Fri, 19 Jan 2024 13:46:24 -0500
Subject: [PATCH 6/6] Changes based on review comments
---
CONFIGURATION.md | 4 ++--
.../io/stargate/sgv2/jsonapi/config/OperationsConfig.java | 8 ++++----
.../io/stargate/sgv2/jsonapi/exception/ErrorCode.java | 1 +
.../jsonapi/service/operation/model/CountOperation.java | 5 +----
.../jsonapi/service/operation/model/ReadOperation.java | 2 +-
.../sgv2/jsonapi/api/v1/CountIntegrationTest.java | 4 ++++
6 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index e661ff327f..febfe7c056 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -52,8 +52,8 @@ Here are some Stargate-relevant property groups that are necessary for correct s
| `stargate.jsonapi.operations.lwt.retries` | `int` | `3` | The amount of client side retries in case of a LWT failure. |
| `stargate.jsonapi.operations.database-config.session-cache-ttl-seconds` | `int` | `300` | The amount of seconds that the cql session will be kept in memory after last access. |
| `stargate.jsonapi.operations.database-config.session-cache-max-size` | `int` | `50` | The maximum number of cql sessions that will be kept in memory. |
-| `stargate.jsonapi.operations.default-count-page-size` | `int` | `100` | The default Cassandra page size used for read queries that are used for counting by key purposes. |
-| `stargate.jsonapi.operations.max-count-limit` | `int` | `1000` | The default max count response when resolved using keys. Set to -1 use cassandra count function. |
+| `stargate.jsonapi.operations.default-count-page-size` | `int` | `100` | The default Cassandra page size used for reading keys for count command. |
+| `stargate.jsonapi.operations.max-count-limit` | `int` | `1000` | The default maximum number of rows to read for count operation. |
## Jsonapi metering configuration
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
index e3d0bbed39..2612ac4b3d 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
@@ -102,15 +102,15 @@ public interface OperationsConfig {
/**
* @return Maximum size of keys read from database to return count, Setting it to -1 will use
- * Cassandra's count function. Default is 1000 1000
command.
+ * Cassandra's count function. Default is 1000
.
*/
- @Max(10000)
@WithDefault("1000")
int maxCountLimit();
/**
- * @return Defines the default page size for count operation, having separate config because count
- * will more keys per page, defaults to 100
.
+ * @return Defines the default page size for count operation, having separate from
+ * `defaultPageSize` config because count will read more keys per page, defaults to 100
+ *
.
*/
@Max(500)
@Positive
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java b/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java
index 9d0044283c..bb3d0a203a 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java
@@ -3,6 +3,7 @@
/** ErrorCode is our internal enum that provides codes and a default message for that error code. */
public enum ErrorCode {
/** Command error codes. */
+ COUNT_READ_FAILED("Unable to count documents"),
COMMAND_NOT_IMPLEMENTED("The provided command is not implemented."),
COMMAND_ACCEPTS_NO_OPTIONS("Command accepts no options"),
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
index 0dcd6c624f..912eae48c3 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
@@ -40,10 +40,7 @@ public Uni> execute(QueryExecutor queryExecutor) {
} else {
boolean moreData = docs.count() > limit();
return new CountOperationPage(
- docs.count() == 0
- ? 0
- : docs.count() > limit() ? docs.count() - 1 : docs.count(),
- moreData);
+ docs.count() > limit() ? docs.count() - 1 : docs.count(), moreData);
}
});
}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
index b80b0db60b..edf20cec3b 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
@@ -395,7 +395,7 @@ default Uni countDocuments(
private void getCount(AsyncResultSet rs, Throwable error, AtomicLong counter) {
if (error != null) {
- throw new RuntimeException(error);
+ throw new JsonApiException(ErrorCode.COUNT_READ_FAILED);
} else {
counter.addAndGet(rs.remaining());
if (rs.hasMorePages()) {
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java
index cb62f6e2e9..198dd5e348 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/CountIntegrationTest.java
@@ -21,6 +21,10 @@
@QuarkusIntegrationTest
@QuarkusTestResource(DseTestResource.class)
@TestClassOrder(ClassOrderer.OrderAnnotation.class)
+/**
+ * To run this test DseTestResource is updated to have maxCountLimit to `5` and getCountPageSize to
+ * 2 so pagination and moreData flag can be tested
+ */
public class CountIntegrationTest extends AbstractCollectionIntegrationTestBase {
@Nested
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)