From 607f0d8643b3a350f4034906b87ea0a4cb47e1a2 Mon Sep 17 00:00:00 2001 From: Aaron Morton Date: Mon, 8 Jul 2024 13:32:09 +1200 Subject: [PATCH] Expand Schema Cache to handle tables added flag in FeatureFlags to enable getting tables in the Namespace cache unit tests passing other than session cache tests --- .../api/model/command/CommandContext.java | 80 ++++++++++++++++++- .../jsonapi/api/v1/CollectionResource.java | 12 +-- .../sgv2/jsonapi/service/FeatureFlags.java | 7 ++ .../cqldriver/executor/NamespaceCache.java | 66 ++++++++++----- .../cqldriver/executor/QueryExecutor.java | 2 + .../cqldriver/executor/SchemaCache.java | 15 +++- .../cqldriver/executor/TableSchemaObject.java | 3 + .../cqldriver/executor/VectorConfig.java | 1 + .../executor/NamespaceCacheTest.java | 17 ++-- 9 files changed, 161 insertions(+), 42 deletions(-) create mode 100644 src/main/java/io/stargate/sgv2/jsonapi/service/FeatureFlags.java diff --git a/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java b/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java index a578961639..2a998b0638 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/CommandContext.java @@ -31,7 +31,54 @@ public record CommandContext( public static final CommandContext EMPTY_TABLE = new CommandContext<>(TableSchemaObject.MISSING, null, "testCommand", null); - public static CommandContext collectionCommandContext( + /** + * Factory method to create a new instance of {@link CommandContext} based on the schema object we + * are working with. + * + *

This one handles the super class of {@link SchemaObject} + * + * @param schemaObject + * @param embeddingProvider + * @param commandName + * @param jsonProcessingMetricsReporter + * @return + */ + @SuppressWarnings("unchecked") + public static CommandContext forSchemaObject( + T schemaObject, + EmbeddingProvider embeddingProvider, + String commandName, + JsonProcessingMetricsReporter jsonProcessingMetricsReporter) { + + // TODO: upgrade to use the modern switch statements + // TODO: how to remove the unchecked cast ? Had to use unchecked cast to get back to the + // CommandContext + if (schemaObject instanceof CollectionSchemaObject cso) { + return (CommandContext) + forSchemaObject(cso, embeddingProvider, commandName, jsonProcessingMetricsReporter); + } + if (schemaObject instanceof TableSchemaObject tso) { + return (CommandContext) + forSchemaObject(tso, embeddingProvider, commandName, jsonProcessingMetricsReporter); + } + if (schemaObject instanceof KeyspaceSchemaObject kso) { + return (CommandContext) + forSchemaObject(kso, embeddingProvider, commandName, jsonProcessingMetricsReporter); + } + throw new IllegalArgumentException("Unknown schema object type: " + schemaObject.getClass()); + } + + /** + * Factory method to create a new instance of {@link CommandContext} based on the schema object we + * are working with + * + * @param schemaObject + * @param embeddingProvider + * @param commandName + * @param jsonProcessingMetricsReporter + * @return + */ + public static CommandContext forSchemaObject( CollectionSchemaObject schemaObject, EmbeddingProvider embeddingProvider, String commandName, @@ -40,7 +87,17 @@ public static CommandContext collectionCommandContext( schemaObject, embeddingProvider, commandName, jsonProcessingMetricsReporter); } - public static CommandContext tableCommandContext( + /** + * Factory method to create a new instance of {@link CommandContext} based on the schema object we + * are working with + * + * @param schemaObject + * @param 
embeddingProvider + * @param commandName + * @param jsonProcessingMetricsReporter + * @return + */ + public static CommandContext forSchemaObject( TableSchemaObject schemaObject, EmbeddingProvider embeddingProvider, String commandName, @@ -49,6 +106,25 @@ public static CommandContext tableCommandContext( schemaObject, embeddingProvider, commandName, jsonProcessingMetricsReporter); } + /** + * Factory method to create a new instance of {@link CommandContext} based on the schema object we + * are working with + * + * @param schemaObject + * @param embeddingProvider + * @param commandName + * @param jsonProcessingMetricsReporter + * @return + */ + public static CommandContext forSchemaObject( + KeyspaceSchemaObject schemaObject, + EmbeddingProvider embeddingProvider, + String commandName, + JsonProcessingMetricsReporter jsonProcessingMetricsReporter) { + return new CommandContext<>( + schemaObject, embeddingProvider, commandName, jsonProcessingMetricsReporter); + } + @SuppressWarnings("unchecked") public CommandContext asCollectionContext() { Preconditions.checkArgument( diff --git a/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java b/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java index 7e09749d1a..dfe789aab4 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/api/v1/CollectionResource.java @@ -161,7 +161,7 @@ public Uni> postCommand( @Size(min = 1, max = 48) String collection) { return schemaCache - .getCollectionSettings( + .getSchemaObject( dataApiRequestInfo, dataApiRequestInfo.getTenantId(), namespace, collection) .onItemOrFailure() .transformToUni( @@ -178,7 +178,7 @@ else if (error instanceof JsonApiException jsonApiException) { return Uni.createFrom().item(new ThrowableCommandResultSupplier(error)); } else { // TODO No need for the else clause here, simplify - // TODO: this is where we know if it's a table or a collection + // TODO: refactor 
this code to be cleaner so it assigns on one line EmbeddingProvider embeddingProvider = null; final VectorConfig.VectorizeConfig vectorizeConfig = schemaObject.vectorConfig().vectorizeConfig(); @@ -195,17 +195,13 @@ else if (error instanceof JsonApiException jsonApiException) { command.getClass().getSimpleName()); } - // TODO: when the schema cache returns table and collections we switch here to - // create - // the correct command context - CommandContext commandContext = - CommandContext.collectionCommandContext( + var commandContext = + CommandContext.forSchemaObject( schemaObject, embeddingProvider, command.getClass().getSimpleName(), jsonProcessingMetricsReporter); - // call processor return meteredCommandProcessor.processCommand( dataApiRequestInfo, commandContext, command); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/FeatureFlags.java b/src/main/java/io/stargate/sgv2/jsonapi/service/FeatureFlags.java new file mode 100644 index 0000000000..a3b2d1f996 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/FeatureFlags.java @@ -0,0 +1,7 @@ +package io.stargate.sgv2.jsonapi.service; + +/** Hack / temp class to have run time checking for if Tables are supported */ +public final class FeatureFlags { + + public static final boolean TABLES_SUPPORTED = Boolean.getBoolean("stargate.tables.supported"); +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCache.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCache.java index 590505d980..24cdaf95cf 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCache.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCache.java @@ -8,11 +8,14 @@ import io.stargate.sgv2.jsonapi.api.request.DataApiRequestInfo; import io.stargate.sgv2.jsonapi.exception.ErrorCode; import io.stargate.sgv2.jsonapi.exception.JsonApiException; +import 
io.stargate.sgv2.jsonapi.service.FeatureFlags; import io.stargate.sgv2.jsonapi.service.schema.model.JsonapiTableMatcher; import java.time.Duration; /** Caches the vector enabled status for the namespace */ // TODO: what is the vector status of a namespace ? vectors are per collection +// TODO: clarify the name of this class, it is a cache of the collections/ tables not a cache of +// namespaces ?? public class NamespaceCache { public final String namespace; @@ -21,9 +24,11 @@ public class NamespaceCache { private final ObjectMapper objectMapper; + // TODO: move the settings to config + // TODO: set the cache loader when creating the cache private static final long CACHE_TTL_SECONDS = 300; private static final long CACHE_MAX_SIZE = 1000; - private final Cache vectorCache = + private final Cache schemaObjectCache = Caffeine.newBuilder() .expireAfterWrite(Duration.ofSeconds(CACHE_TTL_SECONDS)) .maximumSize(CACHE_MAX_SIZE) @@ -35,21 +40,22 @@ public NamespaceCache(String namespace, QueryExecutor queryExecutor, ObjectMappe this.objectMapper = objectMapper; } - protected Uni getCollectionProperties( + protected Uni getSchemaObject( DataApiRequestInfo dataApiRequestInfo, String collectionName) { - // TODO: why is the cache loader not been used ?? - CollectionSchemaObject collectionProperty = vectorCache.getIfPresent(collectionName); + // TODO: why is this not using the loader pattern ? 
+ SchemaObject schemaObject = schemaObjectCache.getIfPresent(collectionName); - if (null != collectionProperty) { - return Uni.createFrom().item(collectionProperty); + if (null != schemaObject) { + return Uni.createFrom().item(schemaObject); } else { - return getVectorProperties(dataApiRequestInfo, collectionName) + return loadSchemaObject(dataApiRequestInfo, collectionName) .onItemOrFailure() .transformToUni( (result, error) -> { if (null != error) { // not a valid collection schema + // TODO: Explain why this changes the error code if (error instanceof JsonApiException && ((JsonApiException) error).getErrorCode() == ErrorCode.VECTORIZECONFIG_CHECK_FAIL) { @@ -62,6 +68,8 @@ protected Uni getCollectionProperties( .concat(collectionName))); } // collection does not exist + // TODO: DO NOT do a string startsWith check, use proper error structures + // again, why is this here, looks like it returns the same error code ? if (error instanceof RuntimeException rte && rte.getMessage().startsWith(ErrorCode.COLLECTION_NOT_EXIST.getMessage())) { return Uni.createFrom() @@ -76,6 +84,8 @@ protected Uni getCollectionProperties( // TODO This if block can be deleted? grpc code // ignoring the error and return false. This will be handled while trying to // execute the query + // TODO: WHY ARE WE IGNORING THE ERROR AND RETURNING FAKE COLLECTION SCHEMA ? 
This + // is a bad practice if ((error instanceof StatusRuntimeException sre && (sre.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND || sre.getStatus().getCode() == io.grpc.Status.Code.INVALID_ARGUMENT))) { @@ -90,36 +100,48 @@ protected Uni getCollectionProperties( } return Uni.createFrom().failure(error); } else { - vectorCache.put(collectionName, result); + schemaObjectCache.put(collectionName, result); return Uni.createFrom().item(result); } }); } } - private Uni getVectorProperties( + private Uni loadSchemaObject( DataApiRequestInfo dataApiRequestInfo, String collectionName) { + return queryExecutor .getSchema(dataApiRequestInfo, namespace, collectionName) .onItem() .transform( - table -> { - if (table.isPresent()) { - // check if its a valid json api table - if (!new JsonapiTableMatcher().test(table.get())) { - throw new JsonApiException( - ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA, - ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA.getMessage() + collectionName); - } - return CollectionSchemaObject.getCollectionSettings(table.get(), objectMapper); - } else { - throw new RuntimeException( - ErrorCode.COLLECTION_NOT_EXIST.getMessage() + collectionName); + optionalTable -> { + // TODO: AARON - I changed the logic here, needs to be checked + // TODO: error code here needs to be for collections and tables + var table = + optionalTable.orElseThrow( + () -> + new RuntimeException( + ErrorCode.COLLECTION_NOT_EXIST.getMessage() + collectionName)); + + // check if its a valid json api table + // TODO: re-use the table matcher this is on the request hot path + if (new JsonapiTableMatcher().test(table)) { + return CollectionSchemaObject.getCollectionSettings( + optionalTable.get(), objectMapper); } + + if (FeatureFlags.TABLES_SUPPORTED) { + return new TableSchemaObject(namespace, collectionName); + } + + // Target is not a collection and we are not supporting tables + throw new JsonApiException( + ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA, + 
ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA.getMessage() + collectionName); }); } public void evictCollectionSettingCacheEntry(String collectionName) { - vectorCache.invalidate(collectionName); + schemaObjectCache.invalidate(collectionName); } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java index 652238ac25..e3d782a8c5 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/QueryExecutor.java @@ -257,6 +257,7 @@ protected Uni> getSchema( .getKeyspaces() .get(CqlIdentifier.fromInternal(namespace)); } catch (Exception e) { + // TODO: this ^^ is a very wide error catch, confirm what it should actually be catching return Uni.createFrom().failure(e); } // if namespace does not exist, throw error @@ -268,6 +269,7 @@ protected Uni> getSchema( "The provided namespace does not exist: " + namespace)); } // else get the table + // TODO: this should probably use CqlIdentifier.fromCql() if we want to be case sensitive return Uni.createFrom().item(keyspaceMetadata.getTable("\"" + collectionName + "\"")); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/SchemaCache.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/SchemaCache.java index 9897e4aa60..864087a9a8 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/SchemaCache.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/SchemaCache.java @@ -25,25 +25,34 @@ public class SchemaCache { @Inject OperationsConfig operationsConfig; + // TODO: The size of the cache should be in configuration. 
+ // TODO: set the cache loader when creating the cache private final Cache schemaCache = Caffeine.newBuilder().maximumSize(1000).build(); - public Uni getCollectionSettings( + public Uni getSchemaObject( DataApiRequestInfo dataApiRequestInfo, Optional tenant, String namespace, String collectionName) { + + // TODO: refactor, this has duplicate code, the only special handling the OSS has is the tenant + // check + if (CASSANDRA.equals(operationsConfig.databaseConfig().type())) { // default_tenant is for oss run + // TODO: move the string to a constant or config, why does this still check the tenant if this + // is for OSS ? final NamespaceCache namespaceCache = schemaCache.get( new CacheKey(Optional.of(tenant.orElse("default_tenant")), namespace), this::addNamespaceCache); - return namespaceCache.getCollectionProperties(dataApiRequestInfo, collectionName); + return namespaceCache.getSchemaObject(dataApiRequestInfo, collectionName); } + final NamespaceCache namespaceCache = schemaCache.get(new CacheKey(tenant, namespace), this::addNamespaceCache); - return namespaceCache.getCollectionProperties(dataApiRequestInfo, collectionName); + return namespaceCache.getSchemaObject(dataApiRequestInfo, collectionName); } /** Evict collectionSetting Cache entry when there is a drop table event */ diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/TableSchemaObject.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/TableSchemaObject.java index d321e067d6..6587929b0b 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/TableSchemaObject.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/TableSchemaObject.java @@ -7,6 +7,9 @@ public class TableSchemaObject extends SchemaObject { /** Represents missing schema, e.g. when we are running a create table. 
*/ public static final TableSchemaObject MISSING = new TableSchemaObject(SchemaObjectName.MISSING); + // TODO: hold the table meta data, need to work out how we handle mock tables in test etc. + // public final TableMetadata tableMetadata; + public TableSchemaObject(String keyspace, String name) { this(new SchemaObjectName(keyspace, name)); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/VectorConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/VectorConfig.java index ea9918426e..4ae19e9437 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/VectorConfig.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/VectorConfig.java @@ -19,6 +19,7 @@ public record VectorConfig( VectorizeConfig vectorizeConfig) { // TODO: this is an immutable record, this can be singleton + // TODO: Remove the use of NULL for the objects like vectorizeConfig public static VectorConfig notEnabledVectorConfig() { return new VectorConfig(false, -1, null, null); } diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCacheTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCacheTest.java index b736dfd13f..6ba54a8271 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCacheTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/cqldriver/executor/NamespaceCacheTest.java @@ -153,15 +153,16 @@ public void checkValidJsonApiTable() { new HashMap<>()))); }); NamespaceCache namespaceCache = new NamespaceCache("ks", queryExecutor, objectMapper); - CollectionSchemaObject collectionSettings = + var schemaObject = namespaceCache - .getCollectionProperties(dataApiRequestInfo, "table") + .getSchemaObject(dataApiRequestInfo, "table") .subscribe() .withSubscriber(UniAssertSubscriber.create()) .awaitItem() .getItem(); - assertThat(collectionSettings) + assertThat(schemaObject instanceof 
CollectionSchemaObject); + assertThat(schemaObject) .satisfies( s -> { assertThat(s.vectorConfig().vectorEnabled()).isFalse(); @@ -283,15 +284,17 @@ public void checkValidJsonApiTableWithIndexing() { new HashMap<>()))); }); NamespaceCache namespaceCache = new NamespaceCache("ks", queryExecutor, objectMapper); - CollectionSchemaObject collectionSettings = + var schemaObject = namespaceCache - .getCollectionProperties(dataApiRequestInfo, "table") + .getSchemaObject(dataApiRequestInfo, "table") .subscribe() .withSubscriber(UniAssertSubscriber.create()) .awaitItem() .getItem(); - assertThat(collectionSettings) + assertThat(schemaObject instanceof CollectionSchemaObject); + var collectionSchemaObject = (CollectionSchemaObject) schemaObject; + assertThat(collectionSchemaObject) .satisfies( s -> { assertThat(s.vectorConfig().vectorEnabled()).isFalse(); @@ -349,7 +352,7 @@ public void checkInvalidJsonApiTable() { NamespaceCache namespaceCache = new NamespaceCache("ks", queryExecutor, objectMapper); Throwable error = namespaceCache - .getCollectionProperties(dataApiRequestInfo, "table") + .getSchemaObject(dataApiRequestInfo, "table") .subscribe() .withSubscriber(UniAssertSubscriber.create()) .awaitFailure()