Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete - LWT failure retries #218

Merged
merged 16 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public enum ErrorCode {
/** Command error codes. */
COMMAND_NOT_IMPLEMENTED("The provided command is not implemented."),

CONCURRENCY_FAILURE("Unable to complete transaction due to concurrent transactions"),

DOCUMENT_ALREADY_EXISTS("Document already exists with the id"),

DOCUMENT_UNPARSEABLE("Unable to parse the document"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,15 @@ public interface DocumentConfig {
@Positive
@WithDefault("20")
int maxDocumentUpdateCount();

/** {@inheritDoc} */
LwtConfig lwt();

interface LwtConfig {
/** @return Defines the maximum retry for lwt failure <code>3</code>. */
@Max(5)
@Positive
@WithDefault("3")
int retries();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ private QueryOuterClass.Query buildSelectQuery() {
}

@Override
public Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String pagingState) {
public Uni<FindResponse> getDocuments(
QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter) {
return Uni.createFrom().failure(new JsonApiException(ErrorCode.UNSUPPORTED_OPERATION));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.stargate.sgv2.jsonapi.exception.ErrorCode;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.ReadDocument;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.ArrayList;
Expand Down Expand Up @@ -116,9 +117,13 @@ default Uni<CountResponse> countDocuments(
* used by other commands which needs a document to be read.
*
* @param queryExecutor
* @param pagingState
* @param additionalIdFilter Used if a additional id filter need to be added to already available
* filters
* @return
*/
Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String pagingState);
Uni<FindResponse> getDocuments(
QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter);

/**
* A operation method which can return ReadDocument with an empty document, if the filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.exception.ErrorCode;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers;
import io.stargate.sgv2.jsonapi.service.operation.model.ModifyOperation;
Expand All @@ -27,58 +29,88 @@ public record DeleteOperation(
* Added parameter to pass number of document to be deleted, this is needed because read
* documents limit changed to deleteLimit + 1
*/
int deleteLimit)
int deleteLimit,
int retryLimit)
implements ModifyOperation {
@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
final AtomicBoolean moreData = new AtomicBoolean(false);
final QueryOuterClass.Query delete = buildDeleteQuery();
final Multi<ReadOperation.FindResponse> findResponses =
Multi.createBy()
.repeating()
.uni(
() -> new AtomicReference<String>(null),
stateRef -> {
Uni<ReadOperation.FindResponse> docsToDelete =
readOperation().getDocuments(queryExecutor, stateRef.get());
return docsToDelete
.onItem()
.invoke(findResponse -> stateRef.set(findResponse.pagingState()));
})
.whilst(findResponse -> findResponse.pagingState() != null);
AtomicInteger totalCount = new AtomicInteger(0);
final Uni<AtomicInteger> counter =
findResponses
.onItem()
.transformToMulti(
findResponse -> {
final List<ReadDocument> docs = findResponse.docs();
// Below conditionality is because we read up to deleteLimit +1 record.
if (totalCount.get() + docs.size() <= deleteLimit) {
totalCount.addAndGet(docs.size());
return Multi.createFrom().items(docs.stream());
} else {
int needed = deleteLimit - totalCount.get();
totalCount.addAndGet(needed);
moreData.set(true);
return Multi.createFrom()
.items(findResponse.docs().subList(0, needed).stream());
}
})
.concatenate()
.onItem()
.transformToUniAndConcatenate(
readDocument -> deleteDocument(queryExecutor, delete, readDocument))
.collect()
.in(
AtomicInteger::new,
(atomicCounter, flag) -> {
if (flag) {
atomicCounter.incrementAndGet();
}
});
final int retryAttempt = retryLimit - 2;
// Read the required records to be deleted
return Multi.createBy()
.repeating()
.uni(
() -> new AtomicReference<String>(null),
stateRef -> {
Uni<ReadOperation.FindResponse> docsToDelete =
readOperation().getDocuments(queryExecutor, stateRef.get(), null);
return docsToDelete
.onItem()
.invoke(findResponse -> stateRef.set(findResponse.pagingState()));
})

return counter
// Documents read until pagingState available, max records read is deleteLimit + 1
.whilst(findResponse -> findResponse.pagingState() != null)

// Get the deleteLimit # of documents to be delete and set moreData flag true if extra
// document is read.
.onItem()
.transformToMulti(
findResponse -> {
final List<ReadDocument> docs = findResponse.docs();
// Below conditionality is because we read up to deleteLimit +1 record.
if (totalCount.get() + docs.size() <= deleteLimit) {
totalCount.addAndGet(docs.size());
return Multi.createFrom().items(docs.stream());
} else {
int needed = deleteLimit - totalCount.get();
totalCount.addAndGet(needed);
moreData.set(true);
return Multi.createFrom().items(findResponse.docs().subList(0, needed).stream());
}
})
.concatenate()

// Run delete for selected documents and retry in case of
.onItem()
.transformToUniAndMerge(
document -> {
return deleteDocument(queryExecutor, delete, document)
// Retry `retryLimit` times in case of LWT failure
.onFailure(LWTException.class)
.recoverWithUni(
() -> {
return Uni.createFrom()
.item(document)
.flatMap(
prevDoc -> {
return readDocumentAgain(queryExecutor, prevDoc)
.onItem()
// Try deleting the document
.transformToUni(
reReadDocument ->
deleteDocument(
queryExecutor, delete, reReadDocument));
})
.onFailure(LWTException.class)
.retry()
// because it's already run twice before this
// check.
.atMost(retryLimit - 1);
});
})
.collect()

// Count the successful deletes
.in(
AtomicInteger::new,
(atomicCounter, flag) -> {
if (flag) {
atomicCounter.incrementAndGet();
}
})
.onItem()
.transform(deletedCounter -> new DeleteOperationPage(deletedCounter.get(), moreData.get()));
}
Expand Down Expand Up @@ -107,20 +139,60 @@ private QueryOuterClass.Query buildDeleteQuery() {
* @param queryExecutor
* @param query
* @param doc
* @return Uni<Boolean> `true` if deleted successfully, else `false`
* @return Uni<Boolean> `true` if deleted successfully, else `false` if data changed and no longer
* match the conditions and throws JsonApiException if LWT failure.
*/
private static Uni<Boolean> deleteDocument(
QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc) {
query = bindDeleteQuery(query, doc);
return queryExecutor
.executeWrite(query)
private Uni<Boolean> deleteDocument(
QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc)
throws JsonApiException {
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
return Uni.createFrom()
.item(doc)
// Read again if retryAttempt >`0`
.onItem()
.transformToUni(
result -> {
if (result.getRows(0).getValues(0).getBoolean()) {
return Uni.createFrom().item(true);
} else {
document -> {
if (document == null) {
return Uni.createFrom().item(false);
} else {
QueryOuterClass.Query boundQuery = bindDeleteQuery(query, document);
return queryExecutor
.executeWrite(boundQuery)
.onItem()
.transform(
result -> {
// LWT returns `true` for successful transaction, false on failure.
if (result.getRows(0).getValues(0).getBoolean()) {
// In case of successful document delete
return true;
} else {
// In case of successful document delete

throw new LWTException(
ErrorCode.CONCURRENCY_FAILURE,
"Delete failed for document with id %s because of concurrent transaction"
.formatted(document.id().value()));
}
});
}
});
}

private Uni<ReadDocument> readDocumentAgain(
QueryExecutor queryExecutor, ReadDocument prevReadDoc) {
// Read again if retry flag is `true`
return readOperation()
.getDocuments(
queryExecutor,
null,
new DBFilterBase.IDFilter(DBFilterBase.IDFilter.Operator.EQ, prevReadDoc.id()))
.onItem()
.transform(
response -> {
if (!response.docs().isEmpty()) {
return response.docs().get(0);
} else {
// If data changed and doesn't satisfy filter conditions
return null;
}
});
}
Expand All @@ -133,4 +205,11 @@ private static QueryOuterClass.Query bindDeleteQuery(
.addValues(Values.of(doc.txnId()));
return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build();
}

/** Inherited Exception class to handle retry */
private class LWTException extends JsonApiException {
public LWTException(ErrorCode errorCode, String message) {
super(errorCode, message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ public record FindOperation(

@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
return getDocuments(queryExecutor, pagingState())
return getDocuments(queryExecutor, pagingState(), null)
.onItem()
.transform(docs -> new ReadOperationPage(docs.docs(), docs.pagingState()));
}

@Override
public Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String pagingState) {
public Uni<FindResponse> getDocuments(
QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter) {
switch (readType) {
case DOCUMENT:
case KEY:
{
QueryOuterClass.Query query = buildSelectQuery();
QueryOuterClass.Query query = buildSelectQuery(additionalIdFilter);
return findDocument(
queryExecutor,
query,
Expand Down Expand Up @@ -83,10 +84,15 @@ public ReadDocument getNewDocument() {
return doc;
}

private QueryOuterClass.Query buildSelectQuery() {
private QueryOuterClass.Query buildSelectQuery(DBFilterBase.IDFilter additionalIdFilter) {
List<BuiltCondition> conditions = new ArrayList<>(filters.size());
for (DBFilterBase filter : filters) {
conditions.add(filter.get());
if (additionalIdFilter == null
|| (additionalIdFilter != null && !(filter instanceof DBFilterBase.IDFilter)))
conditions.add(filter.get());
}
if (additionalIdFilter != null) {
conditions.add(additionalIdFilter.get());
}
return new QueryBuilder()
.select()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
() -> new AtomicReference<String>(null),
stateRef -> {
Uni<ReadOperation.FindResponse> docsToUpdate =
readOperation().getDocuments(queryExecutor, stateRef.get());
readOperation().getDocuments(queryExecutor, stateRef.get(), null);
return docsToUpdate
.onItem()
.invoke(findResponse -> stateRef.set(findResponse.pagingState()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public DeleteManyCommandResolver(DocumentConfig documentConfig, ObjectMapper obj
public Operation resolveCommand(CommandContext commandContext, DeleteManyCommand command) {
ReadOperation readOperation = resolve(commandContext, command);
return new DeleteOperation(
commandContext, readOperation, documentConfig.maxDocumentDeleteCount());
commandContext,
readOperation,
documentConfig.maxDocumentDeleteCount(),
documentConfig.lwt().retries());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.impl.DeleteOneCommand;
import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadType;
Expand All @@ -20,15 +21,18 @@
public class DeleteOneCommandResolver extends FilterableResolver<DeleteOneCommand>
implements CommandResolver<DeleteOneCommand> {

private final DocumentConfig documentConfig;

@Inject
public DeleteOneCommandResolver(ObjectMapper objectMapper) {
public DeleteOneCommandResolver(DocumentConfig documentConfig, ObjectMapper objectMapper) {
super(objectMapper);
this.documentConfig = documentConfig;
}

@Override
public Operation resolveCommand(CommandContext commandContext, DeleteOneCommand command) {
ReadOperation readOperation = resolve(commandContext, command);
return new DeleteOperation(commandContext, readOperation, 1);
return new DeleteOperation(commandContext, readOperation, 1, documentConfig.lwt().retries());
}

@Override
Expand Down
Loading