diff --git a/.github/workflows/continuous-integration.yaml b/.github/workflows/continuous-integration.yaml
index eddfc2562f..d074e7bf6a 100644
--- a/.github/workflows/continuous-integration.yaml
+++ b/.github/workflows/continuous-integration.yaml
@@ -6,7 +6,7 @@ name: Continuous Integration
# * manual trigger
on:
push:
- branches: [ "main" ]
+ branches: [ "main"]
pull_request:
branches: [ "main" ]
diff --git a/pom.xml b/pom.xml
index 8e5bfcfee0..fde19d48fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,26 @@
io.quarkus
quarkus-rest-client-reactive-jackson
+
+ com.datastax.oss
+ java-driver-core
+ ${driver.version}
+
+
+ io.dropwizard.metrics
+ metrics-core
+
+
+ org.hdrhistogram
+ HdrHistogram
+
+
+
+
+ com.datastax.oss
+ java-driver-metrics-micrometer
+ ${driver.version}
+
io.quarkus
quarkus-junit5
@@ -103,11 +123,6 @@
assertj-core
test
-
- com.datastax.oss
- java-driver-core
- test
-
io.stargate
sgv2-quarkus-common
diff --git a/src/main/java/com/github/benmanes/caffeine/cache/ReflectionRegistration.java b/src/main/java/com/github/benmanes/caffeine/cache/ReflectionRegistration.java
new file mode 100644
index 0000000000..0325fa3672
--- /dev/null
+++ b/src/main/java/com/github/benmanes/caffeine/cache/ReflectionRegistration.java
@@ -0,0 +1,8 @@
+package com.github.benmanes.caffeine.cache;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection(targets = {com.github.benmanes.caffeine.cache.PSAMS.class})
+public class ReflectionRegistration {
+ // This class is used only for annotation processing during build
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java b/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java
index c07115b836..a384cca9f0 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java
@@ -4,7 +4,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import io.stargate.sgv2.jsonapi.api.model.command.clause.sort.SortClause;
import io.stargate.sgv2.jsonapi.api.model.command.clause.update.UpdateClause;
-import io.stargate.sgv2.jsonapi.service.bridge.executor.CollectionSettings;
+import io.stargate.sgv2.jsonapi.service.cqldriver.executor.CollectionSettings;
import io.stargate.sgv2.jsonapi.service.embedding.DataVectorizer;
import io.stargate.sgv2.jsonapi.service.embedding.operation.EmbeddingService;
import java.util.List;
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java b/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java
index 6863ee5726..94483526e1 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java
@@ -20,7 +20,7 @@
import io.stargate.sgv2.jsonapi.config.constants.OpenApiConstants;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.exception.mappers.ThrowableCommandResultSupplier;
-import io.stargate.sgv2.jsonapi.service.bridge.executor.SchemaCache;
+import io.stargate.sgv2.jsonapi.service.cqldriver.executor.SchemaCache;
import io.stargate.sgv2.jsonapi.service.embedding.operation.EmbeddingService;
import io.stargate.sgv2.jsonapi.service.embedding.operation.EmbeddingServiceCache;
import io.stargate.sgv2.jsonapi.service.processor.MeteredCommandProcessor;
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/config/ConsistencyLevelConverter.java b/src/main/java/io/stargate/sgv2/jsonapi/config/ConsistencyLevelConverter.java
new file mode 100644
index 0000000000..7f2805e9a3
--- /dev/null
+++ b/src/main/java/io/stargate/sgv2/jsonapi/config/ConsistencyLevelConverter.java
@@ -0,0 +1,20 @@
+package io.stargate.sgv2.jsonapi.config;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
+import org.eclipse.microprofile.config.spi.Converter;
+
+/**
+ * Converts a string to a {@link ConsistencyLevel}, used in {@link OperationsConfig.QueriesConfig}.
+ */
+public class ConsistencyLevelConverter implements Converter {
+ /**
+ * @param value the string representation of a property value
+ * @return the converted ConsistencyLevel
+ */
+ @Override
+ public ConsistencyLevel convert(String value)
+ throws IllegalArgumentException, NullPointerException {
+ return DefaultConsistencyLevel.valueOf(value.toUpperCase());
+ }
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
index d9d28dd9a5..edcab068e3 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/config/OperationsConfig.java
@@ -17,12 +17,18 @@
package io.stargate.sgv2.jsonapi.config;
+import static io.stargate.sgv2.jsonapi.service.cqldriver.CQLSessionCache.CASSANDRA;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithConverter;
import io.smallrye.config.WithDefault;
import jakarta.validation.Valid;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
+import java.util.List;
+import javax.annotation.Nullable;
/** Configuration for the operation execution. */
@ConfigMapping(prefix = "stargate.jsonapi.operations")
@@ -109,4 +115,93 @@ interface LwtConfig {
@WithDefault("3")
int retries();
}
+
+ /** Cassandra/AstraDB related configurations. */
+ @NotNull
+ @Valid
+ DatabaseConfig databaseConfig();
+
+ interface DatabaseConfig {
+
+ /** Database type can be cassandra
or astra
. */
+ @WithDefault(CASSANDRA)
+ String type();
+
+ /** Username when connecting to cassandra database (when type is cassandra
) */
+ @Nullable
+ @WithDefault("cassandra")
+ String userName();
+
+ /** Password when connecting to cassandra database (when type is cassandra
) */
+ @Nullable
+ @WithDefault("cassandra")
+ String password();
+
+ /** Fixed Token used for Integration Test authentication */
+ @Nullable
+ @WithDefault("not in tests")
+ String fixedToken();
+
+ /** Cassandra contact points (when type is cassandra
) */
+ @Nullable
+ @WithDefault("127.0.0.1")
+ List cassandraEndPoints();
+
+ /** Cassandra contact points (when type is cassandra
) */
+ @Nullable
+ @WithDefault("9042")
+ int cassandraPort();
+
+ /** Local datacenter that the driver must be configured with */
+ @NotNull
+ @WithDefault("datacenter1")
+ String localDatacenter();
+
+ /** Time to live for CQLSession in cache in seconds. */
+ @WithDefault("300")
+ long sessionCacheTtlSeconds();
+
+ /** Maximum number of CQLSessions in cache. */
+ @WithDefault("100")
+ long sessionCacheMaxSize();
+ }
+
+ /** Query consistency related configs. */
+ @NotNull
+ @Valid
+ QueriesConfig queriesConfig();
+
+ interface QueriesConfig {
+
+ /** @return Settings for the consistency level. */
+ @Valid
+ ConsistencyConfig consistency();
+
+ /** @return Serial Consistency for queries. */
+ @WithDefault("SERIAL")
+ @WithConverter(ConsistencyLevelConverter.class)
+ ConsistencyLevel serialConsistency();
+
+ /** @return Settings for the consistency level. */
+ interface ConsistencyConfig {
+
+ /** @return Consistency for queries making schema changes. */
+ @WithDefault("LOCAL_QUORUM")
+ @NotNull
+ @WithConverter(ConsistencyLevelConverter.class)
+ ConsistencyLevel schemaChanges();
+
+ /** @return Consistency for queries writing the data. */
+ @WithDefault("LOCAL_QUORUM")
+ @NotNull
+ @WithConverter(ConsistencyLevelConverter.class)
+ ConsistencyLevel writes();
+
+ /** @return Consistency for queries reading the data. */
+ @WithDefault("LOCAL_QUORUM")
+ @NotNull
+ @WithConverter(ConsistencyLevelConverter.class)
+ ConsistencyLevel reads();
+ }
+ }
}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/exception/JsonApiException.java b/src/main/java/io/stargate/sgv2/jsonapi/exception/JsonApiException.java
index 88526d9b99..f67784ab27 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/exception/JsonApiException.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/exception/JsonApiException.java
@@ -63,17 +63,15 @@ public CommandResult get() {
public CommandResult.Error getCommandResultError(String message) {
Map fieldsForMetricsTag =
Map.of("errorCode", errorCode.name(), "exceptionClass", this.getClass().getSimpleName());
- Map fields = null;
SmallRyeConfig config = ConfigProvider.getConfig().unwrap(SmallRyeConfig.class);
// enable debug mode for unit tests, since it can not be injected
DebugModeConfig debugModeConfig = config.getConfigMapping(DebugModeConfig.class);
final boolean debugEnabled = debugModeConfig.enabled();
- if (debugEnabled) {
- fields =
- Map.of("errorCode", errorCode.name(), "exceptionClass", this.getClass().getSimpleName());
- } else {
- fields = Map.of("errorCode", errorCode.name());
- }
+ final Map fields =
+ debugEnabled
+ ? Map.of(
+ "errorCode", errorCode.name(), "exceptionClass", this.getClass().getSimpleName())
+ : Map.of("errorCode", errorCode.name());
return new CommandResult.Error(message, fieldsForMetricsTag, fields, Response.Status.OK);
}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/exception/mappers/ThrowableToErrorMapper.java b/src/main/java/io/stargate/sgv2/jsonapi/exception/mappers/ThrowableToErrorMapper.java
index 2cf775cfb5..d684477267 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/exception/mappers/ThrowableToErrorMapper.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/exception/mappers/ThrowableToErrorMapper.java
@@ -1,7 +1,15 @@
package io.stargate.sgv2.jsonapi.exception.mappers;
+import com.datastax.oss.driver.api.core.AllNodesFailedException;
+import com.datastax.oss.driver.api.core.DriverException;
+import com.datastax.oss.driver.api.core.DriverTimeoutException;
+import com.datastax.oss.driver.api.core.NoNodeAvailableException;
+import com.datastax.oss.driver.api.core.NodeUnavailableException;
+import com.datastax.oss.driver.api.core.servererrors.QueryValidationException;
+import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.quarkus.security.UnauthorizedException;
import io.smallrye.config.SmallRyeConfig;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.config.DebugModeConfig;
@@ -17,7 +25,6 @@
* implementation.
*/
public final class ThrowableToErrorMapper {
-
private static final BiFunction MAPPER_WITH_MESSAGE =
(throwable, message) -> {
// if our own exception, shortcut
@@ -28,14 +35,17 @@ public final class ThrowableToErrorMapper {
SmallRyeConfig config = ConfigProvider.getConfig().unwrap(SmallRyeConfig.class);
DebugModeConfig debugModeConfig = config.getConfigMapping(DebugModeConfig.class);
final boolean debugEnabled = debugModeConfig.enabled();
+ final Map fields =
+ debugEnabled ? Map.of("exceptionClass", throwable.getClass().getSimpleName()) : null;
+ final Map fieldsForMetricsTag =
+ Map.of("exceptionClass", throwable.getClass().getSimpleName());
if (throwable instanceof StatusRuntimeException sre) {
- Map fields =
- debugEnabled ? Map.of("exceptionClass", throwable.getClass().getSimpleName()) : null;
- Map fieldsForMetricsTag =
- Map.of("exceptionClass", throwable.getClass().getSimpleName());
if (sre.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
return new CommandResult.Error(
- message, fieldsForMetricsTag, fields, Response.Status.UNAUTHORIZED);
+ "UNAUTHENTICATED: Invalid token",
+ fieldsForMetricsTag,
+ fields,
+ Response.Status.UNAUTHORIZED);
} else if (sre.getStatus().getCode() == Status.Code.INTERNAL) {
return new CommandResult.Error(
message, fieldsForMetricsTag, fields, Response.Status.INTERNAL_SERVER_ERROR);
@@ -47,11 +57,30 @@ public final class ThrowableToErrorMapper {
message, fieldsForMetricsTag, fields, Response.Status.GATEWAY_TIMEOUT);
}
}
- // add error code as error field
- Map fields =
- debugEnabled ? Map.of("exceptionClass", throwable.getClass().getSimpleName()) : null;
- Map fieldsForMetricsTag =
- Map.of("exceptionClass", throwable.getClass().getSimpleName());
+ if (throwable instanceof UnauthorizedException
+ || throwable
+ instanceof com.datastax.oss.driver.api.core.servererrors.UnauthorizedException) {
+ return new CommandResult.Error(
+ "UNAUTHENTICATED: Invalid token",
+ fieldsForMetricsTag,
+ fields,
+ Response.Status.UNAUTHORIZED);
+ } else if (throwable instanceof QueryValidationException) {
+ if (message.contains("vector executeRead(
- QueryOuterClass.Query query, Optional pageState, int pageSize) {
- QueryOuterClass.Consistency consistency = queriesConfig.consistency().reads();
- QueryOuterClass.ConsistencyValue.Builder consistencyValue =
- QueryOuterClass.ConsistencyValue.newBuilder().setValue(consistency);
- QueryOuterClass.QueryParameters.Builder params =
- QueryOuterClass.QueryParameters.newBuilder().setConsistency(consistencyValue);
- if (pageState.isPresent()) {
- params.setPagingState(BytesValue.of(ByteString.copyFrom(decodeBase64(pageState.get()))));
- }
-
- params.setPageSize(Int32Value.of(pageSize));
- return queryBridge(
- QueryOuterClass.Query.newBuilder(query).setParameters(params).buildPartial());
- }
-
- /**
- * Runs the provided write document query, Updates the query with parameters
- *
- * @param query write query to be executed
- * @return proto result set
- */
- public Uni executeWrite(QueryOuterClass.Query query) {
- QueryOuterClass.Consistency consistency = queriesConfig.consistency().writes();
- QueryOuterClass.ConsistencyValue.Builder consistencyValue =
- QueryOuterClass.ConsistencyValue.newBuilder().setValue(consistency);
- QueryOuterClass.Consistency serialConsistency = queriesConfig.serialConsistency();
- QueryOuterClass.ConsistencyValue.Builder serialConsistencyValue =
- QueryOuterClass.ConsistencyValue.newBuilder().setValue(serialConsistency);
- QueryOuterClass.QueryParameters.Builder params =
- QueryOuterClass.QueryParameters.newBuilder()
- .setConsistency(consistencyValue)
- .setSerialConsistency(serialConsistencyValue);
- return queryBridge(
- QueryOuterClass.Query.newBuilder(query).setParameters(params).buildPartial());
- }
-
- /**
- * Runs the provided schema change query like create collection, Updates the query with parameters
- *
- * @param query schema change query to be executed
- * @return proto result set
- */
- public Uni executeSchemaChange(QueryOuterClass.Query query) {
- QueryOuterClass.Consistency consistency = queriesConfig.consistency().schemaChanges();
- QueryOuterClass.ConsistencyValue.Builder consistencyValue =
- QueryOuterClass.ConsistencyValue.newBuilder().setValue(consistency);
- QueryOuterClass.QueryParameters.Builder params =
- QueryOuterClass.QueryParameters.newBuilder().setConsistency(consistencyValue);
- return queryBridge(
- QueryOuterClass.Query.newBuilder(query).setParameters(params).buildPartial());
- }
-
- private Uni queryBridge(QueryOuterClass.Query query) {
-
- // execute
- return stargateRequestInfo
- .getStargateBridge()
- .executeQuery(query)
- .map(
- response -> {
- QueryOuterClass.ResultSet resultSet = response.getResultSet();
- return resultSet;
- })
- .onFailure()
- .invoke(
- failure -> {
- logger.error("Error on bridge ", failure);
- });
- }
-
- /**
- * Gets the schema for the provided namespace and collection name
- *
- * @param namespace
- * @param collectionName
- * @return
- */
- protected Uni> getSchema(String namespace, String collectionName) {
- Schema.DescribeKeyspaceQuery describeKeyspaceQuery =
- Schema.DescribeKeyspaceQuery.newBuilder().setKeyspaceName(namespace).build();
- final Uni cqlKeyspaceDescribeUni =
- stargateRequestInfo.getStargateBridge().describeKeyspace(describeKeyspaceQuery);
- return cqlKeyspaceDescribeUni
- .onItemOrFailure()
- .transformToUni(
- (cqlKeyspaceDescribe, error) -> {
- if (error != null
- && (error instanceof StatusRuntimeException sre
- && sre.getStatus().getCode() == Status.Code.NOT_FOUND)) {
- return Uni.createFrom()
- .failure(
- new RuntimeException(
- new JsonApiException(
- ErrorCode.NAMESPACE_DOES_NOT_EXIST,
- "The provided namespace does not exist: " + namespace)));
- }
- Schema.CqlTable cqlTable = null;
- return Uni.createFrom()
- .item(
- cqlKeyspaceDescribe.getTablesList().stream()
- .filter(table -> table.getName().equals(collectionName))
- .findFirst());
- });
- }
-
- private static byte[] decodeBase64(String base64encoded) {
- return Base64.getDecoder().decode(base64encoded);
- }
-}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CQLSessionCache.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CQLSessionCache.java
new file mode 100644
index 0000000000..48b95b072d
--- /dev/null
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/CQLSessionCache.java
@@ -0,0 +1,198 @@
+package io.stargate.sgv2.jsonapi.service.cqldriver;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import io.quarkus.security.UnauthorizedException;
+import io.stargate.sgv2.api.common.StargateRequestInfo;
+import io.stargate.sgv2.jsonapi.JsonApiStartUp;
+import io.stargate.sgv2.jsonapi.config.OperationsConfig;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CQL session cache to reuse the session for the same tenant and token. The cache is configured to
+ * expire after CACHE_TTL_SECONDS
of inactivity and to have a maximum size of
+ * CACHE_TTL_SECONDS
sessions.
+ */
+@ApplicationScoped
+public class CQLSessionCache {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JsonApiStartUp.class);
+
+ /** Configuration for the JSON API operations. */
+ private final OperationsConfig operationsConfig;
+
+ /** Stargate request info. */
+ @Inject StargateRequestInfo stargateRequestInfo;
+
+ /**
+ * Default tenant to be used when the backend is OSS cassandra and when no tenant is passed in the
+ * request
+ */
+ private static final String DEFAULT_TENANT = "default_tenant";
+ /** CQL username to be used when the backend is AstraDB */
+ private static final String TOKEN = "token";
+
+ /** CQLSession cache. */
+ private final Cache sessionCache;
+
+ public static final String ASTRA = "astra";
+ public static final String CASSANDRA = "cassandra";
+ /** Default token property name which will be used by the integration tests */
+ @Inject
+ public CQLSessionCache(OperationsConfig operationsConfig) {
+ this.operationsConfig = operationsConfig;
+ sessionCache =
+ Caffeine.newBuilder()
+ .expireAfterAccess(
+ Duration.ofSeconds(operationsConfig.databaseConfig().sessionCacheTtlSeconds()))
+ .maximumSize(operationsConfig.databaseConfig().sessionCacheMaxSize())
+ .evictionListener(
+ (RemovalListener)
+ (sessionCacheKey, session, cause) -> {
+ if (sessionCacheKey != null) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(
+ "Removing session for tenant : {}", sessionCacheKey.tenantId);
+ }
+ }
+ if (session != null) {
+ session.close();
+ }
+ })
+ .build();
+ LOGGER.info(
+ "CQLSessionCache initialized with ttl of {} seconds and max size of {}",
+ operationsConfig.databaseConfig().sessionCacheTtlSeconds(),
+ operationsConfig.databaseConfig().sessionCacheMaxSize());
+ }
+
+ /**
+ * Loader for new CQLSession.
+ *
+ * @return CQLSession
+ * @throws RuntimeException if database type is not supported
+ */
+ private CqlSession getNewSession(SessionCacheKey cacheKey) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Creating new session for tenant : {}", cacheKey.tenantId);
+ }
+ OperationsConfig.DatabaseConfig databaseConfig = operationsConfig.databaseConfig();
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Database type: {}", databaseConfig.type());
+ }
+ if (CASSANDRA.equals(databaseConfig.type())) {
+ List seeds =
+ Objects.requireNonNull(operationsConfig.databaseConfig().cassandraEndPoints()).stream()
+ .map(
+ host ->
+ new InetSocketAddress(
+ host, operationsConfig.databaseConfig().cassandraPort()))
+ .collect(Collectors.toList());
+
+ return new TenantAwareCqlSessionBuilder(
+ stargateRequestInfo.getTenantId().orElse(DEFAULT_TENANT))
+ .withLocalDatacenter(operationsConfig.databaseConfig().localDatacenter())
+ .addContactPoints(seeds)
+ .withAuthCredentials(
+ Objects.requireNonNull(databaseConfig.userName()),
+ Objects.requireNonNull(databaseConfig.password()))
+ .build();
+ } else if (ASTRA.equals(databaseConfig.type())) {
+ return new TenantAwareCqlSessionBuilder(stargateRequestInfo.getTenantId().orElseThrow())
+ .withAuthCredentials(
+ TOKEN, Objects.requireNonNull(stargateRequestInfo.getCassandraToken().orElseThrow()))
+ .withLocalDatacenter(operationsConfig.databaseConfig().localDatacenter())
+ .build();
+ }
+ throw new RuntimeException("Unsupported database type: " + databaseConfig.type());
+ }
+
+ /**
+ * Get CQLSession from cache.
+ *
+ * @return CQLSession
+ */
+ public CqlSession getSession() {
+ String fixedToken;
+ if (!(fixedToken = getFixedToken()).equals("not in test")
+ && !stargateRequestInfo.getCassandraToken().orElseThrow().equals(fixedToken)) {
+ throw new UnauthorizedException("Unauthorized");
+ }
+ return sessionCache.get(getSessionCacheKey(), this::getNewSession);
+ }
+
+ /**
+ * Default token which will be used by the integration tests. If this property is set, then the
+ * token from the request will be compared with this to perform authentication.
+ */
+ private String getFixedToken() {
+ return operationsConfig.databaseConfig().fixedToken();
+ }
+
+ /**
+ * Build key for CQLSession cache from tenant and token if the database type is AstraDB or from
+ * tenant, username and password if the database type is OSS cassandra (also, if token is present
+ * in the request, that will be given priority for the cache key).
+ *
+ * @return key for CQLSession cache
+ */
+ private SessionCacheKey getSessionCacheKey() {
+ switch (operationsConfig.databaseConfig().type()) {
+ case CASSANDRA -> {
+ if (stargateRequestInfo.getCassandraToken().isPresent()) {
+ return new SessionCacheKey(
+ stargateRequestInfo.getTenantId().orElse(DEFAULT_TENANT),
+ new TokenCredentials(stargateRequestInfo.getCassandraToken().orElseThrow()));
+ }
+ return new SessionCacheKey(
+ stargateRequestInfo.getTenantId().orElse(DEFAULT_TENANT),
+ new UsernamePasswordCredentials(
+ operationsConfig.databaseConfig().userName(),
+ operationsConfig.databaseConfig().password()));
+ }
+ case ASTRA -> {
+ return new SessionCacheKey(
+ stargateRequestInfo.getTenantId().orElseThrow(),
+ new TokenCredentials(stargateRequestInfo.getCassandraToken().orElseThrow()));
+ }
+ }
+ throw new RuntimeException(
+ "Unsupported database type: " + operationsConfig.databaseConfig().type());
+ }
+
+ /**
+ * Key for CQLSession cache.
+ *
+ * @param tenantId tenant id
+ * @param credentials credentials (username/password or token)
+ */
+ private record SessionCacheKey(String tenantId, Credentials credentials) {}
+
+ /**
+ * Credentials for CQLSession cache when username and password is provided.
+ *
+ * @param userName
+ * @param password
+ */
+ private record UsernamePasswordCredentials(String userName, String password)
+ implements Credentials {}
+
+ /**
+ * Credentials for CQLSession cache when token is provided.
+ *
+ * @param token
+ */
+ private record TokenCredentials(String token) implements Credentials {}
+
+ /** A marker interface for credentials. */
+ private interface Credentials {}
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/TenantAwareCqlSessionBuilder.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/TenantAwareCqlSessionBuilder.java
new file mode 100644
index 0000000000..6862e7ddb8
--- /dev/null
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/TenantAwareCqlSessionBuilder.java
@@ -0,0 +1,88 @@
+package io.stargate.sgv2.jsonapi.service.cqldriver;
+
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
+import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
+import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
+import java.util.Map;
+
+/**
+ * This is an extension of the {@link CqlSessionBuilder} that allows to pass a tenant ID to the
+ * CQLSession via TenantAwareDriverContext which is an extension of the {@link DefaultDriverContext}
+ * that adds the tenant ID to the startup options.
+ */
+public class TenantAwareCqlSessionBuilder extends CqlSessionBuilder {
+ /**
+ * Property key that will be used to pass the tenant ID to the CQLSession via
+ * TenantAwareDriverContext
+ */
+ private static final String TENANT_ID_PROPERTY_KEY = "TENANT_ID";
+ /** Tenant ID that will be passed to the CQLSession via TenantAwareDriverContext */
+ private final String tenantId;
+
+ /**
+ * Constructor that takes the tenant ID as a parameter
+ *
+ * @param tenantId tenant id or database id
+ */
+ public TenantAwareCqlSessionBuilder(String tenantId) {
+ if (tenantId == null || tenantId.isEmpty()) {
+ throw new RuntimeException("Tenant ID cannot be null or empty");
+ }
+ this.tenantId = tenantId;
+ }
+
+ /**
+ * Overridden method that builds the custom driver context
+ *
+ * @param configLoader configuration loader
+ * @param programmaticArguments programmatic arguments
+ * @return custom driver context
+ */
+ @Override
+ protected DriverContext buildContext(
+ DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
+ return new TenantAwareDriverContext(tenantId, configLoader, programmaticArguments);
+ }
+
+ /**
+ * This is an extension of the {@link DefaultDriverContext} that adds the tenant ID to the startup
+ * options.
+ */
+ public static class TenantAwareDriverContext extends DefaultDriverContext {
+ /** Tenant ID that will be added to the startup options */
+ private final String tenantId;
+
+ /**
+ * Constructor that takes the tenant ID as a parameter
+ *
+ * @param tenantId tenant id or database id
+ * @param configLoader configuration loader
+ * @param programmaticArguments programmatic arguments
+ */
+ public TenantAwareDriverContext(
+ String tenantId,
+ DriverConfigLoader configLoader,
+ ProgrammaticArguments programmaticArguments) {
+ super(configLoader, programmaticArguments);
+ this.tenantId = tenantId;
+ }
+
+ /**
+ * Overridden method that adds the tenant ID to the startup options with the key {@link
+ * TenantAwareCqlSessionBuilder#TENANT_ID_PROPERTY_KEY}
+ *
+ * @return startup options
+ */
+ @Override
+ protected Map buildStartupOptions() {
+ Map existing = super.buildStartupOptions();
+ return NullAllowingImmutableMap.builder(existing.size() + 1)
+ .putAll(existing)
+ .put(TENANT_ID_PROPERTY_KEY, tenantId)
+ .build();
+ }
+ }
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/CollectionSettings.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/CollectionSettings.java
similarity index 75%
rename from src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/CollectionSettings.java
rename to src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/CollectionSettings.java
index e8fd0c483e..6c0a605e7c 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/CollectionSettings.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/CollectionSettings.java
@@ -1,15 +1,19 @@
-package io.stargate.sgv2.jsonapi.service.bridge.executor;
+package io.stargate.sgv2.jsonapi.service.cqldriver.executor;
import static io.stargate.sgv2.jsonapi.exception.ErrorCode.VECTORIZECONFIG_CHECK_FAIL;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
+import com.datastax.oss.driver.api.core.metadata.schema.IndexMetadata;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+import com.datastax.oss.driver.api.core.type.VectorType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import io.stargate.bridge.proto.QueryOuterClass;
-import io.stargate.bridge.proto.Schema;
import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants;
import io.stargate.sgv2.jsonapi.exception.ErrorCode;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
+import java.util.Map;
import java.util.Optional;
/**
@@ -54,34 +58,33 @@ public static SimilarityFunction fromString(String similarityFunction) {
}
public static CollectionSettings getCollectionSettings(
- Schema.CqlTable table, ObjectMapper objectMapper) {
- String collectionName = table.getName();
- final Optional first =
- table.getColumnsList().stream()
- .filter(
- c -> c.getName().equals(DocumentConstants.Fields.VECTOR_SEARCH_INDEX_COLUMN_NAME))
- .findFirst();
- boolean vectorEnabled = first.isPresent();
+ TableMetadata table, ObjectMapper objectMapper) {
+ String collectionName = table.getName().asCql(true);
+ // get vector column
+ final Optional vectorColumn =
+ table.getColumn(DocumentConstants.Fields.VECTOR_SEARCH_INDEX_COLUMN_NAME);
+ boolean vectorEnabled = vectorColumn.isPresent();
+ // if vector column exist
if (vectorEnabled) {
- final int vectorSize = first.get().getType().getVector().getSize();
- final Optional vectorIndex =
- table.getIndexesList().stream()
- .filter(
- i ->
- i.getColumnName()
- .equals(DocumentConstants.Fields.VECTOR_SEARCH_INDEX_COLUMN_NAME))
- .findFirst();
+ final int vectorSize = ((VectorType) vectorColumn.get().getType()).getDimensions();
+ // get vector index
+ IndexMetadata vectorIndex = null;
+ Map indexMap = table.getIndexes();
+ for (CqlIdentifier key : indexMap.keySet()) {
+ if (key.asInternal().endsWith(DocumentConstants.Fields.VECTOR_SEARCH_INDEX_COLUMN_NAME)) {
+ vectorIndex = indexMap.get(key);
+ break;
+ }
+ }
+ // default function
CollectionSettings.SimilarityFunction function = CollectionSettings.SimilarityFunction.COSINE;
- if (vectorIndex.isPresent()) {
+ if (vectorIndex != null) {
final String functionName =
- vectorIndex
- .get()
- .getOptionsMap()
- .get(DocumentConstants.Fields.VECTOR_INDEX_FUNCTION_NAME);
+ vectorIndex.getOptions().get(DocumentConstants.Fields.VECTOR_INDEX_FUNCTION_NAME);
if (functionName != null)
function = CollectionSettings.SimilarityFunction.fromString(functionName);
}
- final String comment = table.getOptionsOrDefault("comment", null);
+ final String comment = (String) table.getOptions().get(CqlIdentifier.fromCql("comment"));
if (comment != null && !comment.isBlank()) {
return createCollectionSettingsFromJson(
collectionName, vectorEnabled, vectorSize, function, comment, objectMapper);
@@ -89,7 +92,7 @@ public static CollectionSettings getCollectionSettings(
return new CollectionSettings(
collectionName, vectorEnabled, vectorSize, function, null, null);
}
- } else {
+ } else { // if not vector collection
return new CollectionSettings(
collectionName,
vectorEnabled,
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/NamespaceCache.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCache.java
similarity index 98%
rename from src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/NamespaceCache.java
rename to src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCache.java
index b3e6271de9..202eaf1bf9 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/NamespaceCache.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCache.java
@@ -1,4 +1,4 @@
-package io.stargate.sgv2.jsonapi.service.bridge.executor;
+package io.stargate.sgv2.jsonapi.service.cqldriver.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java
new file mode 100644
index 0000000000..8d1056e2d3
--- /dev/null
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java
@@ -0,0 +1,151 @@
+package io.stargate.sgv2.jsonapi.service.cqldriver.executor;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
+import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
+import io.smallrye.mutiny.Uni;
+import io.stargate.sgv2.api.common.StargateRequestInfo;
+import io.stargate.sgv2.jsonapi.config.OperationsConfig;
+import io.stargate.sgv2.jsonapi.exception.ErrorCode;
+import io.stargate.sgv2.jsonapi.exception.JsonApiException;
+import io.stargate.sgv2.jsonapi.service.cqldriver.CQLSessionCache;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ApplicationScoped
+public class QueryExecutor {
+ private static final Logger logger = LoggerFactory.getLogger(QueryExecutor.class);
+ private final OperationsConfig operationsConfig;
+
+ private final StargateRequestInfo stargateRequestInfo;
+ /** CQLSession cache. */
+ @Inject CQLSessionCache cqlSessionCache;
+
+ @Inject
+ public QueryExecutor(OperationsConfig operationsConfig, StargateRequestInfo stargateRequestInfo) {
+ this.operationsConfig = operationsConfig;
+ this.stargateRequestInfo = stargateRequestInfo;
+ }
+
+ /**
+ * Execute read query with bound statement.
+ *
+ * @param simpleStatement - Simple statement with query and parameters. The table name used in the
+ * query must have keyspace prefixed.
+ * @param pagingState - In case of pagination, the paging state needs to be passed to fetch
+ * subsequent pages
+ * @param pageSize - page size
+ * @return AsyncResultSet
+ */
+ public Uni executeRead(
+ SimpleStatement simpleStatement, Optional pagingState, int pageSize) {
+ simpleStatement =
+ simpleStatement
+ .setPageSize(pageSize)
+ .setConsistencyLevel(operationsConfig.queriesConfig().consistency().reads());
+ if (pagingState.isPresent()) {
+ simpleStatement =
+ simpleStatement.setPagingState(ByteBuffer.wrap(decodeBase64(pagingState.get())));
+ }
+ return Uni.createFrom()
+ .completionStage(cqlSessionCache.getSession().executeAsync(simpleStatement));
+ }
+
+ /**
+ * Execute write query with bound statement.
+ *
+ * @param statement - Bound statement with query and parameters. The table name used in the query
+ * must have keyspace prefixed.
+ * @return AsyncResultSet
+ */
+ public Uni executeWrite(SimpleStatement statement) {
+ return Uni.createFrom()
+ .completionStage(
+ cqlSessionCache
+ .getSession()
+ .executeAsync(
+ statement
+ .setConsistencyLevel(
+ operationsConfig.queriesConfig().consistency().writes())
+ .setSerialConsistencyLevel(
+ operationsConfig.queriesConfig().serialConsistency())));
+ }
+
+ /**
+ * Execute schema change query with bound statement.
+ *
+ * @param boundStatement - Bound statement with query and parameters. The table name used in the
+ * query must have keyspace prefixed.
+ * @return AsyncResultSet
+ */
+ public Uni executeSchemaChange(SimpleStatement boundStatement) {
+ return Uni.createFrom()
+ .completionStage(
+ cqlSessionCache
+ .getSession()
+ .executeAsync(
+ boundStatement.setSerialConsistencyLevel(
+ operationsConfig.queriesConfig().consistency().schemaChanges())));
+ }
+
+ /**
+ * Gets the schema for the provided namespace and collection name
+ *
+ * @param namespace
+ * @param collectionName
+ * @return
+ */
+ protected Uni> getSchema(String namespace, String collectionName) {
+ KeyspaceMetadata keyspaceMetadata;
+ try {
+ keyspaceMetadata =
+ cqlSessionCache
+ .getSession()
+ .getMetadata()
+ .getKeyspaces()
+ .get(CqlIdentifier.fromCql("\"" + namespace + "\""));
+ } catch (Exception e) {
+ return Uni.createFrom().failure(e);
+ }
+ // if namespace does not exist, throw error
+ if (keyspaceMetadata == null) {
+ return Uni.createFrom()
+ .failure(
+ new JsonApiException(
+ ErrorCode.NAMESPACE_DOES_NOT_EXIST,
+ "The provided namespace does not exist: " + namespace));
+ }
+ // else get the table
+ return Uni.createFrom().item(keyspaceMetadata.getTable("\"" + collectionName + "\""));
+ }
+
+ /**
+ * Gets the schema for the provided namespace and collection name
+ *
+ * @param namespace - namespace
+ * @param collectionName - collection name
+ * @return TableMetadata
+ */
+ protected Uni getCollectionSchema(String namespace, String collectionName) {
+ Optional keyspaceMetadata;
+ if ((keyspaceMetadata = cqlSessionCache.getSession().getMetadata().getKeyspace(namespace))
+ .isPresent()) {
+ Optional tableMetadata = keyspaceMetadata.get().getTable(collectionName);
+ if (tableMetadata.isPresent()) {
+ return Uni.createFrom().item(tableMetadata.get());
+ }
+ }
+ return Uni.createFrom().nullItem();
+ }
+
+ private static byte[] decodeBase64(String base64encoded) {
+ return Base64.getDecoder().decode(base64encoded);
+ }
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/SchemaCache.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/SchemaCache.java
similarity index 95%
rename from src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/SchemaCache.java
rename to src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/SchemaCache.java
index e5494fadda..586302803c 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/executor/SchemaCache.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/SchemaCache.java
@@ -1,4 +1,4 @@
-package io.stargate.sgv2.jsonapi.service.bridge.executor;
+package io.stargate.sgv2.jsonapi.service.cqldriver.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/serializer/CQLBindValues.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/serializer/CQLBindValues.java
new file mode 100644
index 0000000000..70acb47410
--- /dev/null
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/serializer/CQLBindValues.java
@@ -0,0 +1,91 @@
+package io.stargate.sgv2.jsonapi.service.cqldriver.serializer;
+
+import com.datastax.oss.driver.api.core.data.CqlVector;
+import com.datastax.oss.driver.api.core.data.TupleValue;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.TupleType;
+import io.stargate.sgv2.jsonapi.service.shredding.JsonPath;
+import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class CQLBindValues {
+
+ public static Map getIntegerMapValues(Map from) {
+ final Map to = new HashMap<>(from.size());
+ for (Map.Entry entry : from.entrySet()) {
+ to.put(entry.getKey().toString(), entry.getValue());
+ }
+ return to;
+ }
+
+ public static Set getSetValue(Set from) {
+ return from.stream().map(val -> val.toString()).collect(Collectors.toSet());
+ }
+
+ public static Set getStringSetValue(Set from) {
+ return from.stream().map(val -> val.toString()).collect(Collectors.toSet());
+ }
+
+ public static List getListValue(List from) {
+ return from.stream().map(val -> val.toString()).collect(Collectors.toList());
+ }
+
+ public static Map getStringMapValues(Map from) {
+ final Map to = new HashMap<>(from.size());
+ for (Map.Entry entry : from.entrySet()) {
+ to.put(entry.getKey().toString(), entry.getValue());
+ }
+ return to;
+ }
+
+ public static Map getBooleanMapValues(Map from) {
+ final Map to = new HashMap<>(from.size());
+ for (Map.Entry entry : from.entrySet()) {
+ to.put(entry.getKey().toString(), (byte) (entry.getValue() ? 1 : 0));
+ }
+ return to;
+ }
+
+ public static Map getDoubleMapValues(Map from) {
+ final Map to = new HashMap<>(from.size());
+ for (Map.Entry entry : from.entrySet()) {
+ to.put(entry.getKey().toString(), entry.getValue());
+ }
+ return to;
+ }
+
+ public static Map getTimestampMapValues(Map from) {
+ final Map to = new HashMap<>(from.size());
+ for (Map.Entry entry : from.entrySet()) {
+ to.put(entry.getKey().toString(), Instant.ofEpochMilli(entry.getValue().getTime()));
+ }
+ return to;
+ }
+
+ private static TupleType tupleType = DataTypes.tupleOf(DataTypes.TINYINT, DataTypes.TEXT);
+
+ public static TupleValue getDocumentIdValue(DocumentId documentId) {
+ // Temporary implementation until we convert it to Tuple in DB
+ final TupleValue tupleValue =
+ tupleType.newValue((byte) documentId.typeId(), documentId.asDBKey());
+ return tupleValue;
+ }
+
+ public static CqlVector getVectorValue(float[] vectors) {
+ if (vectors == null || vectors.length == 0) {
+ return null;
+ }
+
+ List vectorValues = new ArrayList<>(vectors.length);
+ for (float vectorValue : vectors) vectorValues.add(vectorValue);
+ return CqlVector.newInstance(vectorValues);
+ }
+}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/serializer/CustomValueSerializers.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/serializer/CustomValueSerializers.java
similarity index 98%
rename from src/main/java/io/stargate/sgv2/jsonapi/service/bridge/serializer/CustomValueSerializers.java
rename to src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/serializer/CustomValueSerializers.java
index ac158bb677..7a427f329e 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/serializer/CustomValueSerializers.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/serializer/CustomValueSerializers.java
@@ -1,4 +1,4 @@
-package io.stargate.sgv2.jsonapi.service.bridge.serializer;
+package io.stargate.sgv2.jsonapi.service.cqldriver.serializer;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
index b5b256396a..20bfd4a5f5 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
@@ -1,6 +1,7 @@
package io.stargate.sgv2.jsonapi.service.operation.model;
import com.bpodgursky.jbool_expressions.Expression;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.smallrye.mutiny.Uni;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.api.common.cql.builder.BuiltCondition;
@@ -8,9 +9,10 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.clause.filter.LogicalExpression;
-import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
+import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.CountOperationPage;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.ExpressionBuilder;
+import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
@@ -23,21 +25,29 @@ public record CountOperation(CommandContext commandContext, LogicalExpression lo
@Override
public Uni> execute(QueryExecutor queryExecutor) {
- QueryOuterClass.Query query = buildSelectQuery();
- return countDocuments(queryExecutor, query)
+ SimpleStatement simpleStatement = buildSelectQuery();
+ return countDocuments(queryExecutor, simpleStatement)
.onItem()
.transform(docs -> new CountOperationPage(docs.count()));
}
- private QueryOuterClass.Query buildSelectQuery() {
+ private SimpleStatement buildSelectQuery() {
List> expressions =
ExpressionBuilder.buildExpressions(logicalExpression, null);
- return new QueryBuilder()
- .select()
- .count()
- .as("count")
- .from(commandContext.namespace(), commandContext.collection())
- .where(expressions.get(0)) // TODO count will assume no id filter query split?
- .build();
+ List