Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete - LWT failure retries #218

Merged
merged 16 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public enum ErrorCode {
/** Command error codes. */
COMMAND_NOT_IMPLEMENTED("The provided command is not implemented."),

CONCURRENCY_FAILURE("Unable to complete transaction due to concurrent transactions"),

DOCUMENT_ALREADY_EXISTS("Document already exists with the id"),

DOCUMENT_UNPARSEABLE("Unable to parse the document"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ public interface DocumentConfig {
@Positive
@WithDefault("20")
int maxDocumentUpdateCount();

/** @return Defines the maximum retry for lwt failure <code>3</code>. */
@Max(5)
@Positive
@WithDefault("3")
int maxLWTFailureRetry();
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private QueryOuterClass.Query buildSelectQuery() {
}

@Override
public Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String pagingState) {
public Uni<FindResponse> getDocuments(
QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride) {
return Uni.createFrom().failure(new JsonApiException(ErrorCode.UNSUPPORTED_OPERATION));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.stargate.sgv2.jsonapi.exception.ErrorCode;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.ReadDocument;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.ArrayList;
Expand Down Expand Up @@ -118,7 +119,8 @@ default Uni<CountResponse> countDocuments(
* @param queryExecutor
* @return
*/
Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String pagingState);
Uni<FindResponse> getDocuments(
QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride);
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved

/**
* A operation method which can return ReadDocument with an empty document, if the filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.exception.ErrorCode;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers;
import io.stargate.sgv2.jsonapi.service.operation.model.ModifyOperation;
Expand All @@ -27,7 +29,8 @@ public record DeleteOperation(
* Added parameter to pass number of document to be deleted, this is needed because read
* documents limit changed to deleteLimit + 1
*/
int deleteLimit)
int deleteLimit,
int retryLimit)
implements ModifyOperation {
@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -40,7 +43,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
() -> new AtomicReference<String>(null),
stateRef -> {
Uni<ReadOperation.FindResponse> docsToDelete =
readOperation().getDocuments(queryExecutor, stateRef.get());
readOperation().getDocuments(queryExecutor, stateRef.get(), null);
return docsToDelete
.onItem()
.invoke(findResponse -> stateRef.set(findResponse.pagingState()));
Expand Down Expand Up @@ -68,7 +71,34 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
.concatenate()
.onItem()
.transformToUniAndConcatenate(
readDocument -> deleteDocument(queryExecutor, delete, readDocument))
readDocument -> {
AtomicInteger attempt = new AtomicInteger(0);
return Multi.createBy()
.repeating()
.uni(() -> deleteDocument(queryExecutor, delete, readDocument, attempt))
tatu-at-datastax marked this conversation as resolved.
Show resolved Hide resolved
.whilst(
respVal ->
(respVal == DeleteResponse.CONCURRENCY_FAILURE
&& attempt.incrementAndGet() < retryLimit))
.collect()
.last()
.onItem()
.transform(
respVal -> {
switch (respVal) {
case DELETED:
return true;
case MODIFIED_BY_CONCURRENT_PROCESS:
return false;
case CONCURRENCY_FAILURE:
default:
throw new JsonApiException(
ErrorCode.CONCURRENCY_FAILURE,
"Delete failed for %s because of concurrent transaction"
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
.formatted(readDocument.id().toString()));
}
});
})
.collect()
.in(
AtomicInteger::new,
Expand Down Expand Up @@ -107,20 +137,67 @@ private QueryOuterClass.Query buildDeleteQuery() {
* @param queryExecutor
* @param query
* @param doc
* @return Uni<Boolean> `true` if deleted successfully, else `false`
* @param attempt
* @return Uni<DeleteResponse>
*/
private static Uni<Boolean> deleteDocument(
QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc) {
query = bindDeleteQuery(query, doc);
return queryExecutor
.executeWrite(query)
private Uni<DeleteResponse> deleteDocument(
QueryExecutor queryExecutor,
QueryOuterClass.Query query,
ReadDocument doc,
AtomicInteger attempt) {
final Uni<ReadDocument> documentToDelete =
Uni.createFrom()
.item(attempt.get())
.onItem()
.transformToUni(
attemptValue -> {
if (attemptValue > 0) {
return readDocumentAgain(queryExecutor, doc);
} else {
return Uni.createFrom().item(doc);
}
});
return documentToDelete
.onItem()
.transformToUni(
result -> {
if (result.getRows(0).getValues(0).getBoolean()) {
return Uni.createFrom().item(true);
docToDelete -> {
if (docToDelete == null) {
return Uni.createFrom().item(DeleteResponse.MODIFIED_BY_CONCURRENT_PROCESS);
} else {
return Uni.createFrom().item(false);
QueryOuterClass.Query boundQuery = bindDeleteQuery(query, docToDelete);
return queryExecutor
.executeWrite(boundQuery)
.onItem()
.transform(
result -> {
if (result.getRows(0).getValues(0).getBoolean()) {
return DeleteResponse.DELETED;
} else {
return DeleteResponse.CONCURRENCY_FAILURE;
}
});
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

private Uni<? extends ReadDocument> readDocumentAgain(
QueryExecutor queryExecutor, ReadDocument prevReadDoc) {
// Read again if retry flag is `true`
final Uni<ReadOperation.FindResponse> findResponse =
readOperation()
.getDocuments(
queryExecutor,
null,
new DBFilterBase.IDFilter(DBFilterBase.IDFilter.Operator.EQ, prevReadDoc.id()));
return findResponse
.onItem()
.transform(
response -> {
if (!response.docs().isEmpty()) {
return response.docs().get(0);
} else {
// If data changed and doesn't satisfy filter conditions
return null;
}
});
}
Expand All @@ -133,4 +210,17 @@ private static QueryOuterClass.Query bindDeleteQuery(
.addValues(Values.of(doc.txnId()));
return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build();
}

public enum DeleteResponse {
/** Successfully deleted a document */
DELETED,
/**
* Document modified by concurrent process and doesn't match the condition Could have changed
* value or deleted
*/
MODIFIED_BY_CONCURRENT_PROCESS,

/** Failed because of concurrent process */
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
CONCURRENCY_FAILURE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ public record FindOperation(

@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
return getDocuments(queryExecutor, pagingState())
return getDocuments(queryExecutor, pagingState(), null)
.onItem()
.transform(docs -> new ReadOperationPage(docs.docs(), docs.pagingState()));
}

@Override
public Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String pagingState) {
public Uni<FindResponse> getDocuments(
QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride) {
switch (readType) {
case DOCUMENT:
case KEY:
{
QueryOuterClass.Query query = buildSelectQuery();
QueryOuterClass.Query query = buildSelectQuery(idOverride);
return findDocument(
queryExecutor,
query,
Expand Down Expand Up @@ -83,11 +84,14 @@ public ReadDocument getNewDocument() {
return doc;
}

private QueryOuterClass.Query buildSelectQuery() {
private QueryOuterClass.Query buildSelectQuery(DBFilterBase.IDFilter idOverride) {
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
List<BuiltCondition> conditions = new ArrayList<>(filters.size());
for (DBFilterBase filter : filters) {
conditions.add(filter.get());
}
if (idOverride != null) {
conditions.add(idOverride.get());
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
}
return new QueryBuilder()
.select()
.column(ReadType.DOCUMENT == readType ? documentColumns : documentKeyColumns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
() -> new AtomicReference<String>(null),
stateRef -> {
Uni<ReadOperation.FindResponse> docsToUpdate =
readOperation().getDocuments(queryExecutor, stateRef.get());
readOperation().getDocuments(queryExecutor, stateRef.get(), null);
return docsToUpdate
.onItem()
.invoke(findResponse -> stateRef.set(findResponse.pagingState()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public DeleteManyCommandResolver(DocumentConfig documentConfig, ObjectMapper obj
public Operation resolveCommand(CommandContext commandContext, DeleteManyCommand command) {
ReadOperation readOperation = resolve(commandContext, command);
return new DeleteOperation(
commandContext, readOperation, documentConfig.maxDocumentDeleteCount());
commandContext,
readOperation,
documentConfig.maxDocumentDeleteCount(),
documentConfig.maxLWTFailureRetry());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.impl.DeleteOneCommand;
import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadType;
Expand All @@ -20,15 +21,19 @@
public class DeleteOneCommandResolver extends FilterableResolver<DeleteOneCommand>
implements CommandResolver<DeleteOneCommand> {

private final DocumentConfig documentConfig;

@Inject
public DeleteOneCommandResolver(ObjectMapper objectMapper) {
public DeleteOneCommandResolver(DocumentConfig documentConfig, ObjectMapper objectMapper) {
super(objectMapper);
this.documentConfig = documentConfig;
}

@Override
public Operation resolveCommand(CommandContext commandContext, DeleteOneCommand command) {
ReadOperation readOperation = resolve(commandContext, command);
return new DeleteOperation(commandContext, readOperation, 1);
return new DeleteOperation(
commandContext, readOperation, 1, documentConfig.maxLWTFailureRetry());
}

@Override
Expand Down
Loading