diff --git a/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java b/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java
index d1389234a1..76311cca63 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java
@@ -6,6 +6,8 @@ public enum ErrorCode {
/** Command error codes. */
COMMAND_NOT_IMPLEMENTED("The provided command is not implemented."),
+ CONCURRENCY_FAILURE("Unable to complete transaction due to concurrent transactions"),
+
DOCUMENT_ALREADY_EXISTS("Document already exists with the id"),
DOCUMENT_UNPARSEABLE("Unable to parse the document"),
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java
index 068e6889df..79a6a16a98 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java
@@ -64,4 +64,15 @@ public interface DocumentConfig {
@Positive
@WithDefault("20")
int maxDocumentUpdateCount();
+
+ /** {@inheritDoc} */
+ LwtConfig lwt();
+
+ interface LwtConfig {
+ /** @return Defines the maximum retry for lwt failure 3
. */
+ @Max(5)
+ @Positive
+ @WithDefault("3")
+ int retries();
+ }
}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
index 2d99902220..107d30fdf1 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java
@@ -46,7 +46,8 @@ private QueryOuterClass.Query buildSelectQuery() {
}
@Override
- public Uni getDocuments(QueryExecutor queryExecutor, String pagingState) {
+ public Uni getDocuments(
+ QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter) {
return Uni.createFrom().failure(new JsonApiException(ErrorCode.UNSUPPORTED_OPERATION));
}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
index 4f704a27e4..b9852d7822 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java
@@ -9,6 +9,7 @@
import io.stargate.sgv2.jsonapi.exception.ErrorCode;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
+import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.ReadDocument;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.ArrayList;
@@ -116,9 +117,13 @@ default Uni countDocuments(
* used by other commands which needs a document to be read.
*
* @param queryExecutor
+ * @param pagingState
+ * @param additionalIdFilter Used if a additional id filter need to be added to already available
+ * filters
* @return
*/
- Uni getDocuments(QueryExecutor queryExecutor, String pagingState);
+ Uni getDocuments(
+ QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter);
/**
* A operation method which can return ReadDocument with an empty document, if the filter
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java
index 33b377acb7..cf16e317bc 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java
@@ -6,6 +6,8 @@
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
+import io.stargate.sgv2.jsonapi.exception.ErrorCode;
+import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers;
import io.stargate.sgv2.jsonapi.service.operation.model.ModifyOperation;
@@ -27,58 +29,88 @@ public record DeleteOperation(
* Added parameter to pass number of document to be deleted, this is needed because read
* documents limit changed to deleteLimit + 1
*/
- int deleteLimit)
+ int deleteLimit,
+ int retryLimit)
implements ModifyOperation {
@Override
public Uni> execute(QueryExecutor queryExecutor) {
final AtomicBoolean moreData = new AtomicBoolean(false);
final QueryOuterClass.Query delete = buildDeleteQuery();
- final Multi findResponses =
- Multi.createBy()
- .repeating()
- .uni(
- () -> new AtomicReference(null),
- stateRef -> {
- Uni docsToDelete =
- readOperation().getDocuments(queryExecutor, stateRef.get());
- return docsToDelete
- .onItem()
- .invoke(findResponse -> stateRef.set(findResponse.pagingState()));
- })
- .whilst(findResponse -> findResponse.pagingState() != null);
AtomicInteger totalCount = new AtomicInteger(0);
- final Uni counter =
- findResponses
- .onItem()
- .transformToMulti(
- findResponse -> {
- final List docs = findResponse.docs();
- // Below conditionality is because we read up to deleteLimit +1 record.
- if (totalCount.get() + docs.size() <= deleteLimit) {
- totalCount.addAndGet(docs.size());
- return Multi.createFrom().items(docs.stream());
- } else {
- int needed = deleteLimit - totalCount.get();
- totalCount.addAndGet(needed);
- moreData.set(true);
- return Multi.createFrom()
- .items(findResponse.docs().subList(0, needed).stream());
- }
- })
- .concatenate()
- .onItem()
- .transformToUniAndConcatenate(
- readDocument -> deleteDocument(queryExecutor, delete, readDocument))
- .collect()
- .in(
- AtomicInteger::new,
- (atomicCounter, flag) -> {
- if (flag) {
- atomicCounter.incrementAndGet();
- }
- });
+ final int retryAttempt = retryLimit - 2;
+ // Read the required records to be deleted
+ return Multi.createBy()
+ .repeating()
+ .uni(
+ () -> new AtomicReference(null),
+ stateRef -> {
+ Uni docsToDelete =
+ readOperation().getDocuments(queryExecutor, stateRef.get(), null);
+ return docsToDelete
+ .onItem()
+ .invoke(findResponse -> stateRef.set(findResponse.pagingState()));
+ })
- return counter
+ // Documents read until pagingState available, max records read is deleteLimit + 1
+ .whilst(findResponse -> findResponse.pagingState() != null)
+
+ // Get the deleteLimit # of documents to be delete and set moreData flag true if extra
+ // document is read.
+ .onItem()
+ .transformToMulti(
+ findResponse -> {
+ final List docs = findResponse.docs();
+ // Below conditionality is because we read up to deleteLimit +1 record.
+ if (totalCount.get() + docs.size() <= deleteLimit) {
+ totalCount.addAndGet(docs.size());
+ return Multi.createFrom().items(docs.stream());
+ } else {
+ int needed = deleteLimit - totalCount.get();
+ totalCount.addAndGet(needed);
+ moreData.set(true);
+ return Multi.createFrom().items(findResponse.docs().subList(0, needed).stream());
+ }
+ })
+ .concatenate()
+
+ // Run delete for selected documents and retry in case of
+ .onItem()
+ .transformToUniAndMerge(
+ document -> {
+ return deleteDocument(queryExecutor, delete, document)
+ // Retry `retryLimit` times in case of LWT failure
+ .onFailure(LWTException.class)
+ .recoverWithUni(
+ () -> {
+ return Uni.createFrom()
+ .item(document)
+ .flatMap(
+ prevDoc -> {
+ return readDocumentAgain(queryExecutor, prevDoc)
+ .onItem()
+ // Try deleting the document
+ .transformToUni(
+ reReadDocument ->
+ deleteDocument(
+ queryExecutor, delete, reReadDocument));
+ })
+ .onFailure(LWTException.class)
+ .retry()
+ // because it's already run twice before this
+ // check.
+ .atMost(retryLimit - 1);
+ });
+ })
+ .collect()
+
+ // Count the successful deletes
+ .in(
+ AtomicInteger::new,
+ (atomicCounter, flag) -> {
+ if (flag) {
+ atomicCounter.incrementAndGet();
+ }
+ })
.onItem()
.transform(deletedCounter -> new DeleteOperationPage(deletedCounter.get(), moreData.get()));
}
@@ -107,20 +139,60 @@ private QueryOuterClass.Query buildDeleteQuery() {
* @param queryExecutor
* @param query
* @param doc
- * @return Uni `true` if deleted successfully, else `false`
+ * @return Uni `true` if deleted successfully, else `false` if data changed and no longer
+ * match the conditions and throws JsonApiException if LWT failure.
*/
- private static Uni deleteDocument(
- QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc) {
- query = bindDeleteQuery(query, doc);
- return queryExecutor
- .executeWrite(query)
+ private Uni deleteDocument(
+ QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc)
+ throws JsonApiException {
+ return Uni.createFrom()
+ .item(doc)
+ // Read again if retryAttempt >`0`
.onItem()
.transformToUni(
- result -> {
- if (result.getRows(0).getValues(0).getBoolean()) {
- return Uni.createFrom().item(true);
- } else {
+ document -> {
+ if (document == null) {
return Uni.createFrom().item(false);
+ } else {
+ QueryOuterClass.Query boundQuery = bindDeleteQuery(query, document);
+ return queryExecutor
+ .executeWrite(boundQuery)
+ .onItem()
+ .transform(
+ result -> {
+ // LWT returns `true` for successful transaction, false on failure.
+ if (result.getRows(0).getValues(0).getBoolean()) {
+ // In case of successful document delete
+ return true;
+ } else {
+ // In case of successful document delete
+
+ throw new LWTException(
+ ErrorCode.CONCURRENCY_FAILURE,
+ "Delete failed for document with id %s because of concurrent transaction"
+ .formatted(document.id().value()));
+ }
+ });
+ }
+ });
+ }
+
+ private Uni readDocumentAgain(
+ QueryExecutor queryExecutor, ReadDocument prevReadDoc) {
+ // Read again if retry flag is `true`
+ return readOperation()
+ .getDocuments(
+ queryExecutor,
+ null,
+ new DBFilterBase.IDFilter(DBFilterBase.IDFilter.Operator.EQ, prevReadDoc.id()))
+ .onItem()
+ .transform(
+ response -> {
+ if (!response.docs().isEmpty()) {
+ return response.docs().get(0);
+ } else {
+ // If data changed and doesn't satisfy filter conditions
+ return null;
}
});
}
@@ -133,4 +205,11 @@ private static QueryOuterClass.Query bindDeleteQuery(
.addValues(Values.of(doc.txnId()));
return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build();
}
+
+ /** Inherited Exception class to handle retry */
+ private class LWTException extends JsonApiException {
+ public LWTException(ErrorCode errorCode, String message) {
+ super(errorCode, message);
+ }
+ }
}
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java
index bcc6675eac..b1a01a41fc 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java
@@ -34,18 +34,19 @@ public record FindOperation(
@Override
public Uni> execute(QueryExecutor queryExecutor) {
- return getDocuments(queryExecutor, pagingState())
+ return getDocuments(queryExecutor, pagingState(), null)
.onItem()
.transform(docs -> new ReadOperationPage(docs.docs(), docs.pagingState()));
}
@Override
- public Uni getDocuments(QueryExecutor queryExecutor, String pagingState) {
+ public Uni getDocuments(
+ QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter) {
switch (readType) {
case DOCUMENT:
case KEY:
{
- QueryOuterClass.Query query = buildSelectQuery();
+ QueryOuterClass.Query query = buildSelectQuery(additionalIdFilter);
return findDocument(
queryExecutor,
query,
@@ -83,10 +84,15 @@ public ReadDocument getNewDocument() {
return doc;
}
- private QueryOuterClass.Query buildSelectQuery() {
+ private QueryOuterClass.Query buildSelectQuery(DBFilterBase.IDFilter additionalIdFilter) {
List conditions = new ArrayList<>(filters.size());
for (DBFilterBase filter : filters) {
- conditions.add(filter.get());
+ if (additionalIdFilter == null
+ || (additionalIdFilter != null && !(filter instanceof DBFilterBase.IDFilter)))
+ conditions.add(filter.get());
+ }
+ if (additionalIdFilter != null) {
+ conditions.add(additionalIdFilter.get());
}
return new QueryBuilder()
.select()
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java
index f5677ca15d..14405ab03a 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java
@@ -54,7 +54,7 @@ public Uni> execute(QueryExecutor queryExecutor) {
() -> new AtomicReference(null),
stateRef -> {
Uni docsToUpdate =
- readOperation().getDocuments(queryExecutor, stateRef.get());
+ readOperation().getDocuments(queryExecutor, stateRef.get(), null);
return docsToUpdate
.onItem()
.invoke(findResponse -> stateRef.set(findResponse.pagingState()));
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java
index 080133be41..ab157a1617 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java
@@ -33,7 +33,10 @@ public DeleteManyCommandResolver(DocumentConfig documentConfig, ObjectMapper obj
public Operation resolveCommand(CommandContext commandContext, DeleteManyCommand command) {
ReadOperation readOperation = resolve(commandContext, command);
return new DeleteOperation(
- commandContext, readOperation, documentConfig.maxDocumentDeleteCount());
+ commandContext,
+ readOperation,
+ documentConfig.maxDocumentDeleteCount(),
+ documentConfig.lwt().retries());
}
@Override
diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java
index fb5af4fd9d..ba56b05b1c 100644
--- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java
+++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java
@@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.impl.DeleteOneCommand;
+import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadType;
@@ -20,15 +21,18 @@
public class DeleteOneCommandResolver extends FilterableResolver
implements CommandResolver {
+ private final DocumentConfig documentConfig;
+
@Inject
- public DeleteOneCommandResolver(ObjectMapper objectMapper) {
+ public DeleteOneCommandResolver(DocumentConfig documentConfig, ObjectMapper objectMapper) {
super(objectMapper);
+ this.documentConfig = documentConfig;
}
@Override
public Operation resolveCommand(CommandContext commandContext, DeleteOneCommand command) {
ReadOperation readOperation = resolve(commandContext, command);
- return new DeleteOperation(commandContext, readOperation, 1);
+ return new DeleteOperation(commandContext, readOperation, 1, documentConfig.lwt().retries());
}
@Override
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java
index 5b0be64390..34a590608d 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java
@@ -15,6 +15,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
+import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadType;
@@ -95,7 +96,7 @@ public void deleteWithId() {
1,
ReadType.KEY,
objectMapper);
- DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3);
Supplier execute =
operation
.execute(queryExecutor)
@@ -148,7 +149,7 @@ public void deleteWithIdNoData() {
ReadType.KEY,
objectMapper);
- DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3);
Supplier execute =
operation
.execute(queryExecutor)
@@ -216,7 +217,7 @@ public void deleteWithDynamic() {
1,
ReadType.KEY,
objectMapper);
- DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3);
Supplier execute =
operation
@@ -235,6 +236,314 @@ public void deleteWithDynamic() {
assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 1);
}
+ @Test
+ public void deleteWithDynamicRetry() {
+ UUID tx_id1 = UUID.randomUUID();
+ UUID tx_id2 = UUID.randomUUID();
+ String collectionReadCql =
+ "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert candidatesAssert =
+ withQuery(
+ collectionReadCql,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.VARCHAR)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(tx_id1))));
+
+ String collectionReadCql2 =
+ "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert candidatesAssert2 =
+ withQuery(
+ collectionReadCql2,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()),
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.VARCHAR)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(tx_id2))));
+
+ String collectionDeleteCql =
+ "DELETE FROM \"%s\".\"%s\" WHERE key = ? IF tx_id = ?"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert deleteAssert =
+ withQuery(
+ collectionDeleteCql,
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
+ Values.of(tx_id1))
+ .returning(List.of(List.of(Values.of(false))));
+ ValidatingStargateBridge.QueryAssert deleteAssert2 =
+ withQuery(
+ collectionDeleteCql,
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
+ Values.of(tx_id2))
+ .returning(List.of(List.of(Values.of(true))));
+
+ FindOperation findOperation =
+ new FindOperation(
+ COMMAND_CONTEXT,
+ List.of(
+ new DBFilterBase.TextFilter(
+ "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")),
+ null,
+ 1,
+ 1,
+ ReadType.KEY,
+ objectMapper);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 2);
+
+ Supplier execute =
+ operation
+ .execute(queryExecutor)
+ .subscribe()
+ .withSubscriber(UniAssertSubscriber.create())
+ .awaitItem()
+ .getItem();
+
+ // assert query execution
+ candidatesAssert.assertExecuteCount().isOne();
+ candidatesAssert2.assertExecuteCount().isOne();
+ deleteAssert.assertExecuteCount().isOne();
+ deleteAssert2.assertExecuteCount().isOne();
+ // then result
+
+ // then result
+ CommandResult result = execute.get();
+ assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 1);
+ }
+
+ @Test
+ public void deleteWithDynamicRetryFailure() {
+ UUID tx_id1 = UUID.randomUUID();
+ UUID tx_id2 = UUID.randomUUID();
+ String collectionReadCql =
+ "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert candidatesAssert =
+ withQuery(
+ collectionReadCql,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.VARCHAR)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(tx_id1))));
+
+ String collectionReadCql2 =
+ "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert candidatesAssert2 =
+ withQuery(
+ collectionReadCql2,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()),
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.VARCHAR)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(tx_id2))));
+
+ String collectionDeleteCql =
+ "DELETE FROM \"%s\".\"%s\" WHERE key = ? IF tx_id = ?"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert deleteAssert =
+ withQuery(
+ collectionDeleteCql,
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
+ Values.of(tx_id1))
+ .returning(List.of(List.of(Values.of(false))));
+ ValidatingStargateBridge.QueryAssert deleteAssert2 =
+ withQuery(
+ collectionDeleteCql,
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
+ Values.of(tx_id2))
+ .returning(List.of(List.of(Values.of(false))));
+
+ FindOperation findOperation =
+ new FindOperation(
+ COMMAND_CONTEXT,
+ List.of(
+ new DBFilterBase.TextFilter(
+ "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")),
+ null,
+ 1,
+ 1,
+ ReadType.KEY,
+ objectMapper);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 2);
+
+ final UniAssertSubscriber> supplierUniAssertSubscriber =
+ operation.execute(queryExecutor).subscribe().withSubscriber(UniAssertSubscriber.create());
+
+ // assert query execution
+ candidatesAssert.assertExecuteCount().isOne();
+ candidatesAssert2.assertExecuteCount().isEqualTo(2);
+ deleteAssert.assertExecuteCount().isOne();
+ deleteAssert2.assertExecuteCount().isEqualTo(2);
+ // then result
+
+ supplierUniAssertSubscriber.assertFailedWith(
+ JsonApiException.class,
+ "Delete failed for document with id %s because of concurrent transaction"
+ .formatted("doc1"));
+ }
+
+ @Test
+ public void deleteWithDynamicRetryConcurrentDelete() {
+ UUID tx_id1 = UUID.randomUUID();
+ String collectionReadCql =
+ "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert candidatesAssert =
+ withQuery(
+ collectionReadCql,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.VARCHAR)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(tx_id1))));
+
+ String collectionReadCql2 =
+ "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert candidatesAssert2 =
+ withQuery(
+ collectionReadCql2,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()),
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.VARCHAR)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build()))
+ .returning(List.of());
+
+ String collectionDeleteCql =
+ "DELETE FROM \"%s\".\"%s\" WHERE key = ? IF tx_id = ?"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ ValidatingStargateBridge.QueryAssert deleteAssert =
+ withQuery(
+ collectionDeleteCql,
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
+ Values.of(tx_id1))
+ .returning(List.of(List.of(Values.of(false))));
+
+ FindOperation findOperation =
+ new FindOperation(
+ COMMAND_CONTEXT,
+ List.of(
+ new DBFilterBase.TextFilter(
+ "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")),
+ null,
+ 1,
+ 1,
+ ReadType.KEY,
+ objectMapper);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 2);
+
+ Supplier execute =
+ operation
+ .execute(queryExecutor)
+ .subscribe()
+ .withSubscriber(UniAssertSubscriber.create())
+ .awaitItem()
+ .getItem();
+
+ // assert query execution
+ candidatesAssert.assertExecuteCount().isOne();
+ candidatesAssert2.assertExecuteCount().isOne();
+ deleteAssert.assertExecuteCount().isOne();
+ // then result
+
+ // then result
+ CommandResult result = execute.get();
+ assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 0);
+ }
+
@Test
public void deleteManyWithDynamic() {
UUID tx_id1 = UUID.randomUUID();
@@ -299,7 +608,7 @@ public void deleteManyWithDynamic() {
2,
ReadType.KEY,
objectMapper);
- DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2, 3);
Supplier execute =
operation
@@ -382,7 +691,7 @@ public void deleteManyWithDynamicPaging() {
1,
ReadType.KEY,
objectMapper);
- DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2, 3);
Supplier execute =
operation
@@ -471,7 +780,7 @@ public void deleteManyWithDynamicPagingAndMoreData() {
1,
ReadType.KEY,
objectMapper);
- DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2, 3);
Supplier execute =
operation
@@ -531,7 +840,7 @@ public void deleteWithNoResult() {
1,
ReadType.KEY,
objectMapper);
- DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1);
+ DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3);
Supplier execute =
operation
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java
index 631b59f74f..4ce62e3ce6 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java
@@ -15,6 +15,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers;
+import io.stargate.sgv2.jsonapi.service.operation.model.ReadOperation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadType;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocValueHasher;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
@@ -693,5 +694,202 @@ public void findWithNoResult() throws Exception {
assertThat(result.data().docs()).hasSize(0);
});
}
+
+ @Test
+ public void findWithIdWithIdRetry() throws Exception {
+ String collectionReadCql =
+ "SELECT key, tx_id, doc_json FROM \"%s\".\"%s\" WHERE key = ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ String doc1 =
+ """
+ {
+ "_id": "doc1",
+ "username": "user1"
+ }
+ """;
+ ValidatingStargateBridge.QueryAssert candidatesAssert =
+ withQuery(
+ collectionReadCql,
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.tuple(TypeSpecs.TINYINT, TypeSpecs.VARCHAR))
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("doc_json")
+ .setType(TypeSpecs.VARCHAR)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(UUID.randomUUID()),
+ Values.of(doc1))));
+ FindOperation findOperation =
+ new FindOperation(
+ commandContext,
+ List.of(
+ new DBFilterBase.IDFilter(
+ DBFilterBase.IDFilter.Operator.EQ, DocumentId.fromString("doc1"))),
+ null,
+ 1,
+ 1,
+ ReadType.DOCUMENT,
+ objectMapper);
+ final ReadOperation.FindResponse result =
+ findOperation
+ .getDocuments(
+ queryExecutor,
+ null,
+ new DBFilterBase.IDFilter(
+ DBFilterBase.IDFilter.Operator.EQ, DocumentId.fromString("doc1")))
+ .subscribeAsCompletionStage()
+ .get();
+ assertThat(result)
+ .satisfies(
+ commandResult -> {
+ assertThat(result.docs()).isNotNull();
+ assertThat(result.docs()).hasSize(1);
+ });
+ }
+
+ @Test
+ public void findWithDynamicGetDocument() throws Exception {
+ String collectionReadCql =
+ "SELECT key, tx_id, doc_json FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ String doc1 =
+ """
+ {
+ "_id": "doc1",
+ "username": "user1"
+ }
+ """;
+ ValidatingStargateBridge.QueryAssert candidatesAssert =
+ withQuery(
+ collectionReadCql,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.tuple(TypeSpecs.TINYINT, TypeSpecs.VARCHAR))
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("doc_json")
+ .setType(TypeSpecs.VARCHAR)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(UUID.randomUUID()),
+ Values.of(doc1))));
+ FindOperation findOperation =
+ new FindOperation(
+ commandContext,
+ List.of(
+ new DBFilterBase.TextFilter(
+ "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")),
+ null,
+ 1,
+ 1,
+ ReadType.DOCUMENT,
+ objectMapper);
+ final ReadOperation.FindResponse result =
+ findOperation.getDocuments(queryExecutor, null, null).subscribeAsCompletionStage().get();
+ assertThat(result)
+ .satisfies(
+ commandResult -> {
+ assertThat(result.docs()).isNotNull();
+ assertThat(result.docs()).hasSize(1);
+ });
+ }
+
+ @Test
+ public void findWithDynamicWithIdRetry() throws Exception {
+ String collectionReadCql =
+ "SELECT key, tx_id, doc_json FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1"
+ .formatted(KEYSPACE_NAME, COLLECTION_NAME);
+ String doc1 =
+ """
+ {
+ "_id": "doc1",
+ "username": "user1"
+ }
+ """;
+ ValidatingStargateBridge.QueryAssert candidatesAssert =
+ withQuery(
+ collectionReadCql,
+ Values.of("username " + new DocValueHasher().getHash("user1").hash()),
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))))
+ .withPageSize(1)
+ .withColumnSpec(
+ List.of(
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("key")
+ .setType(TypeSpecs.tuple(TypeSpecs.TINYINT, TypeSpecs.VARCHAR))
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("tx_id")
+ .setType(TypeSpecs.UUID)
+ .build(),
+ QueryOuterClass.ColumnSpec.newBuilder()
+ .setName("doc_json")
+ .setType(TypeSpecs.VARCHAR)
+ .build()))
+ .returning(
+ List.of(
+ List.of(
+ Values.of(
+ CustomValueSerializers.getDocumentIdValue(
+ DocumentId.fromString("doc1"))),
+ Values.of(UUID.randomUUID()),
+ Values.of(doc1))));
+ FindOperation findOperation =
+ new FindOperation(
+ commandContext,
+ List.of(
+ new DBFilterBase.TextFilter(
+ "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")),
+ null,
+ 1,
+ 1,
+ ReadType.DOCUMENT,
+ objectMapper);
+ final ReadOperation.FindResponse result =
+ findOperation
+ .getDocuments(
+ queryExecutor,
+ null,
+ new DBFilterBase.IDFilter(
+ DBFilterBase.IDFilter.Operator.EQ, DocumentId.fromString("doc1")))
+ .subscribeAsCompletionStage()
+ .get();
+ assertThat(result)
+ .satisfies(
+ commandResult -> {
+ assertThat(result.docs()).isNotNull();
+ assertThat(result.docs()).hasSize(1);
+ });
+ }
}
}
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java
index 569c6eca9a..28a2afb54b 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java
@@ -52,6 +52,7 @@ public void idFilterCondition() throws Exception {
op -> {
assertThat(op.commandContext()).isEqualTo(commandContext);
assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount());
+ assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries());
assertThat(op.readOperation())
.isInstanceOfSatisfying(
FindOperation.class,
@@ -91,6 +92,7 @@ public void noFilterCondition() throws Exception {
op -> {
assertThat(op.commandContext()).isEqualTo(commandContext);
assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount());
+ assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries());
assertThat(op.readOperation())
.isInstanceOfSatisfying(
FindOperation.class,
@@ -127,6 +129,7 @@ public void dynamicFilterCondition() throws Exception {
op -> {
assertThat(op.commandContext()).isEqualTo(commandContext);
assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount());
+ assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries());
assertThat(op.readOperation())
.isInstanceOfSatisfying(
FindOperation.class,
diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java
index c0c989622c..0c57b29b81 100644
--- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java
+++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java
@@ -9,6 +9,7 @@
import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.impl.DeleteOneCommand;
+import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.operation.model.ReadType;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase;
@@ -23,6 +24,7 @@
@TestProfile(NoGlobalResourcesTestProfile.Impl.class)
public class DeleteOneCommandResolverTest {
@Inject ObjectMapper objectMapper;
+ @Inject DocumentConfig documentConfig;
@Inject DeleteOneCommandResolver resolver;
@Nested
@@ -50,6 +52,7 @@ public void idFilterCondition() throws Exception {
op -> {
assertThat(op.commandContext()).isEqualTo(commandContext);
assertThat(op.deleteLimit()).isEqualTo(1);
+ assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries());
assertThat(op.readOperation())
.isInstanceOfSatisfying(
FindOperation.class,
@@ -88,6 +91,7 @@ public void noFilterCondition() throws Exception {
op -> {
assertThat(op.commandContext()).isEqualTo(commandContext);
assertThat(op.deleteLimit()).isEqualTo(1);
+ assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries());
assertThat(op.readOperation())
.isInstanceOfSatisfying(
FindOperation.class,
@@ -123,6 +127,7 @@ public void dynamicFilterCondition() throws Exception {
op -> {
assertThat(op.commandContext()).isEqualTo(commandContext);
assertThat(op.deleteLimit()).isEqualTo(1);
+ assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries());
assertThat(op.readOperation())
.isInstanceOfSatisfying(
FindOperation.class,