From 8ad57b20376888a90bb3a1d32312c8a455396150 Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Tue, 28 Feb 2023 10:26:25 -0500 Subject: [PATCH 01/15] Changes to Add Equals DBFilters to newly created document --- .../command/clause/update/SetOperation.java | 14 +++ .../operation/model/impl/DBFilterBase.java | 119 +++++++++++++++++- .../operation/model/impl/FindOperation.java | 13 +- .../api/v1/FindAndUpdateIntegrationTest.java | 47 +++++++ .../api/v1/UpdateManyIntegrationTest.java | 47 +++++++ 5 files changed, 233 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/clause/update/SetOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/clause/update/SetOperation.java index bc5dc50718..32ad1b160e 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/clause/update/SetOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/api/model/command/clause/update/SetOperation.java @@ -27,6 +27,20 @@ public static SetOperation construct(ObjectNode args) { return new SetOperation(additions); } + /** + * Override method used to set update filter condition fields to the document + * + * @param filterPath + * @param value + * @return + */ + public static SetOperation construct(String filterPath, JsonNode value) { + List additions = new ArrayList<>(); + String path = validateUpdatePath(UpdateOperator.SET, filterPath); + additions.add(new SetAction(path, value)); + return new SetOperation(additions); + } + @Override public boolean updateDocument(ObjectNode doc, UpdateTargetLocator targetLocator) { boolean modified = false; diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java index a92a340a7d..2d6c1089bf 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java @@ -1,5 +1,9 @@ package io.stargate.sgv2.jsonapi.service.operation.model.impl; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.stargate.bridge.grpc.Values; import io.stargate.bridge.proto.QueryOuterClass; import io.stargate.sgv2.api.common.cql.builder.BuiltCondition; @@ -17,6 +21,18 @@ /** Base for the DB filters / conditions that we want to use with queries */ public abstract class DBFilterBase implements Supplier { + private final String path; + + protected DBFilterBase(String path) { + this.path = path; + } + + abstract JsonNode asJson(JsonNodeFactory nodeFactory); + + protected String getPath() { + return path; + } + /** Filter for the map columns we have in the super shredding table. */ public abstract static class MapFilterBase extends DBFilterBase { @@ -47,6 +63,7 @@ public enum Operator { protected MapFilterBase( String columnName, String key, MapFilterBase.Operator operator, T value) { + super(key); this.columnName = columnName; this.key = key; this.operator = operator; @@ -92,22 +109,46 @@ public BuiltCondition get() { /** Filters db documents based on a text field value */ public static class TextFilter extends MapFilterBase { + private final String strValue; + public TextFilter(String path, Operator operator, String value) { super("query_text_values", path, operator, value); + this.strValue = value; + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return nodeFactory.textNode(strValue); } } /** Filters db documents based on a boolean field value */ public static class BoolFilter extends MapFilterBase { + private final boolean boolValue; + public BoolFilter(String path, Operator operator, Boolean value) { super("query_bool_values", path, operator, value); + this.boolValue = value; + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return nodeFactory.booleanNode(boolValue); } } /** Filters db documents based on a numeric field value */ public static class NumberFilter extends MapFilterBase { + private final BigDecimal numberValue; + public NumberFilter(String path, Operator operator, BigDecimal value) { super("query_dbl_values", path, operator, value); + this.numberValue = value; + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return nodeFactory.numberNode(numberValue); } } @@ -121,6 +162,7 @@ public enum Operator { protected final DocumentId value; public IDFilter(IDFilter.Operator operator, DocumentId value) { + super("_id"); this.operator = operator; this.value = value; } @@ -150,6 +192,11 @@ public BuiltCondition get() { String.format("Unsupported id column operation %s", operator)); } } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return DBFilterBase.getJsonNode(nodeFactory, value); + } } /** * DB filter / condition for testing a set value Note: we can only do CONTAINS until SAI indexes @@ -164,7 +211,9 @@ public enum Operator { protected final T value; protected final SetFilterBase.Operator operator; - protected SetFilterBase(String columnName, T value, SetFilterBase.Operator operator) { + protected SetFilterBase( + String columnName, String filterPath, T value, SetFilterBase.Operator operator) { + super(filterPath); this.columnName = columnName; this.value = value; this.operator = operator; @@ -205,7 +254,12 @@ public BuiltCondition get() { */ public static class IsNullFilter extends SetFilterBase { public IsNullFilter(String path) { - super("query_null_values", path, Operator.CONTAINS); + super("query_null_values", path, path, Operator.CONTAINS); + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return DBFilterBase.getJsonNode(nodeFactory, null); } } @@ -216,14 +270,24 @@ public IsNullFilter(String path) { */ public static class ExistsFilter extends SetFilterBase { public ExistsFilter(String path, boolean existFlag) { - super("exist_keys", path, Operator.CONTAINS); + super("exist_keys", path, path, Operator.CONTAINS); + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return null; } } /** Filter for document where all values exists for an array */ public static class AllFilter extends SetFilterBase { public AllFilter(DocValueHasher hasher, String path, Object arrayValue) { - super("array_contains", getHashValue(hasher, path, arrayValue), Operator.CONTAINS); + super("array_contains", path, getHashValue(hasher, path, arrayValue), Operator.CONTAINS); + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return null; } } @@ -232,12 +296,25 @@ public static class SizeFilter extends MapFilterBase { public SizeFilter(String path, Integer size) { super("array_size", path, Operator.MAP_EQUALS, size); } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return null; + } } /** Filter for document where array matches (data in same order) as the array in request */ public static class ArrayEqualsFilter extends MapFilterBase { + private final List arrayValue; + public ArrayEqualsFilter(DocValueHasher hasher, String path, List arrayData) { super("array_equals", path, Operator.MAP_EQUALS, getHash(hasher, arrayData)); + this.arrayValue = arrayData; + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return DBFilterBase.getJsonNode(nodeFactory, arrayValue); } } @@ -246,8 +323,16 @@ public ArrayEqualsFilter(DocValueHasher hasher, String path, List arrayD * filter sub document */ public static class SubDocEqualsFilter extends MapFilterBase { + private final Map subDocValue; + public SubDocEqualsFilter(DocValueHasher hasher, String path, Map subDocData) { super("sub_doc_equals", path, Operator.MAP_EQUALS, getHash(hasher, subDocData)); + this.subDocValue = subDocData; + } + + @Override + JsonNode asJson(JsonNodeFactory nodeFactory) { + return DBFilterBase.getJsonNode(nodeFactory, subDocValue); } } @@ -264,6 +349,32 @@ private static QueryOuterClass.Value getGrpcValue(Object value) { return Values.of((String) null); } + private static JsonNode getJsonNode(JsonNodeFactory nodeFactory, Object value) { + if (value == null) return nodeFactory.nullNode(); + if (value instanceof DocumentId) { + return ((DocumentId) value).asJson(nodeFactory); + } else if (value instanceof String) { + return nodeFactory.textNode((String) value); + } else if (value instanceof BigDecimal) { + return nodeFactory.numberNode((BigDecimal) value); + } else if (value instanceof Boolean) { + return nodeFactory.booleanNode((Boolean) value); + } else if (value instanceof List) { + List listValues = (List) value; + final ArrayNode arrayNode = nodeFactory.arrayNode(listValues.size()); + listValues.forEach(listValue -> arrayNode.add(getJsonNode(nodeFactory, listValue))); + return arrayNode; + } else if (value instanceof Map) { + Map mapValues = (Map) value; + final ObjectNode objectNode = nodeFactory.objectNode(); + mapValues + .entrySet() + .forEach(kv -> objectNode.put(kv.getKey(), getJsonNode(nodeFactory, kv.getValue()))); + return objectNode; + } + return nodeFactory.nullNode(); + } + /** * @param hasher * @param path Path value is prefixed to the hash value of arrays. diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java index 1a00623932..6e4ade854d 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java @@ -9,6 +9,8 @@ import io.stargate.sgv2.api.common.cql.builder.QueryBuilder; import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; +import io.stargate.sgv2.jsonapi.api.model.command.clause.update.SetOperation; +import io.stargate.sgv2.jsonapi.api.model.command.clause.update.UpdateTargetLocator; import io.stargate.sgv2.jsonapi.exception.ErrorCode; import io.stargate.sgv2.jsonapi.exception.JsonApiException; import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor; @@ -62,14 +64,19 @@ public Uni getDocuments(QueryExecutor queryExecutor, String paging public ReadDocument getNewDocument() { ObjectNode rootNode = objectMapper().createObjectNode(); DocumentId documentId = null; + UpdateTargetLocator targetLocator = new UpdateTargetLocator(); for (DBFilterBase filter : filters) { if (filter instanceof DBFilterBase.IDFilter) { documentId = ((DBFilterBase.IDFilter) filter).value; - JsonNode id = ((DBFilterBase.IDFilter) filter).value.asJson(objectMapper()); - rootNode.putIfAbsent("_id", id); + rootNode.putIfAbsent(filter.getPath(), filter.asJson(objectMapper().getNodeFactory())); + } else { + JsonNode value = filter.asJson(objectMapper().getNodeFactory()); + if (value != null) { + String filterPath = filter.getPath(); + SetOperation.construct(filterPath, value).updateDocument(rootNode, targetLocator); + } } } - ReadDocument doc = new ReadDocument(documentId, null, rootNode); return doc; } diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java index 1bd6e5c23a..664c5b6a6e 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java @@ -455,6 +455,53 @@ public void findByIdUpsert() { .body("data.docs[0]", jsonEquals(expected)); } + @Test + @Order(2) + public void findByIdAndColumnUpsert() { + String json = + """ + { + "updateOne": { + "filter" : {"_id" : "afterDoc7", "username" : "afterName7"}, + "update" : {"$set" : {"active_user": false}}, + "options" : {"upsert" : true} + } + } + """; + + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(json) + .when() + .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) + .then() + .statusCode(200) + .body("status.upsertedId", is("afterDoc7")) + .body("status.matchedCount", is(0)) + .body("status.modifiedCount", is(0)); + + json = + """ + { + "find": { + "filter" : {"_id" : "afterDoc7"} + } + } + """; + String expected = + "{\"_id\":\"afterDoc7\", \"username\" : \"afterName7\", \"active_user\":false}"; + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(json) + .when() + .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) + .then() + .statusCode(200) + .body("data.docs[0]", jsonEquals(expected)); + } + @Test @Order(2) public void findByColumnAndSet() { diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java index cd845731cf..4566485757 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/UpdateManyIntegrationTest.java @@ -320,6 +320,53 @@ public void updateManyByIdNoChange() { .body("data.docs[0]", jsonEquals(expected)); } + @Test + @Order(8) + public void updateManyUpsertAddFilterColumn() { + insert(5); + String json = + """ + { + "updateMany": { + "filter" : {"_id": "doc6", "answer" : 42}, + "update" : {"$set" : {"active_user": false}}, + "options" : {"upsert" : true} + } + } + """; + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(json) + .when() + .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) + .then() + .statusCode(200) + .body("status.upsertedId", is("doc6")) + .body("status.matchedCount", is(0)) + .body("status.modifiedCount", is(0)) + .body("status.moreData", nullValue()); + + String expected = "{\"_id\":\"doc6\", \"answer\" : 42, \"active_user\":false}"; + json = + """ + { + "find": { + "filter" : {"_id" : "doc6"} + } + } + """; + given() + .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) + .contentType(ContentType.JSON) + .body(json) + .when() + .post(CollectionResource.BASE_PATH, keyspaceId.asInternal(), collectionName) + .then() + .statusCode(200) + .body("data.docs[0]", jsonEquals(expected)); + } + @AfterEach public void cleanUpData() { String json = From 85e6f6b2035a03108d663df938dfa83340a0069a Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Tue, 28 Feb 2023 10:47:40 -0500 Subject: [PATCH 02/15] Added condition to check filter is EQ condition --- .../operation/model/impl/DBFilterBase.java | 83 ++++++++++++++++++- .../operation/model/impl/FindOperation.java | 10 ++- 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java index 2d6c1089bf..09d8c1f57f 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java @@ -21,18 +21,37 @@ /** Base for the DB filters / conditions that we want to use with queries */ public abstract class DBFilterBase implements Supplier { + /** Filter condition element path. */ private final String path; protected DBFilterBase(String path) { this.path = path; } + /** + * Get JsonNode for the representing filter condition value. + * + * @param nodeFactory + * @return + */ abstract JsonNode asJson(JsonNodeFactory nodeFactory); + /** + * Returns filter condition element path. + * + * @return + */ protected String getPath() { return path; } + /** + * Returns `true` if the filter condition can be added to upsert row + * + * @return + */ + abstract boolean canAddField(); + /** Filter for the map columns we have in the super shredding table. */ public abstract static class MapFilterBase extends DBFilterBase { @@ -52,7 +71,7 @@ public enum Operator { private final String columnName; private final String key; - private final DBFilterBase.MapFilterBase.Operator operator; + protected final DBFilterBase.MapFilterBase.Operator operator; private final T value; /** @@ -120,6 +139,11 @@ public TextFilter(String path, Operator operator, String value) { JsonNode asJson(JsonNodeFactory nodeFactory) { return nodeFactory.textNode(strValue); } + + @Override + boolean canAddField() { + return Operator.MAP_EQUALS.equals(operator); + } } /** Filters db documents based on a boolean field value */ @@ -135,6 +159,11 @@ public BoolFilter(String path, Operator operator, Boolean value) { JsonNode asJson(JsonNodeFactory nodeFactory) { return nodeFactory.booleanNode(boolValue); } + + @Override + boolean canAddField() { + return Operator.MAP_EQUALS.equals(operator); + } } /** Filters db documents based on a numeric field value */ @@ -150,6 +179,11 @@ public NumberFilter(String path, Operator operator, BigDecimal value) { JsonNode asJson(JsonNodeFactory nodeFactory) { return nodeFactory.numberNode(numberValue); } + + @Override + boolean canAddField() { + return Operator.MAP_EQUALS.equals(operator); + } } /** Filters db documents based on a document id field value */ @@ -197,6 +231,11 @@ public BuiltCondition get() { JsonNode asJson(JsonNodeFactory nodeFactory) { return DBFilterBase.getJsonNode(nodeFactory, value); } + + @Override + boolean canAddField() { + return true; + } } /** * DB filter / condition for testing a set value Note: we can only do CONTAINS until SAI indexes @@ -261,6 +300,11 @@ public IsNullFilter(String path) { JsonNode asJson(JsonNodeFactory nodeFactory) { return DBFilterBase.getJsonNode(nodeFactory, null); } + + @Override + boolean canAddField() { + return true; + } } /** @@ -277,17 +321,30 @@ public ExistsFilter(String path, boolean existFlag) { JsonNode asJson(JsonNodeFactory nodeFactory) { return null; } + + @Override + boolean canAddField() { + return false; + } } /** Filter for document where all values exists for an array */ public static class AllFilter extends SetFilterBase { + private final Object arrayValue; + public AllFilter(DocValueHasher hasher, String path, Object arrayValue) { super("array_contains", path, getHashValue(hasher, path, arrayValue), Operator.CONTAINS); + this.arrayValue = arrayValue; } @Override JsonNode asJson(JsonNodeFactory nodeFactory) { - return null; + return DBFilterBase.getJsonNode(nodeFactory, arrayValue); + } + + @Override + boolean canAddField() { + return false; } } @@ -301,6 +358,11 @@ public SizeFilter(String path, Integer size) { JsonNode asJson(JsonNodeFactory nodeFactory) { return null; } + + @Override + boolean canAddField() { + return false; + } } /** Filter for document where array matches (data in same order) as the array in request */ @@ -316,6 +378,11 @@ public ArrayEqualsFilter(DocValueHasher hasher, String path, List arrayD JsonNode asJson(JsonNodeFactory nodeFactory) { return DBFilterBase.getJsonNode(nodeFactory, arrayValue); } + + @Override + boolean canAddField() { + return true; + } } /** @@ -334,6 +401,11 @@ public SubDocEqualsFilter(DocValueHasher hasher, String path, Map Date: Tue, 28 Feb 2023 11:19:40 -0500 Subject: [PATCH 03/15] Check if the filter condition is eq. --- .../jsonapi/service/operation/model/impl/DBFilterBase.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java index 09d8c1f57f..67f0fbdced 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java @@ -142,7 +142,7 @@ JsonNode asJson(JsonNodeFactory nodeFactory) { @Override boolean canAddField() { - return Operator.MAP_EQUALS.equals(operator); + return Operator.EQ.equals(operator); } } @@ -162,7 +162,7 @@ JsonNode asJson(JsonNodeFactory nodeFactory) { @Override boolean canAddField() { - return Operator.MAP_EQUALS.equals(operator); + return Operator.EQ.equals(operator); } } @@ -182,7 +182,7 @@ JsonNode asJson(JsonNodeFactory nodeFactory) { @Override boolean canAddField() { - return Operator.MAP_EQUALS.equals(operator); + return Operator.EQ.equals(operator); } } From cb27e57dc6d777e7aa5fe4a7d726499aeef0d1cb Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Tue, 28 Feb 2023 11:51:35 -0500 Subject: [PATCH 04/15] Return Uni.nullItem instead of nothing --- .../service/operation/model/impl/ReadAndUpdateOperation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java index 184336413a..c312092cd2 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java @@ -142,7 +142,7 @@ private Uni updatedDocument( if (result.getRows(0).getValues(0).getBoolean()) { return Uni.createFrom().item(writableShreddedDocument.id()); } else { - return Uni.createFrom().nothing(); + return Uni.createFrom().nullItem(); } }); } From 2f95a937c3dc03a86afa0ae94843194be95b4359 Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Tue, 28 Feb 2023 15:53:07 -0500 Subject: [PATCH 05/15] Changes based on review comment --- .../jsonapi/service/operation/model/impl/DBFilterBase.java | 5 +++-- .../sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java index 67f0fbdced..f47f488b91 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DBFilterBase.java @@ -8,6 +8,7 @@ import io.stargate.bridge.proto.QueryOuterClass; import io.stargate.sgv2.api.common.cql.builder.BuiltCondition; import io.stargate.sgv2.api.common.cql.builder.Predicate; +import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants; import io.stargate.sgv2.jsonapi.exception.ErrorCode; import io.stargate.sgv2.jsonapi.exception.JsonApiException; import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers; @@ -46,7 +47,7 @@ protected String getPath() { } /** - * Returns `true` if the filter condition can be added to upsert row + * Returns `true` if the filter condition should be added to upsert row * * @return */ @@ -196,7 +197,7 @@ public enum Operator { protected final DocumentId value; public IDFilter(IDFilter.Operator operator, DocumentId value) { - super("_id"); + super(DocumentConstants.Fields.DOC_ID); this.operator = operator; this.value = value; } diff --git a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java index 664c5b6a6e..c835b0fd39 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/api/v1/FindAndUpdateIntegrationTest.java @@ -462,7 +462,7 @@ public void findByIdAndColumnUpsert() { """ { "updateOne": { - "filter" : {"_id" : "afterDoc7", "username" : "afterName7"}, + "filter" : {"_id" : "afterDoc7", "username" : "afterName7", "phone" : null}, "update" : {"$set" : {"active_user": false}}, "options" : {"upsert" : true} } @@ -490,7 +490,7 @@ public void findByIdAndColumnUpsert() { } """; String expected = - "{\"_id\":\"afterDoc7\", \"username\" : \"afterName7\", \"active_user\":false}"; + "{\"_id\":\"afterDoc7\", \"username\" : \"afterName7\", \"phone\" : null, \"active_user\":false}"; given() .header(HttpConstants.AUTHENTICATION_TOKEN_HEADER_NAME, getAuthToken()) .contentType(ContentType.JSON) From 7dc900898690efcf150a15eb116368aa3e390138 Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Thu, 2 Mar 2023 10:55:30 -0500 Subject: [PATCH 06/15] Changes for LWT retry for delete --- .../sgv2/jsonapi/exception/ErrorCode.java | 2 + .../service/bridge/config/DocumentConfig.java | 6 ++ .../operation/model/CountOperation.java | 3 +- .../operation/model/ReadOperation.java | 4 +- .../operation/model/impl/DeleteOperation.java | 80 ++++++++++++++++--- .../operation/model/impl/FindOperation.java | 12 ++- .../model/impl/ReadAndUpdateOperation.java | 2 +- .../model/impl/DeleteManyCommandResolver.java | 5 +- .../model/impl/DeleteOneCommandResolver.java | 9 ++- .../service/shredding/model/DocumentId.java | 2 +- .../model/impl/DeleteOperationTest.java | 22 +++-- .../impl/DeleteManyCommandResolverTest.java | 15 +++- .../impl/DeleteOneCommandResolverTest.java | 14 +++- 13 files changed, 142 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java b/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java index d1389234a1..76311cca63 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/exception/ErrorCode.java @@ -6,6 +6,8 @@ public enum ErrorCode { /** Command error codes. */ COMMAND_NOT_IMPLEMENTED("The provided command is not implemented."), + CONCURRENCY_FAILURE("Unable to complete transaction due to concurrent transactions"), + DOCUMENT_ALREADY_EXISTS("Document already exists with the id"), DOCUMENT_UNPARSEABLE("Unable to parse the document"), diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java index 068e6889df..69b332fab2 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java @@ -64,4 +64,10 @@ public interface DocumentConfig { @Positive @WithDefault("20") int maxDocumentUpdateCount(); + + /** @return Defines the maximum retry for lwt failure 3. */ + @Max(5) + @Positive + @WithDefault("3") + int maxLWTFailureRetry(); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java index 2d99902220..bd04d05252 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java @@ -46,7 +46,8 @@ private QueryOuterClass.Query buildSelectQuery() { } @Override - public Uni getDocuments(QueryExecutor queryExecutor, String pagingState) { + public Uni getDocuments( + QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride) { return Uni.createFrom().failure(new JsonApiException(ErrorCode.UNSUPPORTED_OPERATION)); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java index 4f704a27e4..744a41d4c2 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java @@ -9,6 +9,7 @@ import io.stargate.sgv2.jsonapi.exception.ErrorCode; import io.stargate.sgv2.jsonapi.exception.JsonApiException; import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor; +import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase; import io.stargate.sgv2.jsonapi.service.operation.model.impl.ReadDocument; import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId; import java.util.ArrayList; @@ -118,7 +119,8 @@ default Uni countDocuments( * @param queryExecutor * @return */ - Uni getDocuments(QueryExecutor queryExecutor, String pagingState); + Uni getDocuments( + QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride); /** * A operation method which can return ReadDocument with an empty document, if the filter diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java index 33b377acb7..f7f5be9cee 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java @@ -6,6 +6,8 @@ import io.stargate.bridge.proto.QueryOuterClass; import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; +import io.stargate.sgv2.jsonapi.exception.ErrorCode; +import io.stargate.sgv2.jsonapi.exception.JsonApiException; import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor; import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers; import io.stargate.sgv2.jsonapi.service.operation.model.ModifyOperation; @@ -27,7 +29,8 @@ public record DeleteOperation( * Added parameter to pass number of document to be deleted, this is needed because read * documents limit changed to deleteLimit + 1 */ - int deleteLimit) + int deleteLimit, + int retryLimit) implements ModifyOperation { @Override public Uni> execute(QueryExecutor queryExecutor) { @@ -40,7 +43,7 @@ public Uni> execute(QueryExecutor queryExecutor) { () -> new AtomicReference(null), stateRef -> { Uni docsToDelete = - readOperation().getDocuments(queryExecutor, stateRef.get()); + readOperation().getDocuments(queryExecutor, stateRef.get(), null); return docsToDelete .onItem() .invoke(findResponse -> stateRef.set(findResponse.pagingState())); @@ -68,7 +71,13 @@ public Uni> execute(QueryExecutor queryExecutor) { .concatenate() .onItem() .transformToUniAndConcatenate( - readDocument -> deleteDocument(queryExecutor, delete, readDocument)) + readDocument -> { + AtomicBoolean retry = new AtomicBoolean(false); + return deleteDocument(queryExecutor, delete, readDocument, retry) + .onFailure() + .retry() + .atMost(retryLimit); + }) .collect() .in( AtomicInteger::new, @@ -109,18 +118,65 @@ private QueryOuterClass.Query buildDeleteQuery() { * @param doc * @return Uni `true` if deleted successfully, else `false` */ - private static Uni deleteDocument( - QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc) { - query = bindDeleteQuery(query, doc); - return queryExecutor - .executeWrite(query) + private Uni deleteDocument( + QueryExecutor queryExecutor, + QueryOuterClass.Query query, + ReadDocument doc, + AtomicBoolean retryFlag) { + + final Uni documentToDelete = + Uni.createFrom() + .item(retryFlag.get()) + .onItem() + .transformToUni( + retry -> { + if (retry) { + // Read again if retry flag is `true` + final Uni findResponse = + readOperation() + .getDocuments( + queryExecutor, + null, + new DBFilterBase.IDFilter( + DBFilterBase.IDFilter.Operator.EQ, doc.id())); + return findResponse + .onItem() + .transform( + response -> { + if (response.docs().isEmpty()) { + return response.docs().get(0); + } else { + // If data changed and doesn't satisfy filter conditions + return null; + } + }); + } else { + return Uni.createFrom().item(doc); + } + }); + return documentToDelete .onItem() .transformToUni( - result -> { - if (result.getRows(0).getValues(0).getBoolean()) { - return Uni.createFrom().item(true); - } else { + docToDelete -> { + if (docToDelete == null) { return Uni.createFrom().item(false); + } else { + QueryOuterClass.Query boundQuery = bindDeleteQuery(query, docToDelete); + return queryExecutor + .executeWrite(boundQuery) + .onItem() + .transform( + result -> { + if (result.getRows(0).getValues(0).getBoolean()) { + return true; + } else { + retryFlag.set(true); + throw new JsonApiException( + ErrorCode.CONCURRENCY_FAILURE, + "Delete failed for %s because of concurrent transaction" + .formatted(docToDelete.id().value())); + } + }); } }); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java index bcc6675eac..4035624dac 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java @@ -34,18 +34,19 @@ public record FindOperation( @Override public Uni> execute(QueryExecutor queryExecutor) { - return getDocuments(queryExecutor, pagingState()) + return getDocuments(queryExecutor, pagingState(), null) .onItem() .transform(docs -> new ReadOperationPage(docs.docs(), docs.pagingState())); } @Override - public Uni getDocuments(QueryExecutor queryExecutor, String pagingState) { + public Uni getDocuments( + QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride) { switch (readType) { case DOCUMENT: case KEY: { - QueryOuterClass.Query query = buildSelectQuery(); + QueryOuterClass.Query query = buildSelectQuery(idOverride); return findDocument( queryExecutor, query, @@ -83,11 +84,14 @@ public ReadDocument getNewDocument() { return doc; } - private QueryOuterClass.Query buildSelectQuery() { + private QueryOuterClass.Query buildSelectQuery(DBFilterBase.IDFilter idOverride) { List conditions = new ArrayList<>(filters.size()); for (DBFilterBase filter : filters) { conditions.add(filter.get()); } + if (idOverride != null) { + conditions.add(idOverride.get()); + } return new QueryBuilder() .select() .column(ReadType.DOCUMENT == readType ? documentColumns : documentKeyColumns) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java index c312092cd2..9708116fca 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/ReadAndUpdateOperation.java @@ -54,7 +54,7 @@ public Uni> execute(QueryExecutor queryExecutor) { () -> new AtomicReference(null), stateRef -> { Uni docsToUpdate = - readOperation().getDocuments(queryExecutor, stateRef.get()); + readOperation().getDocuments(queryExecutor, stateRef.get(), null); return docsToUpdate .onItem() .invoke(findResponse -> stateRef.set(findResponse.pagingState())); diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java index 080133be41..43cf8d6fea 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java @@ -33,7 +33,10 @@ public DeleteManyCommandResolver(DocumentConfig documentConfig, ObjectMapper obj public Operation resolveCommand(CommandContext commandContext, DeleteManyCommand command) { ReadOperation readOperation = resolve(commandContext, command); return new DeleteOperation( - commandContext, readOperation, documentConfig.maxDocumentDeleteCount()); + commandContext, + readOperation, + documentConfig.maxDocumentDeleteCount(), + documentConfig.maxLWTFailureRetry()); } @Override diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java index fb5af4fd9d..6b4f4cfc36 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.impl.DeleteOneCommand; +import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig; import io.stargate.sgv2.jsonapi.service.operation.model.Operation; import io.stargate.sgv2.jsonapi.service.operation.model.ReadOperation; import io.stargate.sgv2.jsonapi.service.operation.model.ReadType; @@ -20,15 +21,19 @@ public class DeleteOneCommandResolver extends FilterableResolver implements CommandResolver { + private final DocumentConfig documentConfig; + @Inject - public DeleteOneCommandResolver(ObjectMapper objectMapper) { + public DeleteOneCommandResolver(DocumentConfig documentConfig, ObjectMapper objectMapper) { super(objectMapper); + this.documentConfig = documentConfig; } @Override public Operation resolveCommand(CommandContext commandContext, DeleteOneCommand command) { ReadOperation readOperation = resolve(commandContext, command); - return new DeleteOperation(commandContext, readOperation, 1); + return new DeleteOperation( + commandContext, readOperation, 1, documentConfig.maxLWTFailureRetry()); } @Override diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java index 1b34dbae49..824cc528a7 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java @@ -151,7 +151,7 @@ public JsonNode asJson(JsonNodeFactory nodeFactory) { @Override public String toString() { - return String.valueOf(key); + return key.toPlainString(); } } diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java index 89892139db..054121d016 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java @@ -14,6 +14,7 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus; +import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig; import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor; import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers; import io.stargate.sgv2.jsonapi.service.operation.model.ReadType; @@ -36,6 +37,7 @@ public class DeleteOperationTest extends AbstractValidatingStargateBridgeTest { @Inject QueryExecutor queryExecutor; @Inject ObjectMapper objectMapper; + @Inject DocumentConfig documentConfig; @Nested class DeleteOperationsTest { @@ -92,7 +94,9 @@ public void deleteWithId() throws Exception { ReadType.KEY, objectMapper); - DeleteOperation operation = new DeleteOperation(commandContext, findOperation, 1); + DeleteOperation operation = + new DeleteOperation( + commandContext, findOperation, 1, documentConfig.maxLWTFailureRetry()); final Supplier execute = operation.execute(queryExecutor).subscribeAsCompletionStage().get(); CommandResult result = execute.get(); @@ -140,7 +144,9 @@ public void deleteWithIdNoData() throws Exception { ReadType.KEY, objectMapper); - DeleteOperation operation = new DeleteOperation(commandContext, findOperation, 1); + DeleteOperation operation = + new DeleteOperation( + commandContext, findOperation, 1, documentConfig.maxLWTFailureRetry()); final Supplier execute = operation.execute(queryExecutor).subscribeAsCompletionStage().get(); CommandResult result = execute.get(); @@ -203,7 +209,9 @@ public void deleteWithDynamic() throws Exception { ReadType.KEY, objectMapper); - DeleteOperation operation = new DeleteOperation(commandContext, findOperation, 1); + DeleteOperation operation = + new DeleteOperation( + commandContext, findOperation, 1, documentConfig.maxLWTFailureRetry()); final Supplier execute = operation.execute(queryExecutor).subscribeAsCompletionStage().get(); CommandResult result = execute.get(); @@ -276,7 +284,9 @@ public void deleteManyWithDynamic() throws Exception { ReadType.KEY, objectMapper); - DeleteOperation operation = new DeleteOperation(commandContext, findOperation, 2); + DeleteOperation operation = + new DeleteOperation( + commandContext, findOperation, 2, documentConfig.maxLWTFailureRetry()); final Supplier execute = operation.execute(queryExecutor).subscribeAsCompletionStage().get(); CommandResult result = execute.get(); @@ -331,7 +341,9 @@ public void deleteWithNoResult() throws Exception { 1, ReadType.KEY, objectMapper); - DeleteOperation operation = new DeleteOperation(commandContext, findOperation, 1); + DeleteOperation operation = + new DeleteOperation( + commandContext, findOperation, 1, documentConfig.maxLWTFailureRetry()); final Supplier execute = operation.execute(queryExecutor).subscribeAsCompletionStage().get(); CommandResult result = execute.get(); diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java index cb16249f0a..2745d93650 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java @@ -58,7 +58,10 @@ public void idFilterCondition() throws Exception { objectMapper); DeleteOperation expected = new DeleteOperation( - commandContext, findOperation, documentConfig.maxDocumentDeleteCount()); + commandContext, + findOperation, + documentConfig.maxDocumentDeleteCount(), + documentConfig.maxLWTFailureRetry()); assertThat(operation) .isInstanceOf(DeleteOperation.class) .satisfies( @@ -92,7 +95,10 @@ public void noFilterCondition() throws Exception { objectMapper); DeleteOperation expected = new DeleteOperation( - commandContext, findOperation, documentConfig.maxDocumentDeleteCount()); + commandContext, + findOperation, + documentConfig.maxDocumentDeleteCount(), + documentConfig.maxLWTFailureRetry()); assertThat(operation) .isInstanceOf(DeleteOperation.class) .satisfies( @@ -129,7 +135,10 @@ public void dynamicFilterCondition() throws Exception { objectMapper); DeleteOperation expected = new DeleteOperation( - commandContext, findOperation, documentConfig.maxDocumentDeleteCount()); + commandContext, + findOperation, + documentConfig.maxDocumentDeleteCount(), + documentConfig.maxLWTFailureRetry()); assertThat(operation) .isInstanceOf(DeleteOperation.class) .satisfies( diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java index df320c78de..0f2948f2bf 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java @@ -8,6 +8,7 @@ import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile; import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.impl.DeleteOneCommand; +import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig; import io.stargate.sgv2.jsonapi.service.operation.model.Operation; import io.stargate.sgv2.jsonapi.service.operation.model.ReadType; import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase; @@ -23,6 +24,7 @@ @TestProfile(NoGlobalResourcesTestProfile.Impl.class) public class DeleteOneCommandResolverTest { @Inject ObjectMapper objectMapper; + @Inject DocumentConfig documentConfig; @Inject DeleteOneCommandResolver deleteOneCommandResolver; @Nested @@ -54,7 +56,9 @@ public void idFilterCondition() throws Exception { 1, ReadType.KEY, objectMapper); - DeleteOperation expected = new DeleteOperation(commandContext, findOperation, 1); + DeleteOperation expected = + new DeleteOperation( + commandContext, findOperation, 1, documentConfig.maxLWTFailureRetry()); assertThat(operation) .isInstanceOf(DeleteOperation.class) .satisfies( @@ -79,7 +83,9 @@ public void noFilterCondition() throws Exception { deleteOneCommandResolver.resolveCommand(commandContext, deleteOneCommand); FindOperation findOperation = new FindOperation(commandContext, List.of(), null, 1, 1, ReadType.KEY, objectMapper); - DeleteOperation expected = new DeleteOperation(commandContext, findOperation, 1); + DeleteOperation expected = + new DeleteOperation( + commandContext, findOperation, 1, documentConfig.maxLWTFailureRetry()); assertThat(operation) .isInstanceOf(DeleteOperation.class) .satisfies( @@ -114,7 +120,9 @@ public void dynamicFilterCondition() throws Exception { 1, ReadType.KEY, objectMapper); - DeleteOperation expected = new DeleteOperation(commandContext, findOperation, 1); + DeleteOperation expected = + new DeleteOperation( + commandContext, findOperation, 1, documentConfig.maxLWTFailureRetry()); assertThat(operation) .isInstanceOf(DeleteOperation.class) .satisfies( From 070221ccdaab74a4a3c2dbe07ebf46741a12b8bc Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Thu, 2 Mar 2023 15:42:19 -0500 Subject: [PATCH 07/15] LWT retry logic --- .../operation/model/impl/DeleteOperation.java | 110 ++++--- .../model/impl/DeleteOperationTest.java | 308 ++++++++++++++++++ .../impl/DeleteManyCommandResolverTest.java | 3 + .../impl/DeleteOneCommandResolverTest.java | 5 + 4 files changed, 388 insertions(+), 38 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java index f7f5be9cee..cb5a25ec4e 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java @@ -72,11 +72,32 @@ public Uni> execute(QueryExecutor queryExecutor) { .onItem() .transformToUniAndConcatenate( readDocument -> { - AtomicBoolean retry = new AtomicBoolean(false); - return deleteDocument(queryExecutor, delete, readDocument, retry) - .onFailure() - .retry() - .atMost(retryLimit); + AtomicInteger attempt = new AtomicInteger(0); + return Multi.createBy() + .repeating() + .uni(() -> deleteDocument(queryExecutor, delete, readDocument, attempt)) + .whilst( + respVal -> + (respVal == DeleteResponse.CONCURRENCY_FAILURE + && attempt.incrementAndGet() < retryLimit)) + .collect() + .last() + .onItem() + .transform( + respVal -> { + switch (respVal) { + case DELETED: + return true; + case MODIFIED_BY_CONCURRENT_PROCESS: + return false; + case CONCURRENCY_FAILURE: + default: + throw new JsonApiException( + ErrorCode.CONCURRENCY_FAILURE, + "Delete failed for %s because of concurrent transaction" + .formatted(readDocument.id().toString())); + } + }); }) .collect() .in( @@ -116,40 +137,22 @@ private QueryOuterClass.Query buildDeleteQuery() { * @param queryExecutor * @param query * @param doc - * @return Uni `true` if deleted successfully, else `false` + * @param attempt + * @return Uni */ - private Uni deleteDocument( + private Uni deleteDocument( QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc, - AtomicBoolean retryFlag) { - + AtomicInteger attempt) { final Uni documentToDelete = Uni.createFrom() - .item(retryFlag.get()) + .item(attempt.get()) .onItem() .transformToUni( - retry -> { - if (retry) { - // Read again if retry flag is `true` - final Uni findResponse = - readOperation() - .getDocuments( - queryExecutor, - null, - new DBFilterBase.IDFilter( - DBFilterBase.IDFilter.Operator.EQ, doc.id())); - return findResponse - .onItem() - .transform( - response -> { - if (response.docs().isEmpty()) { - return response.docs().get(0); - } else { - // If data changed and doesn't satisfy filter conditions - return null; - } - }); + attemptValue -> { + if (attemptValue > 0) { + return readDocumentAgain(queryExecutor, doc); } else { return Uni.createFrom().item(doc); } @@ -159,7 +162,7 @@ private Uni deleteDocument( .transformToUni( docToDelete -> { if (docToDelete == null) { - return Uni.createFrom().item(false); + return Uni.createFrom().item(DeleteResponse.MODIFIED_BY_CONCURRENT_PROCESS); } else { QueryOuterClass.Query boundQuery = bindDeleteQuery(query, docToDelete); return queryExecutor @@ -168,19 +171,37 @@ private Uni deleteDocument( .transform( result -> { if (result.getRows(0).getValues(0).getBoolean()) { - return true; + return DeleteResponse.DELETED; } else { - retryFlag.set(true); - throw new JsonApiException( - ErrorCode.CONCURRENCY_FAILURE, - "Delete failed for %s because of concurrent transaction" - .formatted(docToDelete.id().value())); + return DeleteResponse.CONCURRENCY_FAILURE; } }); } }); } + private Uni readDocumentAgain( + QueryExecutor queryExecutor, ReadDocument prevReadDoc) { + // Read again if retry flag is `true` + final Uni findResponse = + readOperation() + .getDocuments( + queryExecutor, + null, + new DBFilterBase.IDFilter(DBFilterBase.IDFilter.Operator.EQ, prevReadDoc.id())); + return findResponse + .onItem() + .transform( + response -> { + if (!response.docs().isEmpty()) { + return response.docs().get(0); + } else { + // If data changed and doesn't satisfy filter conditions + return null; + } + }); + } + private static QueryOuterClass.Query bindDeleteQuery( QueryOuterClass.Query builtQuery, ReadDocument doc) { QueryOuterClass.Values.Builder values = @@ -189,4 +210,17 @@ private static QueryOuterClass.Query bindDeleteQuery( .addValues(Values.of(doc.txnId())); return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build(); } + + public enum DeleteResponse { + /** Successfully deleted a document */ + DELETED, + /** + * Document modified by concurrent process and doesn't match the condition Could have changed + * value or deleted + */ + MODIFIED_BY_CONCURRENT_PROCESS, + + /** Failed because of concurrent process */ + CONCURRENCY_FAILURE; + } } diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java index d63d7a8afb..0c593d9243 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java @@ -15,6 +15,7 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus; +import io.stargate.sgv2.jsonapi.exception.JsonApiException; import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig; import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor; import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers; @@ -243,6 +244,313 @@ public void deleteWithDynamic() { assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 1); } + @Test + public void deleteWithDynamicRetry() { + UUID tx_id1 = UUID.randomUUID(); + UUID tx_id2 = UUID.randomUUID(); + String collectionReadCql = + "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert candidatesAssert = + withQuery( + collectionReadCql, + Values.of("username " + new DocValueHasher().getHash("user1").hash())) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.VARCHAR) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(tx_id1)))); + + String collectionReadCql2 = + "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert candidatesAssert2 = + withQuery( + collectionReadCql2, + Values.of("username " + new DocValueHasher().getHash("user1").hash()), + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1")))) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.VARCHAR) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(tx_id2)))); + + String collectionDeleteCql = + "DELETE FROM \"%s\".\"%s\" WHERE key = ? IF tx_id = ?" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert deleteAssert = + withQuery( + collectionDeleteCql, + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))), + Values.of(tx_id1)) + .returning(List.of(List.of(Values.of(false)))); + ValidatingStargateBridge.QueryAssert deleteAssert2 = + withQuery( + collectionDeleteCql, + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))), + Values.of(tx_id2)) + .returning(List.of(List.of(Values.of(true)))); + + FindOperation findOperation = + new FindOperation( + COMMAND_CONTEXT, + List.of( + new DBFilterBase.TextFilter( + "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")), + null, + 1, + 1, + ReadType.KEY, + objectMapper); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 2); + + Supplier execute = + operation + .execute(queryExecutor) + .subscribe() + .withSubscriber(UniAssertSubscriber.create()) + .awaitItem() + .getItem(); + + // assert query execution + candidatesAssert.assertExecuteCount().isOne(); + candidatesAssert2.assertExecuteCount().isOne(); + deleteAssert.assertExecuteCount().isOne(); + deleteAssert2.assertExecuteCount().isOne(); + // then result + + // then result + CommandResult result = execute.get(); + assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 1); + } + + @Test + public void deleteWithDynamicRetryFailure() { + UUID tx_id1 = UUID.randomUUID(); + UUID tx_id2 = UUID.randomUUID(); + String collectionReadCql = + "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert candidatesAssert = + withQuery( + collectionReadCql, + Values.of("username " + new DocValueHasher().getHash("user1").hash())) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.VARCHAR) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(tx_id1)))); + + String collectionReadCql2 = + "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert candidatesAssert2 = + withQuery( + collectionReadCql2, + Values.of("username " + new DocValueHasher().getHash("user1").hash()), + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1")))) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.VARCHAR) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(tx_id2)))); + + String collectionDeleteCql = + "DELETE FROM \"%s\".\"%s\" WHERE key = ? IF tx_id = ?" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert deleteAssert = + withQuery( + collectionDeleteCql, + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))), + Values.of(tx_id1)) + .returning(List.of(List.of(Values.of(false)))); + ValidatingStargateBridge.QueryAssert deleteAssert2 = + withQuery( + collectionDeleteCql, + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))), + Values.of(tx_id2)) + .returning(List.of(List.of(Values.of(false)))); + + FindOperation findOperation = + new FindOperation( + COMMAND_CONTEXT, + List.of( + new DBFilterBase.TextFilter( + "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")), + null, + 1, + 1, + ReadType.KEY, + objectMapper); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 2); + + final UniAssertSubscriber> supplierUniAssertSubscriber = + operation.execute(queryExecutor).subscribe().withSubscriber(UniAssertSubscriber.create()); + + // assert query execution + candidatesAssert.assertExecuteCount().isOne(); + candidatesAssert2.assertExecuteCount().isOne(); + deleteAssert.assertExecuteCount().isOne(); + deleteAssert2.assertExecuteCount().isOne(); + // then result + + supplierUniAssertSubscriber.assertFailedWith( + JsonApiException.class, + "Delete failed for %s because of concurrent transaction".formatted("doc1")); + } + + @Test + public void deleteWithDynamicRetryConcurrentDelete() { + UUID tx_id1 = UUID.randomUUID(); + String collectionReadCql = + "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert candidatesAssert = + withQuery( + collectionReadCql, + Values.of("username " + new DocValueHasher().getHash("user1").hash())) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.VARCHAR) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(tx_id1)))); + + String collectionReadCql2 = + "SELECT key, tx_id FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert candidatesAssert2 = + withQuery( + collectionReadCql2, + Values.of("username " + new DocValueHasher().getHash("user1").hash()), + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1")))) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.VARCHAR) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build())) + .returning(List.of()); + + String collectionDeleteCql = + "DELETE FROM \"%s\".\"%s\" WHERE key = ? IF tx_id = ?" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + ValidatingStargateBridge.QueryAssert deleteAssert = + withQuery( + collectionDeleteCql, + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))), + Values.of(tx_id1)) + .returning(List.of(List.of(Values.of(false)))); + + FindOperation findOperation = + new FindOperation( + COMMAND_CONTEXT, + List.of( + new DBFilterBase.TextFilter( + "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")), + null, + 1, + 1, + ReadType.KEY, + objectMapper); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 2); + + Supplier execute = + operation + .execute(queryExecutor) + .subscribe() + .withSubscriber(UniAssertSubscriber.create()) + .awaitItem() + .getItem(); + + // assert query execution + candidatesAssert.assertExecuteCount().isOne(); + candidatesAssert2.assertExecuteCount().isOne(); + deleteAssert.assertExecuteCount().isOne(); + // then result + + // then result + CommandResult result = execute.get(); + assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 0); + } + @Test public void deleteManyWithDynamic() { UUID tx_id1 = UUID.randomUUID(); diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java index 569c6eca9a..a73e504bbb 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java @@ -52,6 +52,7 @@ public void idFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -91,6 +92,7 @@ public void noFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -127,6 +129,7 @@ public void dynamicFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java index c0c989622c..2ee691674e 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java @@ -9,6 +9,7 @@ import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile; import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.impl.DeleteOneCommand; +import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig; import io.stargate.sgv2.jsonapi.service.operation.model.Operation; import io.stargate.sgv2.jsonapi.service.operation.model.ReadType; import io.stargate.sgv2.jsonapi.service.operation.model.impl.DBFilterBase; @@ -23,6 +24,7 @@ @TestProfile(NoGlobalResourcesTestProfile.Impl.class) public class DeleteOneCommandResolverTest { @Inject ObjectMapper objectMapper; + @Inject DocumentConfig documentConfig; @Inject DeleteOneCommandResolver resolver; @Nested @@ -50,6 +52,7 @@ public void idFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(1); + assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -88,6 +91,7 @@ public void noFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(1); + assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -123,6 +127,7 @@ public void dynamicFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(1); + assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, From 0547de5e88f8f84ac8cc7d3a1cc46674c22d2edf Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Thu, 2 Mar 2023 15:57:09 -0500 Subject: [PATCH 08/15] Reverted Document Id change --- .../sgv2/jsonapi/service/shredding/model/DocumentId.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java index 824cc528a7..1b34dbae49 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java @@ -151,7 +151,7 @@ public JsonNode asJson(JsonNodeFactory nodeFactory) { @Override public String toString() { - return key.toPlainString(); + return String.valueOf(key); } } From e2de3fb5204cf97ef22b7172e69e32661cb34d17 Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Thu, 2 Mar 2023 18:22:44 -0500 Subject: [PATCH 09/15] Fixed javadoc and error message --- .../operation/model/impl/DeleteOperation.java | 12 ++++++------ .../operation/model/impl/DeleteOperationTest.java | 3 ++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java index cb5a25ec4e..b1df733bd3 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java @@ -88,13 +88,13 @@ public Uni> execute(QueryExecutor queryExecutor) { switch (respVal) { case DELETED: return true; - case MODIFIED_BY_CONCURRENT_PROCESS: + case MODIFIED_BY_CONCURRENT_TRANSACTION: return false; case CONCURRENCY_FAILURE: default: throw new JsonApiException( ErrorCode.CONCURRENCY_FAILURE, - "Delete failed for %s because of concurrent transaction" + "Delete failed for document with id %s because of concurrent transaction" .formatted(readDocument.id().toString())); } }); @@ -162,7 +162,7 @@ private Uni deleteDocument( .transformToUni( docToDelete -> { if (docToDelete == null) { - return Uni.createFrom().item(DeleteResponse.MODIFIED_BY_CONCURRENT_PROCESS); + return Uni.createFrom().item(DeleteResponse.MODIFIED_BY_CONCURRENT_TRANSACTION); } else { QueryOuterClass.Query boundQuery = bindDeleteQuery(query, docToDelete); return queryExecutor @@ -215,10 +215,10 @@ public enum DeleteResponse { /** Successfully deleted a document */ DELETED, /** - * Document modified by concurrent process and doesn't match the condition Could have changed - * value or deleted + * Document modified by concurrent transaction and document doesn't match the condition or + * document is deleted */ - MODIFIED_BY_CONCURRENT_PROCESS, + MODIFIED_BY_CONCURRENT_TRANSACTION, /** Failed because of concurrent process */ CONCURRENCY_FAILURE; diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java index 0c593d9243..c1e8926875 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java @@ -454,7 +454,8 @@ public void deleteWithDynamicRetryFailure() { supplierUniAssertSubscriber.assertFailedWith( JsonApiException.class, - "Delete failed for %s because of concurrent transaction".formatted("doc1")); + "Delete failed for document with id %s because of concurrent transaction" + .formatted("doc1")); } @Test From ad1abb5a16f0a83c437122c8c995cb880bd86ea5 Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Mon, 6 Mar 2023 10:38:25 -0500 Subject: [PATCH 10/15] Changes based on review comment from Ivan --- .../service/bridge/config/DocumentConfig.java | 15 +- .../operation/model/CountOperation.java | 2 +- .../operation/model/ReadOperation.java | 5 +- .../operation/model/impl/DeleteOperation.java | 206 ++++++++---------- .../operation/model/impl/FindOperation.java | 10 +- .../model/impl/DeleteManyCommandResolver.java | 2 +- .../model/impl/DeleteOneCommandResolver.java | 3 +- .../model/impl/DeleteOperationTest.java | 56 +++-- .../impl/DeleteManyCommandResolverTest.java | 6 +- .../impl/DeleteOneCommandResolverTest.java | 6 +- 10 files changed, 156 insertions(+), 155 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java b/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java index 69b332fab2..79a6a16a98 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/bridge/config/DocumentConfig.java @@ -65,9 +65,14 @@ public interface DocumentConfig { @WithDefault("20") int maxDocumentUpdateCount(); - /** @return Defines the maximum retry for lwt failure 3. */ - @Max(5) - @Positive - @WithDefault("3") - int maxLWTFailureRetry(); + /** {@inheritDoc} */ + LwtConfig lwt(); + + interface LwtConfig { + /** @return Defines the maximum retry for lwt failure 3. */ + @Max(5) + @Positive + @WithDefault("3") + int retries(); + } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java index bd04d05252..107d30fdf1 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/CountOperation.java @@ -47,7 +47,7 @@ private QueryOuterClass.Query buildSelectQuery() { @Override public Uni getDocuments( - QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride) { + QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter) { return Uni.createFrom().failure(new JsonApiException(ErrorCode.UNSUPPORTED_OPERATION)); } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java index 744a41d4c2..b9852d7822 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/ReadOperation.java @@ -117,10 +117,13 @@ default Uni countDocuments( * used by other commands which needs a document to be read. * * @param queryExecutor + * @param pagingState + * @param additionalIdFilter Used if a additional id filter need to be added to already available + * filters * @return */ Uni getDocuments( - QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride); + QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter); /** * A operation method which can return ReadDocument with an empty document, if the filter diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java index b1df733bd3..23e1c6cc93 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java @@ -36,79 +36,66 @@ public record DeleteOperation( public Uni> execute(QueryExecutor queryExecutor) { final AtomicBoolean moreData = new AtomicBoolean(false); final QueryOuterClass.Query delete = buildDeleteQuery(); - final Multi findResponses = - Multi.createBy() - .repeating() - .uni( - () -> new AtomicReference(null), - stateRef -> { - Uni docsToDelete = - readOperation().getDocuments(queryExecutor, stateRef.get(), null); - return docsToDelete - .onItem() - .invoke(findResponse -> stateRef.set(findResponse.pagingState())); - }) - .whilst(findResponse -> findResponse.pagingState() != null); AtomicInteger totalCount = new AtomicInteger(0); - final Uni counter = - findResponses - .onItem() - .transformToMulti( - findResponse -> { - final List docs = findResponse.docs(); - // Below conditionality is because we read up to deleteLimit +1 record. - if (totalCount.get() + docs.size() <= deleteLimit) { - totalCount.addAndGet(docs.size()); - return Multi.createFrom().items(docs.stream()); - } else { - int needed = deleteLimit - totalCount.get(); - totalCount.addAndGet(needed); - moreData.set(true); - return Multi.createFrom() - .items(findResponse.docs().subList(0, needed).stream()); - } - }) - .concatenate() - .onItem() - .transformToUniAndConcatenate( - readDocument -> { - AtomicInteger attempt = new AtomicInteger(0); - return Multi.createBy() - .repeating() - .uni(() -> deleteDocument(queryExecutor, delete, readDocument, attempt)) - .whilst( - respVal -> - (respVal == DeleteResponse.CONCURRENCY_FAILURE - && attempt.incrementAndGet() < retryLimit)) - .collect() - .last() - .onItem() - .transform( - respVal -> { - switch (respVal) { - case DELETED: - return true; - case MODIFIED_BY_CONCURRENT_TRANSACTION: - return false; - case CONCURRENCY_FAILURE: - default: - throw new JsonApiException( - ErrorCode.CONCURRENCY_FAILURE, - "Delete failed for document with id %s because of concurrent transaction" - .formatted(readDocument.id().toString())); - } - }); - }) - .collect() - .in( - AtomicInteger::new, - (atomicCounter, flag) -> { - if (flag) { - atomicCounter.incrementAndGet(); - } - }); + // Read the required records to be deleted + return Multi.createBy() + .repeating() + .uni( + () -> new AtomicReference(null), + stateRef -> { + Uni docsToDelete = + readOperation().getDocuments(queryExecutor, stateRef.get(), null); + return docsToDelete + .onItem() + .invoke(findResponse -> stateRef.set(findResponse.pagingState())); + }) - return counter + // Documents read until pagingState available, max records read is deleteLimit + 1 + .whilst(findResponse -> findResponse.pagingState() != null) + + // Get the deleteLimit # of documents to be delete and set moreData flag true if extra + // document is read. + .onItem() + .transformToMulti( + findResponse -> { + final List docs = findResponse.docs(); + // Below conditionality is because we read up to deleteLimit +1 record. + if (totalCount.get() + docs.size() <= deleteLimit) { + totalCount.addAndGet(docs.size()); + return Multi.createFrom().items(docs.stream()); + } else { + int needed = deleteLimit - totalCount.get(); + totalCount.addAndGet(needed); + moreData.set(true); + return Multi.createFrom().items(findResponse.docs().subList(0, needed).stream()); + } + }) + .concatenate() + + // Run delete for selected documents and retry in case of + .onItem() + .transformToUniAndConcatenate( + document -> { + AtomicInteger retryAttempt = new AtomicInteger(0); + return deleteDocument(queryExecutor, delete, document, retryAttempt) + + // Retry `retryLimit` times in case of LWT failure + .onFailure() + .retry() + .until( + error -> + error instanceof JsonApiException && retryAttempt.get() < retryLimit); + }) + .collect() + + // Count the successful deletes + .in( + AtomicInteger::new, + (atomicCounter, flag) -> { + if (flag) { + atomicCounter.incrementAndGet(); + } + }) .onItem() .transform(deletedCounter -> new DeleteOperationPage(deletedCounter.get(), moreData.get())); } @@ -137,32 +124,37 @@ private QueryOuterClass.Query buildDeleteQuery() { * @param queryExecutor * @param query * @param doc - * @param attempt - * @return Uni + * @param retryAttempt + * @return Uni `true` if deleted successfully, else `false` if data changed and no longer + * match the conditions and throws JsonApiException if LWT failure. */ - private Uni deleteDocument( + private Uni deleteDocument( QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc, - AtomicInteger attempt) { - final Uni documentToDelete = - Uni.createFrom() - .item(attempt.get()) - .onItem() - .transformToUni( - attemptValue -> { - if (attemptValue > 0) { - return readDocumentAgain(queryExecutor, doc); - } else { - return Uni.createFrom().item(doc); - } - }); - return documentToDelete + AtomicInteger retryAttempt) + throws JsonApiException { + + return Uni.createFrom() + .item(doc) + // Read again if retryAttempt >`0` + .onItem() + .transformToUni( + document -> { + if (retryAttempt.get() > 0) { + retryAttempt.incrementAndGet(); + return readDocumentAgain(queryExecutor, document); + } else { + retryAttempt.incrementAndGet(); + return Uni.createFrom().item(document); + } + }) .onItem() .transformToUni( docToDelete -> { + // In case document resolved after the retry read if (docToDelete == null) { - return Uni.createFrom().item(DeleteResponse.MODIFIED_BY_CONCURRENT_TRANSACTION); + return Uni.createFrom().item(false); } else { QueryOuterClass.Query boundQuery = bindDeleteQuery(query, docToDelete); return queryExecutor @@ -170,10 +162,17 @@ private Uni deleteDocument( .onItem() .transform( result -> { + // LWT returns `true` for successful transaction, false on failure. if (result.getRows(0).getValues(0).getBoolean()) { - return DeleteResponse.DELETED; + // In case of successful document delete + return true; } else { - return DeleteResponse.CONCURRENCY_FAILURE; + // In case of successful document delete + + throw new JsonApiException( + ErrorCode.CONCURRENCY_FAILURE, + "Delete failed for document with id %s because of concurrent transaction" + .formatted(docToDelete.id().value())); } }); } @@ -183,13 +182,11 @@ private Uni deleteDocument( private Uni readDocumentAgain( QueryExecutor queryExecutor, ReadDocument prevReadDoc) { // Read again if retry flag is `true` - final Uni findResponse = - readOperation() - .getDocuments( - queryExecutor, - null, - new DBFilterBase.IDFilter(DBFilterBase.IDFilter.Operator.EQ, prevReadDoc.id())); - return findResponse + return readOperation() + .getDocuments( + queryExecutor, + null, + new DBFilterBase.IDFilter(DBFilterBase.IDFilter.Operator.EQ, prevReadDoc.id())) .onItem() .transform( response -> { @@ -210,17 +207,4 @@ private static QueryOuterClass.Query bindDeleteQuery( .addValues(Values.of(doc.txnId())); return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build(); } - - public enum DeleteResponse { - /** Successfully deleted a document */ - DELETED, - /** - * Document modified by concurrent transaction and document doesn't match the condition or - * document is deleted - */ - MODIFIED_BY_CONCURRENT_TRANSACTION, - - /** Failed because of concurrent process */ - CONCURRENCY_FAILURE; - } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java index 4035624dac..ef0dcf5e33 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java @@ -41,12 +41,12 @@ public Uni> execute(QueryExecutor queryExecutor) { @Override public Uni getDocuments( - QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter idOverride) { + QueryExecutor queryExecutor, String pagingState, DBFilterBase.IDFilter additionalIdFilter) { switch (readType) { case DOCUMENT: case KEY: { - QueryOuterClass.Query query = buildSelectQuery(idOverride); + QueryOuterClass.Query query = buildSelectQuery(additionalIdFilter); return findDocument( queryExecutor, query, @@ -84,13 +84,13 @@ public ReadDocument getNewDocument() { return doc; } - private QueryOuterClass.Query buildSelectQuery(DBFilterBase.IDFilter idOverride) { + private QueryOuterClass.Query buildSelectQuery(DBFilterBase.IDFilter additionalIdFilter) { List conditions = new ArrayList<>(filters.size()); for (DBFilterBase filter : filters) { conditions.add(filter.get()); } - if (idOverride != null) { - conditions.add(idOverride.get()); + if (additionalIdFilter != null) { + conditions.add(additionalIdFilter.get()); } return new QueryBuilder() .select() diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java index 43cf8d6fea..ab157a1617 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolver.java @@ -36,7 +36,7 @@ public Operation resolveCommand(CommandContext commandContext, DeleteManyCommand commandContext, readOperation, documentConfig.maxDocumentDeleteCount(), - documentConfig.maxLWTFailureRetry()); + documentConfig.lwt().retries()); } @Override diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java index 6b4f4cfc36..ba56b05b1c 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolver.java @@ -32,8 +32,7 @@ public DeleteOneCommandResolver(DocumentConfig documentConfig, ObjectMapper obje @Override public Operation resolveCommand(CommandContext commandContext, DeleteOneCommand command) { ReadOperation readOperation = resolve(commandContext, command); - return new DeleteOperation( - commandContext, readOperation, 1, documentConfig.maxLWTFailureRetry()); + return new DeleteOperation(commandContext, readOperation, 1, documentConfig.lwt().retries()); } @Override diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java index c1e8926875..6512ebdb40 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; +import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import io.stargate.bridge.grpc.TypeSpecs; import io.stargate.bridge.grpc.Values; @@ -16,7 +17,6 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus; import io.stargate.sgv2.jsonapi.exception.JsonApiException; -import io.stargate.sgv2.jsonapi.service.bridge.config.DocumentConfig; import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor; import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers; import io.stargate.sgv2.jsonapi.service.operation.model.ReadType; @@ -24,6 +24,7 @@ import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId; import java.util.List; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.inject.Inject; import org.apache.commons.lang3.RandomStringUtils; @@ -40,7 +41,6 @@ public class DeleteOperationTest extends AbstractValidatingStargateBridgeTest { @Inject QueryExecutor queryExecutor; @Inject ObjectMapper objectMapper; - @Inject DocumentConfig documentConfig; @Nested class Execute { @@ -98,9 +98,7 @@ public void deleteWithId() { 1, ReadType.KEY, objectMapper); - DeleteOperation operation = - new DeleteOperation( - COMMAND_CONTEXT, findOperation, 1, documentConfig.maxLWTFailureRetry()); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3); Supplier execute = operation .execute(queryExecutor) @@ -153,9 +151,7 @@ public void deleteWithIdNoData() { ReadType.KEY, objectMapper); - DeleteOperation operation = - new DeleteOperation( - COMMAND_CONTEXT, findOperation, 1, documentConfig.maxLWTFailureRetry()); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3); Supplier execute = operation .execute(queryExecutor) @@ -223,9 +219,7 @@ public void deleteWithDynamic() { 1, ReadType.KEY, objectMapper); - DeleteOperation operation = - new DeleteOperation( - COMMAND_CONTEXT, findOperation, 1, documentConfig.maxLWTFailureRetry()); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3); Supplier execute = operation @@ -616,9 +610,7 @@ public void deleteManyWithDynamic() { 2, ReadType.KEY, objectMapper); - DeleteOperation operation = - new DeleteOperation( - COMMAND_CONTEXT, findOperation, 2, documentConfig.maxLWTFailureRetry()); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2, 3); Supplier execute = operation @@ -701,9 +693,7 @@ public void deleteManyWithDynamicPaging() { 1, ReadType.KEY, objectMapper); - DeleteOperation operation = - new DeleteOperation( - COMMAND_CONTEXT, findOperation, 2, documentConfig.maxLWTFailureRetry()); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2, 3); Supplier execute = operation @@ -792,9 +782,7 @@ public void deleteManyWithDynamicPagingAndMoreData() { 1, ReadType.KEY, objectMapper); - DeleteOperation operation = - new DeleteOperation( - COMMAND_CONTEXT, findOperation, 2, documentConfig.maxLWTFailureRetry()); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 2, 3); Supplier execute = operation @@ -854,9 +842,7 @@ public void deleteWithNoResult() { 1, ReadType.KEY, objectMapper); - DeleteOperation operation = - new DeleteOperation( - COMMAND_CONTEXT, findOperation, 1, documentConfig.maxLWTFailureRetry()); + DeleteOperation operation = new DeleteOperation(COMMAND_CONTEXT, findOperation, 1, 3); Supplier execute = operation @@ -874,6 +860,30 @@ public void deleteWithNoResult() { assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 0); } + @Test + public void testMutiny() { + Uni input = Uni.createFrom().item(new AtomicInteger(0)); + Uni output = + input + .onItem() + .transformToUni( + val -> { + return tryVal(val).onFailure().retry().atMost(3); + }); + + AtomicInteger execute = + output.subscribe().withSubscriber(UniAssertSubscriber.create()).awaitItem().getItem(); + + System.out.println("incoming value : " + execute.get()); + } + + private Uni tryVal(AtomicInteger value) { + System.out.println("incoming value : " + value.get()); + value.incrementAndGet(); + System.out.println("Increment successfully : " + value.get()); + return Uni.createFrom().item(new AtomicInteger(1 / value.get())); + } + @Test public void errorPartial() { // TODO with stargate v2.0.9 diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java index a73e504bbb..28a2afb54b 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteManyCommandResolverTest.java @@ -52,7 +52,7 @@ public void idFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount()); - assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -92,7 +92,7 @@ public void noFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount()); - assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -129,7 +129,7 @@ public void dynamicFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(documentConfig.maxDocumentDeleteCount()); - assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java index 2ee691674e..0c57b29b81 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/DeleteOneCommandResolverTest.java @@ -52,7 +52,7 @@ public void idFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(1); - assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -91,7 +91,7 @@ public void noFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(1); - assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, @@ -127,7 +127,7 @@ public void dynamicFilterCondition() throws Exception { op -> { assertThat(op.commandContext()).isEqualTo(commandContext); assertThat(op.deleteLimit()).isEqualTo(1); - assertThat(op.retryLimit()).isEqualTo(documentConfig.maxLWTFailureRetry()); + assertThat(op.retryLimit()).isEqualTo(documentConfig.lwt().retries()); assertThat(op.readOperation()) .isInstanceOfSatisfying( FindOperation.class, From 8ea172bc8a8ae529f4df7dfa62bb0eb43cb95b08 Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Mon, 6 Mar 2023 12:04:36 -0500 Subject: [PATCH 11/15] Handle id filter already part of filter clause from the request. --- .../operation/model/impl/FindOperation.java | 4 ++- .../model/impl/DeleteOperationTest.java | 26 ------------------- 2 files changed, 3 insertions(+), 27 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java index ef0dcf5e33..b1a01a41fc 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperation.java @@ -87,7 +87,9 @@ public ReadDocument getNewDocument() { private QueryOuterClass.Query buildSelectQuery(DBFilterBase.IDFilter additionalIdFilter) { List conditions = new ArrayList<>(filters.size()); for (DBFilterBase filter : filters) { - conditions.add(filter.get()); + if (additionalIdFilter == null + || (additionalIdFilter != null && !(filter instanceof DBFilterBase.IDFilter))) + conditions.add(filter.get()); } if (additionalIdFilter != null) { conditions.add(additionalIdFilter.get()); diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java index 6512ebdb40..196c2e16b4 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.TestProfile; -import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import io.stargate.bridge.grpc.TypeSpecs; import io.stargate.bridge.grpc.Values; @@ -24,7 +23,6 @@ import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId; import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.inject.Inject; import org.apache.commons.lang3.RandomStringUtils; @@ -860,30 +858,6 @@ public void deleteWithNoResult() { assertThat(result.status()).hasSize(1).containsEntry(CommandStatus.DELETED_COUNT, 0); } - @Test - public void testMutiny() { - Uni input = Uni.createFrom().item(new AtomicInteger(0)); - Uni output = - input - .onItem() - .transformToUni( - val -> { - return tryVal(val).onFailure().retry().atMost(3); - }); - - AtomicInteger execute = - output.subscribe().withSubscriber(UniAssertSubscriber.create()).awaitItem().getItem(); - - System.out.println("incoming value : " + execute.get()); - } - - private Uni tryVal(AtomicInteger value) { - System.out.println("incoming value : " + value.get()); - value.incrementAndGet(); - System.out.println("Increment successfully : " + value.get()); - return Uni.createFrom().item(new AtomicInteger(1 / value.get())); - } - @Test public void errorPartial() { // TODO with stargate v2.0.9 From 39781843c91f6cc6798dea5b3094570b1207bb9a Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Mon, 6 Mar 2023 12:26:53 -0500 Subject: [PATCH 12/15] Added test case to test additional id filter to getDocuments --- .../model/impl/FindOperationTest.java | 198 ++++++++++++++++++ 1 file changed, 198 insertions(+) diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java index 631b59f74f..4ce62e3ce6 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/FindOperationTest.java @@ -15,6 +15,7 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; import io.stargate.sgv2.jsonapi.service.bridge.executor.QueryExecutor; import io.stargate.sgv2.jsonapi.service.bridge.serializer.CustomValueSerializers; +import io.stargate.sgv2.jsonapi.service.operation.model.ReadOperation; import io.stargate.sgv2.jsonapi.service.operation.model.ReadType; import io.stargate.sgv2.jsonapi.service.shredding.model.DocValueHasher; import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId; @@ -693,5 +694,202 @@ public void findWithNoResult() throws Exception { assertThat(result.data().docs()).hasSize(0); }); } + + @Test + public void findWithIdWithIdRetry() throws Exception { + String collectionReadCql = + "SELECT key, tx_id, doc_json FROM \"%s\".\"%s\" WHERE key = ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + String doc1 = + """ + { + "_id": "doc1", + "username": "user1" + } + """; + ValidatingStargateBridge.QueryAssert candidatesAssert = + withQuery( + collectionReadCql, + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1")))) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.tuple(TypeSpecs.TINYINT, TypeSpecs.VARCHAR)) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("doc_json") + .setType(TypeSpecs.VARCHAR) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(UUID.randomUUID()), + Values.of(doc1)))); + FindOperation findOperation = + new FindOperation( + commandContext, + List.of( + new DBFilterBase.IDFilter( + DBFilterBase.IDFilter.Operator.EQ, DocumentId.fromString("doc1"))), + null, + 1, + 1, + ReadType.DOCUMENT, + objectMapper); + final ReadOperation.FindResponse result = + findOperation + .getDocuments( + queryExecutor, + null, + new DBFilterBase.IDFilter( + DBFilterBase.IDFilter.Operator.EQ, DocumentId.fromString("doc1"))) + .subscribeAsCompletionStage() + .get(); + assertThat(result) + .satisfies( + commandResult -> { + assertThat(result.docs()).isNotNull(); + assertThat(result.docs()).hasSize(1); + }); + } + + @Test + public void findWithDynamicGetDocument() throws Exception { + String collectionReadCql = + "SELECT key, tx_id, doc_json FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + String doc1 = + """ + { + "_id": "doc1", + "username": "user1" + } + """; + ValidatingStargateBridge.QueryAssert candidatesAssert = + withQuery( + collectionReadCql, + Values.of("username " + new DocValueHasher().getHash("user1").hash())) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.tuple(TypeSpecs.TINYINT, TypeSpecs.VARCHAR)) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("doc_json") + .setType(TypeSpecs.VARCHAR) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(UUID.randomUUID()), + Values.of(doc1)))); + FindOperation findOperation = + new FindOperation( + commandContext, + List.of( + new DBFilterBase.TextFilter( + "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")), + null, + 1, + 1, + ReadType.DOCUMENT, + objectMapper); + final ReadOperation.FindResponse result = + findOperation.getDocuments(queryExecutor, null, null).subscribeAsCompletionStage().get(); + assertThat(result) + .satisfies( + commandResult -> { + assertThat(result.docs()).isNotNull(); + assertThat(result.docs()).hasSize(1); + }); + } + + @Test + public void findWithDynamicWithIdRetry() throws Exception { + String collectionReadCql = + "SELECT key, tx_id, doc_json FROM \"%s\".\"%s\" WHERE array_contains CONTAINS ? AND key = ? LIMIT 1" + .formatted(KEYSPACE_NAME, COLLECTION_NAME); + String doc1 = + """ + { + "_id": "doc1", + "username": "user1" + } + """; + ValidatingStargateBridge.QueryAssert candidatesAssert = + withQuery( + collectionReadCql, + Values.of("username " + new DocValueHasher().getHash("user1").hash()), + Values.of( + CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1")))) + .withPageSize(1) + .withColumnSpec( + List.of( + QueryOuterClass.ColumnSpec.newBuilder() + .setName("key") + .setType(TypeSpecs.tuple(TypeSpecs.TINYINT, TypeSpecs.VARCHAR)) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("tx_id") + .setType(TypeSpecs.UUID) + .build(), + QueryOuterClass.ColumnSpec.newBuilder() + .setName("doc_json") + .setType(TypeSpecs.VARCHAR) + .build())) + .returning( + List.of( + List.of( + Values.of( + CustomValueSerializers.getDocumentIdValue( + DocumentId.fromString("doc1"))), + Values.of(UUID.randomUUID()), + Values.of(doc1)))); + FindOperation findOperation = + new FindOperation( + commandContext, + List.of( + new DBFilterBase.TextFilter( + "username", DBFilterBase.MapFilterBase.Operator.EQ, "user1")), + null, + 1, + 1, + ReadType.DOCUMENT, + objectMapper); + final ReadOperation.FindResponse result = + findOperation + .getDocuments( + queryExecutor, + null, + new DBFilterBase.IDFilter( + DBFilterBase.IDFilter.Operator.EQ, DocumentId.fromString("doc1"))) + .subscribeAsCompletionStage() + .get(); + assertThat(result) + .satisfies( + commandResult -> { + assertThat(result.docs()).isNotNull(); + assertThat(result.docs()).hasSize(1); + }); + } } } From 85129f7094566159fb0faa5e2f37bea87150391e Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Tue, 7 Mar 2023 10:11:43 -0500 Subject: [PATCH 13/15] Inherited Exception class so retry is done only for LWT failures. --- .../operation/model/impl/DeleteOperation.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java index 23e1c6cc93..cab17250be 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java @@ -80,11 +80,9 @@ public Uni> execute(QueryExecutor queryExecutor) { return deleteDocument(queryExecutor, delete, document, retryAttempt) // Retry `retryLimit` times in case of LWT failure - .onFailure() + .onFailure(LWTException.class) .retry() - .until( - error -> - error instanceof JsonApiException && retryAttempt.get() < retryLimit); + .until(error -> error instanceof LWTException && retryAttempt.get() < retryLimit); }) .collect() @@ -169,7 +167,7 @@ private Uni deleteDocument( } else { // In case of successful document delete - throw new JsonApiException( + throw new LWTException( ErrorCode.CONCURRENCY_FAILURE, "Delete failed for document with id %s because of concurrent transaction" .formatted(docToDelete.id().value())); @@ -207,4 +205,11 @@ private static QueryOuterClass.Query bindDeleteQuery( .addValues(Values.of(doc.txnId())); return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build(); } + + /** Inherited Exception class to handle retry */ + private class LWTException extends JsonApiException { + public LWTException(ErrorCode errorCode, String message) { + super(errorCode, message); + } + } } From bbf6016c5840421397020c2981ba02ea2e8b2941 Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Tue, 7 Mar 2023 11:39:34 -0500 Subject: [PATCH 14/15] Changed the retry logic to recoverWithUni --- .../operation/model/impl/DeleteOperation.java | 56 +++++++++---------- .../model/impl/DeleteOperationTest.java | 4 +- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java index cab17250be..c254c85eef 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java @@ -37,6 +37,7 @@ public Uni> execute(QueryExecutor queryExecutor) { final AtomicBoolean moreData = new AtomicBoolean(false); final QueryOuterClass.Query delete = buildDeleteQuery(); AtomicInteger totalCount = new AtomicInteger(0); + final int retryAttempt = retryLimit - 2; // Read the required records to be deleted return Multi.createBy() .repeating() @@ -74,15 +75,31 @@ public Uni> execute(QueryExecutor queryExecutor) { // Run delete for selected documents and retry in case of .onItem() - .transformToUniAndConcatenate( + .transformToUniAndMerge( document -> { - AtomicInteger retryAttempt = new AtomicInteger(0); - return deleteDocument(queryExecutor, delete, document, retryAttempt) - + return deleteDocument(queryExecutor, delete, document) // Retry `retryLimit` times in case of LWT failure .onFailure(LWTException.class) - .retry() - .until(error -> error instanceof LWTException && retryAttempt.get() < retryLimit); + .recoverWithUni( + () -> { + return Uni.createFrom() + .nullItem() + .flatMap( + nullData -> { + return readDocumentAgain(queryExecutor, document) + .onItem() + // Try deleting the document + .transformToUni( + reReadDocument -> + deleteDocument( + queryExecutor, delete, reReadDocument)); + }) + .onFailure(LWTException.class) + .retry() + // because it's already run twice before this + // check. + .atMost(retryLimit - 1); + }); }) .collect() @@ -122,39 +139,22 @@ private QueryOuterClass.Query buildDeleteQuery() { * @param queryExecutor * @param query * @param doc - * @param retryAttempt * @return Uni `true` if deleted successfully, else `false` if data changed and no longer * match the conditions and throws JsonApiException if LWT failure. */ private Uni deleteDocument( - QueryExecutor queryExecutor, - QueryOuterClass.Query query, - ReadDocument doc, - AtomicInteger retryAttempt) + QueryExecutor queryExecutor, QueryOuterClass.Query query, ReadDocument doc) throws JsonApiException { - return Uni.createFrom() .item(doc) // Read again if retryAttempt >`0` .onItem() .transformToUni( document -> { - if (retryAttempt.get() > 0) { - retryAttempt.incrementAndGet(); - return readDocumentAgain(queryExecutor, document); - } else { - retryAttempt.incrementAndGet(); - return Uni.createFrom().item(document); - } - }) - .onItem() - .transformToUni( - docToDelete -> { - // In case document resolved after the retry read - if (docToDelete == null) { + if (document == null) { return Uni.createFrom().item(false); } else { - QueryOuterClass.Query boundQuery = bindDeleteQuery(query, docToDelete); + QueryOuterClass.Query boundQuery = bindDeleteQuery(query, document); return queryExecutor .executeWrite(boundQuery) .onItem() @@ -170,14 +170,14 @@ private Uni deleteDocument( throw new LWTException( ErrorCode.CONCURRENCY_FAILURE, "Delete failed for document with id %s because of concurrent transaction" - .formatted(docToDelete.id().value())); + .formatted(document.id().value())); } }); } }); } - private Uni readDocumentAgain( + private Uni readDocumentAgain( QueryExecutor queryExecutor, ReadDocument prevReadDoc) { // Read again if retry flag is `true` return readOperation() diff --git a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java index 196c2e16b4..34a590608d 100644 --- a/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java +++ b/src/test/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperationTest.java @@ -439,9 +439,9 @@ public void deleteWithDynamicRetryFailure() { // assert query execution candidatesAssert.assertExecuteCount().isOne(); - candidatesAssert2.assertExecuteCount().isOne(); + candidatesAssert2.assertExecuteCount().isEqualTo(2); deleteAssert.assertExecuteCount().isOne(); - deleteAssert2.assertExecuteCount().isOne(); + deleteAssert2.assertExecuteCount().isEqualTo(2); // then result supplierUniAssertSubscriber.assertFailedWith( From a88d709b97db51722ed65fe64988f013e93b0b7e Mon Sep 17 00:00:00 2001 From: maheshrajamani <99678631+maheshrajamani@users.noreply.github.com> Date: Tue, 7 Mar 2023 11:41:51 -0500 Subject: [PATCH 15/15] Changed the event from null item to prevDocument --- .../service/operation/model/impl/DeleteOperation.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java index c254c85eef..cf16e317bc 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/impl/DeleteOperation.java @@ -83,10 +83,10 @@ public Uni> execute(QueryExecutor queryExecutor) { .recoverWithUni( () -> { return Uni.createFrom() - .nullItem() + .item(document) .flatMap( - nullData -> { - return readDocumentAgain(queryExecutor, document) + prevDoc -> { + return readDocumentAgain(queryExecutor, prevDoc) .onItem() // Try deleting the document .transformToUni(