Skip to content

Commit

Permalink
Handle DOCUMENT_ALREADY_EXISTS error on insert (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshrajamani authored Jan 16, 2024
1 parent 8bcf303 commit 0e54228
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -140,6 +141,10 @@ private static Uni<DocumentId> insertDocument(
if (result.wasApplied()) {
return Uni.createFrom().item(doc.id());
} else {
final UUID txId = result.one().getUuid("tx_id");
if (doc.nextTxID().equals(txId)) {
return Uni.createFrom().item(doc.id());
}
Exception failure = new JsonApiException(ErrorCode.DOCUMENT_ALREADY_EXISTS);
return Uni.createFrom().failure(failure);
}
Expand All @@ -153,15 +158,15 @@ private String buildInsertQuery(boolean vectorEnabled) {
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values, query_vector_value)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
return String.format(
insertWithVector, commandContext.namespace(), commandContext.collection());
} else {
String insert =
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
return String.format(insert, commandContext.namespace(), commandContext.collection());
}
}
Expand All @@ -174,6 +179,7 @@ private static SimpleStatement bindInsertValues(
return SimpleStatement.newInstance(
query,
CQLBindValues.getDocumentIdValue(doc.id()),
doc.nextTxID(),
doc.docJson(),
CQLBindValues.getSetValue(doc.existKeys()),
CQLBindValues.getIntegerMapValues(doc.arraySize()),
Expand All @@ -188,6 +194,7 @@ private static SimpleStatement bindInsertValues(
return SimpleStatement.newInstance(
query,
CQLBindValues.getDocumentIdValue(doc.id()),
doc.nextTxID(),
doc.docJson(),
CQLBindValues.getSetValue(doc.existKeys()),
CQLBindValues.getIntegerMapValues(doc.arraySize()),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.stargate.sgv2.jsonapi.service.shredding.model;

import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down Expand Up @@ -42,7 +43,8 @@ public record WritableShreddedDocument(
Map<JsonPath, String> queryTextValues,
Map<JsonPath, Date> queryTimestampValues,
Set<JsonPath> queryNullValues,
float[] queryVectorValues) {
float[] queryVectorValues,
UUID nextTxID) {

@Override
public boolean equals(Object o) {
Expand Down Expand Up @@ -146,7 +148,8 @@ public WritableShreddedDocument build() {
_nonNull(queryTextValues),
_nonNull(queryTimestampValues),
_nonNull(queryNullValues),
queryVectorValues);
queryVectorValues,
Uuids.timeBased());
}

private <T> Map<JsonPath, T> _nonNull(Map<JsonPath, T> map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
Expand Down Expand Up @@ -64,20 +65,23 @@ public class InsertOperationTest extends OperationTestBase {
private final ColumnDefinitions COLUMNS_APPLIED =
buildColumnDefs(TestColumn.ofBoolean("[applied]"));

private final ColumnDefinitions COLUMNS_APPLIED_FAILURE =
buildColumnDefs(TestColumn.ofBoolean("[applied]"), TestColumn.ofUuid("tx_id"));

@Inject Shredder shredder;
@Inject ObjectMapper objectMapper;

static final String INSERT_CQL =
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";

static final String INSERT_VECTOR_CQL =
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values, query_vector_value)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";

@Nested
class InsertNonVector {
Expand Down Expand Up @@ -152,8 +156,9 @@ public void insertDuplicate() throws Exception {

SimpleStatement insertStmt = nonVectorInsertStatement(shredDocument);
// Note: FALSE is needed to "fail" insertion, producing failure message
List<Row> rows = Arrays.asList(resultRow(COLUMNS_APPLIED, 0, Boolean.FALSE));
AsyncResultSet results = new MockAsyncResultSet(COLUMNS_APPLIED, rows, null);
List<Row> rows =
Arrays.asList(resultRow(COLUMNS_APPLIED_FAILURE, 0, Boolean.FALSE, UUID.randomUUID()));
AsyncResultSet results = new MockAsyncResultSet(COLUMNS_APPLIED_FAILURE, rows, null);
final AtomicInteger callCount = new AtomicInteger();
QueryExecutor queryExecutor = mock(QueryExecutor.class);

Expand Down Expand Up @@ -268,6 +273,59 @@ public void insertManyOrdered() throws Exception {
assertThat(result.errors()).isNull();
}

@Test
public void insertOneRetryLWTCheck() throws Exception {
String document =
"""
{
"_id": "doc1",
"text": "user1",
"number" : 10,
"boolean": true,
"nullval" : null,
"array" : ["a", "b"],
"sub_doc" : {"col": "val"},
"date_val" : {"$date": 1672531200000 }
}
""";

JsonNode jsonNode = objectMapper.readTree(document);
WritableShreddedDocument shredDocument = shredder.shred(jsonNode);

SimpleStatement insertStmt = nonVectorInsertStatement(shredDocument);
List<Row> rows =
Arrays.asList(
resultRow(COLUMNS_APPLIED_FAILURE, 0, Boolean.FALSE, shredDocument.nextTxID()));
AsyncResultSet results = new MockAsyncResultSet(COLUMNS_APPLIED_FAILURE, rows, null);
final AtomicInteger callCount = new AtomicInteger();
QueryExecutor queryExecutor = mock(QueryExecutor.class);

when(queryExecutor.executeWrite(eq(insertStmt)))
.then(
invocation -> {
callCount.incrementAndGet();
return Uni.createFrom().item(results);
});

Supplier<CommandResult> execute =
new InsertOperation(COMMAND_CONTEXT_NON_VECTOR, shredDocument)
.execute(queryExecutor)
.subscribe()
.withSubscriber(UniAssertSubscriber.create())
.awaitItem()
.getItem();

// assert query execution
assertThat(callCount.get()).isEqualTo(1);

// then result
CommandResult result = execute.get();
assertThat(result.status())
.hasSize(1)
.containsEntry(CommandStatus.INSERTED_IDS, List.of(new DocumentId.StringId("doc1")));
assertThat(result.errors()).isNull();
}

@Test
public void insertManyUnordered() throws Exception {
String document1 =
Expand Down Expand Up @@ -819,6 +877,7 @@ private SimpleStatement nonVectorInsertStatement(WritableShreddedDocument shredD
return SimpleStatement.newInstance(
insertCql,
CQLBindValues.getDocumentIdValue(shredDocument.id()),
shredDocument.nextTxID(),
shredDocument.docJson(),
CQLBindValues.getSetValue(shredDocument.existKeys()),
CQLBindValues.getIntegerMapValues(shredDocument.arraySize()),
Expand All @@ -835,6 +894,7 @@ private SimpleStatement vectorInsertStatement(WritableShreddedDocument shredDocu
return SimpleStatement.newInstance(
insertCql,
CQLBindValues.getDocumentIdValue(shredDocument.id()),
shredDocument.nextTxID(),
shredDocument.docJson(),
CQLBindValues.getSetValue(shredDocument.existKeys()),
CQLBindValues.getIntegerMapValues(shredDocument.arraySize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class Insert {
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";

@Test
public void insert() throws Exception {
Expand All @@ -178,6 +178,7 @@ public void insert() throws Exception {
SimpleStatement.newInstance(
INSERT_CQL.formatted(KEYSPACE_NAME, COLLECTION_NAME),
CQLBindValues.getDocumentIdValue(shredDocument.id()),
shredDocument.nextTxID(),
shredDocument.docJson(),
CQLBindValues.getSetValue(shredDocument.existKeys()),
CQLBindValues.getIntegerMapValues(shredDocument.arraySize()),
Expand Down

0 comments on commit 0e54228

Please sign in to comment.