Skip to content

Commit

Permalink
Update on tables using OperationAttempt (#1552)
Browse files Browse the repository at this point in the history
Co-authored-by: Yuqi Du <istimdu@gmail.com>
  • Loading branch information
amorton and Yuqi-Du authored Oct 24, 2024
1 parent cb80e2b commit ef33b5a
Show file tree
Hide file tree
Showing 24 changed files with 1,131 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public enum Scope implements ErrorScope {
DOCUMENT,
/** See {@link FilterException} */
FILTER,
/** See {@link UpdateException} */
UPDATE,
/** See {@link SchemaException} */
SCHEMA,
/** See {@link WarningException} */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.stargate.sgv2.jsonapi.exception;

/**
* Errors related to the filter clause in a request.
*
* <p>See {@link APIException} for steps to add a new code.
*/
public class UpdateException extends RequestException {

public static final Scope SCOPE = Scope.UPDATE;

public UpdateException(ErrorInstance errorInstance) {
super(errorInstance);
}

public enum Code implements ErrorCode<UpdateException> {
UNKNOWN_TABLE_COLUMNS,
UNSUPPORTED_UPDATE_OPERATION_FOR_TABLE,
UPDATE_PRIMARY_KEY_COLUMNS,
ZERO_UPDATE_OPERATIONS_FOR_TABLE;

private final ErrorTemplate<UpdateException> template;

Code() {
template = ErrorTemplate.load(UpdateException.class, FAMILY, SCOPE, name());
}

@Override
public ErrorTemplate<UpdateException> template() {
return template;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.stargate.sgv2.jsonapi.service.operation;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.update;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.querybuilder.update.Update;
import com.datastax.oss.driver.api.querybuilder.update.UpdateWithAssignments;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.CommandQueryExecutor;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableBasedSchemaObject;
import io.stargate.sgv2.jsonapi.service.operation.query.UpdateValuesCQLClause;
import io.stargate.sgv2.jsonapi.service.operation.query.WhereCQLClause;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** An attempt to update a row from a table */
public class UpdateAttempt<SchemaT extends TableBasedSchemaObject>
extends OperationAttempt<UpdateAttempt<SchemaT>, SchemaT> {

private static final Logger LOGGER = LoggerFactory.getLogger(UpdateAttempt.class);

private final UpdateValuesCQLClause updateCQLClause;
private final WhereCQLClause<Update> whereCQLClause;

public UpdateAttempt(
int position,
SchemaT schemaObject,
UpdateValuesCQLClause updateCQLClause,
WhereCQLClause<Update> whereCQLClause) {
super(position, schemaObject, RetryPolicy.NO_RETRY);

// nullable, because the subclass may want to implement method itself.
// and if there is an error shredding we will not have the insert clause
this.whereCQLClause = whereCQLClause;
this.updateCQLClause = updateCQLClause;
setStatus(OperationStatus.READY);
}

@Override
protected Uni<AsyncResultSet> executeStatement(CommandQueryExecutor queryExecutor) {
// bind and execute
var statement = buildUpdateStatement();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"execute() - {}, cql={}, values={}",
positionAndAttemptId(),
statement.getQuery(),
statement.getPositionalValues());
}
return queryExecutor.executeWrite(statement);
}

/**
* The framework for WhereCQLClause expects something extending OngoingWhereClause, and there is
* no easy way to get that for update, we know this will be work DefaultUpdate implements Update -
* no doing an instanceOf check because of this.
*/
@SuppressWarnings("unchecked")
protected static Update uncheckedToUpdate(UpdateWithAssignments updateWithAssignments) {
return (Update) updateWithAssignments;
}

protected SimpleStatement buildUpdateStatement() {
Objects.requireNonNull(updateCQLClause, "updateCQLClause must not be null");
Objects.requireNonNull(whereCQLClause, "whereCQLClause must not be null");

var metadata = schemaObject.tableMetadata();
// Start the update
var updateStart = update(metadata.getKeyspace(), metadata.getName());

// Update the columns
List<Object> positionalValues = new ArrayList<>();
var updateWithAssignments = updateCQLClause.apply(updateStart, positionalValues);
// Add the where clause
var update = whereCQLClause.apply(uncheckedToUpdate(updateWithAssignments), positionalValues);

// There are no options for update so far
return update.build(positionalValues.toArray());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.stargate.sgv2.jsonapi.service.operation;

import com.datastax.oss.driver.api.querybuilder.update.Update;
import io.stargate.sgv2.jsonapi.exception.FilterException;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableBasedSchemaObject;
import io.stargate.sgv2.jsonapi.service.operation.query.UpdateValuesCQLClause;
import io.stargate.sgv2.jsonapi.service.operation.query.WhereCQLClause;
import io.stargate.sgv2.jsonapi.service.operation.tables.WhereCQLClauseAnalyzer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Builds an attempt to delete a row from an API Table, create a single instance and then call
* {@link #build(WhereCQLClause)} for each different where clause the command creates.
*/
public class UpdateAttemptBuilder<SchemaT extends TableBasedSchemaObject> {

private static final Logger LOGGER = LoggerFactory.getLogger(UpdateAttemptBuilder.class);

// first value is zero, but we increment before we use it
private int readPosition = -1;

private final SchemaT tableBasedSchema;
private final WhereCQLClauseAnalyzer whereCQLClauseAnalyzer;

public UpdateAttemptBuilder(SchemaT tableBasedSchema) {

this.tableBasedSchema = tableBasedSchema;
this.whereCQLClauseAnalyzer =
new WhereCQLClauseAnalyzer(tableBasedSchema, WhereCQLClauseAnalyzer.StatementType.UPDATE);
}

public UpdateAttempt<SchemaT> build(
WhereCQLClause<Update> whereCQLClause, UpdateValuesCQLClause updateCQLClause) {

readPosition += 1;

// TODO: this may be common for creating a read / delete / where attempt will look at how to
// refactor once all done
WhereCQLClauseAnalyzer.WhereClauseAnalysis analyzedResult = null;
Exception exception = null;
try {
analyzedResult = whereCQLClauseAnalyzer.analyse(whereCQLClause);
} catch (FilterException filterException) {
exception = filterException;
}

var attempt =
new UpdateAttempt<>(readPosition, tableBasedSchema, updateCQLClause, whereCQLClause);

// ok to pass null exception, will be ignored
attempt.maybeAddFailure(exception);

// There should not be any warnings, we cannot turn on allow filtering for delete
// and we should not be turning on allow filtering for delete
// sanity check
if (analyzedResult != null && !analyzedResult.isEmpty()) {
throw new IllegalStateException(
"Where clause analysis for update was not empty, analysis:%s".formatted(analyzedResult));
}

return attempt;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.stargate.sgv2.jsonapi.service.operation;

import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResultBuilder;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableBasedSchemaObject;

/**
* A page of results from a update command, use {@link #builder()} to get a builder to pass to
* {@link GenericOperation}.
*/
public class UpdateAttemptPage<SchemaT extends TableBasedSchemaObject>
extends OperationAttemptPage<SchemaT, UpdateAttempt<SchemaT>> {

private UpdateAttemptPage(
OperationAttemptContainer<SchemaT, UpdateAttempt<SchemaT>> attempts,
CommandResultBuilder resultBuilder) {
super(attempts, resultBuilder);
}

public static <SchemaT extends TableBasedSchemaObject> Builder<SchemaT> builder() {
return new Builder<>();
}

@Override
protected void buildCommandResult() {

// set errors and warnings
super.buildCommandResult();

// Because CQL UPDATE is a upsert it will always match and always modify a row, even
// if that means inserting
// However - we do not know if an upsert happened :(
// NOTE when update collection uses operation attempt this will get more complex
// If there is error, we won't add this status.
if (attempts.errorAttempts().isEmpty()) {
resultBuilder.addStatus(CommandStatus.MATCHED_COUNT, 1);
resultBuilder.addStatus(CommandStatus.MODIFIED_COUNT, 1);
}
}

public static class Builder<SchemaT extends TableBasedSchemaObject>
extends OperationAttemptPageBuilder<SchemaT, UpdateAttempt<SchemaT>> {

Builder() {}

@Override
public UpdateAttemptPage<SchemaT> getOperationPage() {

// when we refactor collections to use the OperationAttempt this will need to support
// returning a document
// e.g. for findOneAndDelete, for now it is always status only

return new UpdateAttemptPage<>(
attempts, CommandResult.statusOnlyBuilder(useErrorObjectV2, debugMode));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class ColumnAssignment implements CQLAssignment {

private final TableMetadata tableMetadata;
private final CqlIdentifier column;
public final CqlIdentifier column;
private final JsonLiteral<?> value;

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ private void checkNoFilters(Map<CqlIdentifier, TableFilter> identifierToFilter)
/**
* Check if there is other columns are filtered against other than primary key columns.
*
* <p>For UPDATE, DELETE (TODO, DELETE MANY?). If there are additional columns are specified in
* the where clause other than primary key columns, [Invalid query] message="Non PRIMARY KEY
* columns found in where clause: xxx"
* <p>For UPDATE, DELETE. If there are additional columns are specified in the where clause other
* than primary key columns, [Invalid query] message="Non PRIMARY KEY columns found in where
* clause: xxx"
*/
private void checkNonPrimaryKeyFilters(Map<CqlIdentifier, TableFilter> identifierToFilter) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ default Operation resolveCollectionCommand(
"%s Command does not support operating on Collections, target was %s",
command.getClass().getSimpleName(), ctx.schemaObject().name());
}
;

/**
* Implementors should use this method when they can resolve commands for a table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

/** Resolves the {@link UpdateManyCommand } */
/**
* Resolves the {@link UpdateManyCommand } <b>NOTE:</b> cannot run updateMany command on a table!
* only on collections
*/
@ApplicationScoped
public class UpdateManyCommandResolver implements CommandResolver<UpdateManyCommand> {
private final DocumentShredder documentShredder;
Expand Down
Loading

0 comments on commit ef33b5a

Please sign in to comment.