Skip to content

Commit

Permalink
Expand Schema Cache to handle tables
Browse files Browse the repository at this point in the history
added flag in FeatureFlags to enable getting tables in
the Namespace cache

unit tests passing other than session cache tests
  • Loading branch information
amorton committed Jul 8, 2024
1 parent 1296c55 commit 607f0d8
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,54 @@ public record CommandContext<T extends SchemaObject>(
public static final CommandContext<TableSchemaObject> EMPTY_TABLE =
new CommandContext<>(TableSchemaObject.MISSING, null, "testCommand", null);

public static CommandContext<CollectionSchemaObject> collectionCommandContext(
/**
* Factory method to create a new instance of {@link CommandContext} based on the schema object we
* are working with.
*
* <p>This one handles the super class of {@link SchemaObject}
*
* @param schemaObject
* @param embeddingProvider
* @param commandName
* @param jsonProcessingMetricsReporter
* @return
*/
@SuppressWarnings("unchecked")
public static <T extends SchemaObject> CommandContext<T> forSchemaObject(
T schemaObject,
EmbeddingProvider embeddingProvider,
String commandName,
JsonProcessingMetricsReporter jsonProcessingMetricsReporter) {

// TODO: upgrade to use the modern switch statements
// TODO: how to remove the unchecked cast ? Had to use unchecked cast to get back to the
// CommandContext<T>
if (schemaObject instanceof CollectionSchemaObject cso) {
return (CommandContext<T>)
forSchemaObject(cso, embeddingProvider, commandName, jsonProcessingMetricsReporter);
}
if (schemaObject instanceof TableSchemaObject tso) {
return (CommandContext<T>)
forSchemaObject(tso, embeddingProvider, commandName, jsonProcessingMetricsReporter);
}
if (schemaObject instanceof KeyspaceSchemaObject kso) {
return (CommandContext<T>)
forSchemaObject(kso, embeddingProvider, commandName, jsonProcessingMetricsReporter);
}
throw new IllegalArgumentException("Unknown schema object type: " + schemaObject.getClass());
}

/**
* Factory method to create a new instance of {@link CommandContext} based on the schema object we
* are working with
*
* @param schemaObject
* @param embeddingProvider
* @param commandName
* @param jsonProcessingMetricsReporter
* @return
*/
public static CommandContext<CollectionSchemaObject> forSchemaObject(
CollectionSchemaObject schemaObject,
EmbeddingProvider embeddingProvider,
String commandName,
Expand All @@ -40,7 +87,17 @@ public static CommandContext<CollectionSchemaObject> collectionCommandContext(
schemaObject, embeddingProvider, commandName, jsonProcessingMetricsReporter);
}

public static CommandContext<TableSchemaObject> tableCommandContext(
/**
* Factory method to create a new instance of {@link CommandContext} based on the schema object we
* are working with
*
* @param schemaObject
* @param embeddingProvider
* @param commandName
* @param jsonProcessingMetricsReporter
* @return
*/
public static CommandContext<TableSchemaObject> forSchemaObject(
TableSchemaObject schemaObject,
EmbeddingProvider embeddingProvider,
String commandName,
Expand All @@ -49,6 +106,25 @@ public static CommandContext<TableSchemaObject> tableCommandContext(
schemaObject, embeddingProvider, commandName, jsonProcessingMetricsReporter);
}

/**
* Factory method to create a new instance of {@link CommandContext} based on the schema object we
* are working with
*
* @param schemaObject
* @param embeddingProvider
* @param commandName
* @param jsonProcessingMetricsReporter
* @return
*/
public static CommandContext<KeyspaceSchemaObject> forSchemaObject(
KeyspaceSchemaObject schemaObject,
EmbeddingProvider embeddingProvider,
String commandName,
JsonProcessingMetricsReporter jsonProcessingMetricsReporter) {
return new CommandContext<>(
schemaObject, embeddingProvider, commandName, jsonProcessingMetricsReporter);
}

@SuppressWarnings("unchecked")
public CommandContext<CollectionSchemaObject> asCollectionContext() {
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public Uni<RestResponse<CommandResult>> postCommand(
@Size(min = 1, max = 48)
String collection) {
return schemaCache
.getCollectionSettings(
.getSchemaObject(
dataApiRequestInfo, dataApiRequestInfo.getTenantId(), namespace, collection)
.onItemOrFailure()
.transformToUni(
Expand All @@ -178,7 +178,7 @@ else if (error instanceof JsonApiException jsonApiException) {
return Uni.createFrom().item(new ThrowableCommandResultSupplier(error));
} else {
// TODO No need for the else clause here, simplify
// TODO: this is where we know if it's a table or a collection
// TODO: refactor this code to be cleaner so it assigns on one line
EmbeddingProvider embeddingProvider = null;
final VectorConfig.VectorizeConfig vectorizeConfig =
schemaObject.vectorConfig().vectorizeConfig();
Expand All @@ -195,17 +195,13 @@ else if (error instanceof JsonApiException jsonApiException) {
command.getClass().getSimpleName());
}

// TODO: when the schema cache returns table and collections we switch here to
// create
// the correct command context
CommandContext<?> commandContext =
CommandContext.collectionCommandContext(
var commandContext =
CommandContext.forSchemaObject(
schemaObject,
embeddingProvider,
command.getClass().getSimpleName(),
jsonProcessingMetricsReporter);

// call processor
return meteredCommandProcessor.processCommand(
dataApiRequestInfo, commandContext, command);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.stargate.sgv2.jsonapi.service;

/** Hack / temp class to have run time checking for if Tables are supported */
public final class FeatureFlags {

public static final boolean TABLES_SUPPORTED = Boolean.getBoolean("stargate.tables.supported");
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import io.stargate.sgv2.jsonapi.api.request.DataApiRequestInfo;
import io.stargate.sgv2.jsonapi.exception.ErrorCode;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.FeatureFlags;
import io.stargate.sgv2.jsonapi.service.schema.model.JsonapiTableMatcher;
import java.time.Duration;

/** Caches the vector enabled status for the namespace */
// TODO: what is the vector status of a namespace ? vectors are per collection
// TODO: clarify the name of this class, it is a cache of the collections/ tables not a cache of
// namespaces ??
public class NamespaceCache {

public final String namespace;
Expand All @@ -21,9 +24,11 @@ public class NamespaceCache {

private final ObjectMapper objectMapper;

// TODO: move the settings to config
// TODO: set the cache loader when creating the cache
private static final long CACHE_TTL_SECONDS = 300;
private static final long CACHE_MAX_SIZE = 1000;
private final Cache<String, CollectionSchemaObject> vectorCache =
private final Cache<String, SchemaObject> schemaObjectCache =
Caffeine.newBuilder()
.expireAfterWrite(Duration.ofSeconds(CACHE_TTL_SECONDS))
.maximumSize(CACHE_MAX_SIZE)
Expand All @@ -35,21 +40,22 @@ public NamespaceCache(String namespace, QueryExecutor queryExecutor, ObjectMappe
this.objectMapper = objectMapper;
}

protected Uni<CollectionSchemaObject> getCollectionProperties(
protected Uni<SchemaObject> getSchemaObject(
DataApiRequestInfo dataApiRequestInfo, String collectionName) {

// TODO: why is the cache loader not been used ??
CollectionSchemaObject collectionProperty = vectorCache.getIfPresent(collectionName);
// TODO: why is this not using the loader pattern ?
SchemaObject schemaObject = schemaObjectCache.getIfPresent(collectionName);

if (null != collectionProperty) {
return Uni.createFrom().item(collectionProperty);
if (null != schemaObject) {
return Uni.createFrom().item(schemaObject);
} else {
return getVectorProperties(dataApiRequestInfo, collectionName)
return loadSchemaObject(dataApiRequestInfo, collectionName)
.onItemOrFailure()
.transformToUni(
(result, error) -> {
if (null != error) {
// not a valid collection schema
// TODO: Explain why this changes the error code
if (error instanceof JsonApiException
&& ((JsonApiException) error).getErrorCode()
== ErrorCode.VECTORIZECONFIG_CHECK_FAIL) {
Expand All @@ -62,6 +68,8 @@ protected Uni<CollectionSchemaObject> getCollectionProperties(
.concat(collectionName)));
}
// collection does not exist
// TODO: DO NOT do a string starts with , use property error structures
// again, why is this here, looks like it returns the same error code ?
if (error instanceof RuntimeException rte
&& rte.getMessage().startsWith(ErrorCode.COLLECTION_NOT_EXIST.getMessage())) {
return Uni.createFrom()
Expand All @@ -76,6 +84,8 @@ protected Uni<CollectionSchemaObject> getCollectionProperties(
// TODO This if block can be deleted? grpc code
// ignoring the error and return false. This will be handled while trying to
// execute the query
// TODO: WHY ARE WE IGNORING THE ERROR AND RETURNING FAKE COLLECTION SCHEMA ? This
// is a bad practice
if ((error instanceof StatusRuntimeException sre
&& (sre.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND
|| sre.getStatus().getCode() == io.grpc.Status.Code.INVALID_ARGUMENT))) {
Expand All @@ -90,36 +100,48 @@ protected Uni<CollectionSchemaObject> getCollectionProperties(
}
return Uni.createFrom().failure(error);
} else {
vectorCache.put(collectionName, result);
schemaObjectCache.put(collectionName, result);
return Uni.createFrom().item(result);
}
});
}
}

private Uni<CollectionSchemaObject> getVectorProperties(
private Uni<SchemaObject> loadSchemaObject(
DataApiRequestInfo dataApiRequestInfo, String collectionName) {

return queryExecutor
.getSchema(dataApiRequestInfo, namespace, collectionName)
.onItem()
.transform(
table -> {
if (table.isPresent()) {
// check if its a valid json api table
if (!new JsonapiTableMatcher().test(table.get())) {
throw new JsonApiException(
ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA,
ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA.getMessage() + collectionName);
}
return CollectionSchemaObject.getCollectionSettings(table.get(), objectMapper);
} else {
throw new RuntimeException(
ErrorCode.COLLECTION_NOT_EXIST.getMessage() + collectionName);
optionalTable -> {
// TODO: AARON - I changed the logic here, needs to be checked
// TODO: error code here needs to be for collections and tables
var table =
optionalTable.orElseThrow(
() ->
new RuntimeException(
ErrorCode.COLLECTION_NOT_EXIST.getMessage() + collectionName));

// check if its a valid json api table
// TODO: re-use the table matcher this is on the request hot path
if (new JsonapiTableMatcher().test(table)) {
return CollectionSchemaObject.getCollectionSettings(
optionalTable.get(), objectMapper);
}

if (FeatureFlags.TABLES_SUPPORTED) {
return new TableSchemaObject(namespace, collectionName);
}

// Target is not a collection and we are not supporting tables
throw new JsonApiException(
ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA,
ErrorCode.INVALID_JSONAPI_COLLECTION_SCHEMA.getMessage() + collectionName);
});
}

public void evictCollectionSettingCacheEntry(String collectionName) {
vectorCache.invalidate(collectionName);
schemaObjectCache.invalidate(collectionName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ protected Uni<Optional<TableMetadata>> getSchema(
.getKeyspaces()
.get(CqlIdentifier.fromInternal(namespace));
} catch (Exception e) {
// TODO: this ^^ is a very wide error catch, confirm what it should actually be catcing
return Uni.createFrom().failure(e);
}
// if namespace does not exist, throw error
Expand All @@ -268,6 +269,7 @@ protected Uni<Optional<TableMetadata>> getSchema(
"The provided namespace does not exist: " + namespace));
}
// else get the table
// TODO: this should probably use CqlIdentifier.fromCql() if we want to be case sensitive
return Uni.createFrom().item(keyspaceMetadata.getTable("\"" + collectionName + "\""));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,34 @@ public class SchemaCache {

@Inject OperationsConfig operationsConfig;

// TODO: The size of the cache should be in configuration.
// TODO: set the cache loader when creating the cache
private final Cache<CacheKey, NamespaceCache> schemaCache =
Caffeine.newBuilder().maximumSize(1000).build();

public Uni<CollectionSchemaObject> getCollectionSettings(
public Uni<SchemaObject> getSchemaObject(
DataApiRequestInfo dataApiRequestInfo,
Optional<String> tenant,
String namespace,
String collectionName) {

// TODO: refactor, this has duplicate code, the only special handling the OSS has is the tenant
// check

if (CASSANDRA.equals(operationsConfig.databaseConfig().type())) {
// default_tenant is for oss run
// TODO: move the string to a constant or config, why does this still check the tenant if this
// is for OSS ?
final NamespaceCache namespaceCache =
schemaCache.get(
new CacheKey(Optional.of(tenant.orElse("default_tenant")), namespace),
this::addNamespaceCache);
return namespaceCache.getCollectionProperties(dataApiRequestInfo, collectionName);
return namespaceCache.getSchemaObject(dataApiRequestInfo, collectionName);
}

final NamespaceCache namespaceCache =
schemaCache.get(new CacheKey(tenant, namespace), this::addNamespaceCache);
return namespaceCache.getCollectionProperties(dataApiRequestInfo, collectionName);
return namespaceCache.getSchemaObject(dataApiRequestInfo, collectionName);
}

/** Evict collectionSetting Cache entry when there is a drop table event */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ public class TableSchemaObject extends SchemaObject {
/** Represents missing schema, e.g. when we are running a create table. */
public static final TableSchemaObject MISSING = new TableSchemaObject(SchemaObjectName.MISSING);

// TODO: hold the table meta data, need to work out how we handle mock tables in test etc.
// public final TableMetadata tableMetadata;

public TableSchemaObject(String keyspace, String name) {
this(new SchemaObjectName(keyspace, name));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public record VectorConfig(
VectorizeConfig vectorizeConfig) {

// TODO: this is an immutable record, this can be singleton
// TODO: Remove the use of NULL for the objects like vectorizeConfig
public static VectorConfig notEnabledVectorConfig() {
return new VectorConfig(false, -1, null, null);
}
Expand Down
Loading

0 comments on commit 607f0d8

Please sign in to comment.