Skip to content

Commit

Permalink
Added serial consistency for writer operations (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshrajamani authored Apr 18, 2023
1 parent bc9a8f7 commit e4811dc
Show file tree
Hide file tree
Showing 6 changed files with 452 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,13 @@ public Uni<QueryOuterClass.ResultSet> executeWrite(QueryOuterClass.Query query)
QueryOuterClass.Consistency consistency = queriesConfig.consistency().writes();
QueryOuterClass.ConsistencyValue.Builder consistencyValue =
QueryOuterClass.ConsistencyValue.newBuilder().setValue(consistency);
QueryOuterClass.Consistency serialConsistency = queriesConfig.serialConsistency();
QueryOuterClass.ConsistencyValue.Builder serialConsistencyValue =
QueryOuterClass.ConsistencyValue.newBuilder().setValue(serialConsistency);
QueryOuterClass.QueryParameters.Builder params =
QueryOuterClass.QueryParameters.newBuilder().setConsistency(consistencyValue);
QueryOuterClass.QueryParameters.newBuilder()
.setConsistency(consistencyValue)
.setSerialConsistency(serialConsistencyValue);
return queryBridge(
QueryOuterClass.Query.newBuilder(query).setParameters(params).buildPartial());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.stargate.bridge.grpc.TypeSpecs;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.api.common.config.QueriesConfig;
import io.stargate.sgv2.common.bridge.AbstractValidatingStargateBridgeTest;
import io.stargate.sgv2.common.bridge.ValidatingStargateBridge;
import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile;
Expand Down Expand Up @@ -40,6 +41,7 @@ public class DeleteOperationTest extends AbstractValidatingStargateBridgeTest {

@Inject QueryExecutor queryExecutor;
@Inject ObjectMapper objectMapper;
@Inject QueriesConfig queriesConfig;

@Nested
class Execute {
Expand Down Expand Up @@ -84,6 +86,7 @@ public void deleteWithId() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -160,6 +163,7 @@ public void deleteOneAndReturnById() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -271,6 +275,7 @@ public void deleteOneAndReturnWithSort() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -381,6 +386,7 @@ public void deleteOneAndReturnWithSortDesc() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc2"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -513,6 +519,7 @@ public void deleteWithDynamic() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -616,13 +623,15 @@ public void deleteWithDynamicRetry() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));
ValidatingStargateBridge.QueryAssert deleteAssert2 =
withQuery(
collectionDeleteCql,
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -728,13 +737,15 @@ public void deleteWithDynamicRetryFailure() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));
ValidatingStargateBridge.QueryAssert deleteAssert2 =
withQuery(
collectionDeleteCql,
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

FindOperation findOperation =
Expand Down Expand Up @@ -842,6 +853,7 @@ public void deleteWithDynamicRetryConcurrentDelete() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

FindOperation findOperation =
Expand Down Expand Up @@ -924,13 +936,15 @@ public void deleteManyWithDynamic() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));
ValidatingStargateBridge.QueryAssert deleteSecondAssert =
withQuery(
collectionDeleteCql,
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc2"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -1011,13 +1025,15 @@ public void deleteManyWithDynamicPaging() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));
ValidatingStargateBridge.QueryAssert deleteSecondAssert =
withQuery(
collectionDeleteCql,
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc2"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -1104,13 +1120,15 @@ public void deleteManyWithDynamicPagingAndMoreData() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));
ValidatingStargateBridge.QueryAssert deleteSecondAssert =
withQuery(
collectionDeleteCql,
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc2"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

FindOperation findOperation =
Expand Down Expand Up @@ -1283,6 +1301,7 @@ public void errorPartial() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

ValidatingStargateBridge.QueryAssert deleteDoc2Assert =
Expand All @@ -1291,6 +1310,7 @@ public void errorPartial() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc2"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

ValidatingStargateBridge.QueryAssert deleteDoc1RetryAssert =
Expand All @@ -1299,6 +1319,7 @@ public void errorPartial() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id3))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

FindOperation findOperation =
Expand Down Expand Up @@ -1450,6 +1471,7 @@ public void errorAll() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id1))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

ValidatingStargateBridge.QueryAssert deleteDoc2Assert =
Expand All @@ -1458,6 +1480,7 @@ public void errorAll() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc2"))),
Values.of(tx_id2))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

ValidatingStargateBridge.QueryAssert deleteDoc1RetryAssert =
Expand All @@ -1466,13 +1489,15 @@ public void errorAll() {
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc1"))),
Values.of(tx_id3))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));
ValidatingStargateBridge.QueryAssert deleteDoc2RetryAssert =
withQuery(
collectionDeleteCql,
Values.of(
CustomValueSerializers.getDocumentIdValue(DocumentId.fromString("doc2"))),
Values.of(tx_id4))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

FindOperation findOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.stargate.bridge.grpc.TypeSpecs;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.api.common.config.QueriesConfig;
import io.stargate.sgv2.common.bridge.AbstractValidatingStargateBridgeTest;
import io.stargate.sgv2.common.bridge.ValidatingStargateBridge;
import io.stargate.sgv2.common.testprofiles.NoGlobalResourcesTestProfile;
Expand Down Expand Up @@ -39,6 +40,7 @@ public class InsertOperationTest extends AbstractValidatingStargateBridgeTest {
@Inject Shredder shredder;
@Inject ObjectMapper objectMapper;
@Inject QueryExecutor queryExecutor;
@Inject QueriesConfig queriesConfig;

@Nested
class Execute {
Expand Down Expand Up @@ -93,6 +95,7 @@ public void insertOne() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

InsertOperation operation = new InsertOperation(COMMAND_CONTEXT, shredDocument);
Expand Down Expand Up @@ -159,6 +162,7 @@ public void insertDuplicate() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(false))));

InsertOperation operation = new InsertOperation(COMMAND_CONTEXT, shredDocument);
Expand Down Expand Up @@ -250,6 +254,7 @@ public void insertManyOrdered() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));
ValidatingStargateBridge.QueryAssert insertSecondAssert =
withQuery(
Expand Down Expand Up @@ -278,6 +283,7 @@ public void insertManyOrdered() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

InsertOperation operation =
Expand Down Expand Up @@ -365,6 +371,7 @@ public void insertManyUnordered() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));
ValidatingStargateBridge.QueryAssert insertSecondAssert =
withQuery(
Expand Down Expand Up @@ -393,6 +400,7 @@ public void insertManyUnordered() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

InsertOperation operation =
Expand Down Expand Up @@ -483,6 +491,7 @@ public void failureOrdered() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returningFailure(new RuntimeException("Ivan breaks the test."));

InsertOperation operation =
Expand Down Expand Up @@ -574,6 +583,7 @@ public void failureOrderedLastFails() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));
ValidatingStargateBridge.QueryAssert insertSecondAssert =
withQuery(
Expand Down Expand Up @@ -602,6 +612,7 @@ public void failureOrderedLastFails() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returningFailure(new RuntimeException("Ivan really breaks the test."));

InsertOperation operation =
Expand Down Expand Up @@ -697,6 +708,7 @@ public void failureUnorderedPartial() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returningFailure(new RuntimeException("Ivan breaks the test."));
ValidatingStargateBridge.QueryAssert insertSecondAssert =
withQuery(
Expand Down Expand Up @@ -725,6 +737,7 @@ public void failureUnorderedPartial() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returning(List.of(List.of(Values.of(true))));

InsertOperation operation =
Expand Down Expand Up @@ -819,6 +832,7 @@ public void failureUnorderedAll() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returningFailure(new RuntimeException("Ivan breaks the test."));
ValidatingStargateBridge.QueryAssert insertSecondAssert =
withQuery(
Expand Down Expand Up @@ -847,6 +861,7 @@ public void failureUnorderedAll() throws Exception {
.setName("applied")
.setType(TypeSpecs.BOOLEAN)
.build()))
.withSerialConsistency(queriesConfig.serialConsistency())
.returningFailure(new RuntimeException("Ivan really breaks the test."));

InsertOperation operation =
Expand Down
Loading

0 comments on commit e4811dc

Please sign in to comment.