Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle DOCUMENT_ALREADY_EXISTS error on insert #802

Merged
merged 7 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -140,6 +141,10 @@ private static Uni<DocumentId> insertDocument(
if (result.wasApplied()) {
return Uni.createFrom().item(doc.id());
} else {
final UUID txId = result.one().getUuid("tx_id");
if (doc.txID().equals(txId)) {
return Uni.createFrom().item(doc.id());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three retry will have the same txId, is that correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, retries are done by the driver with the same bind values

Exception failure = new JsonApiException(ErrorCode.DOCUMENT_ALREADY_EXISTS);
return Uni.createFrom().failure(failure);
}
Expand All @@ -153,15 +158,15 @@ private String buildInsertQuery(boolean vectorEnabled) {
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values, query_vector_value)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
return String.format(
insertWithVector, commandContext.namespace(), commandContext.collection());
} else {
String insert =
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
return String.format(insert, commandContext.namespace(), commandContext.collection());
}
}
Expand All @@ -174,6 +179,7 @@ private static SimpleStatement bindInsertValues(
return SimpleStatement.newInstance(
query,
CQLBindValues.getDocumentIdValue(doc.id()),
doc.txID(),
doc.docJson(),
CQLBindValues.getSetValue(doc.existKeys()),
CQLBindValues.getIntegerMapValues(doc.arraySize()),
Expand All @@ -188,6 +194,7 @@ private static SimpleStatement bindInsertValues(
return SimpleStatement.newInstance(
query,
CQLBindValues.getDocumentIdValue(doc.id()),
doc.txID(),
doc.docJson(),
CQLBindValues.getSetValue(doc.existKeys()),
CQLBindValues.getIntegerMapValues(doc.arraySize()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static class Builder implements ShredListener {
private final DocValueHasher hasher;

private final DocumentId id;
private final UUID txID;
private UUID txID;

private final String docJson;
private final JsonNode docJsonNode;
Expand Down Expand Up @@ -133,6 +133,7 @@ public Builder(DocumentId id, UUID txID, String docJson, JsonNode docJsonNode) {
* @return WritableShreddedDocument built from collected information
*/
public WritableShreddedDocument build() {
if (txID == null) txID = UUID.randomUUID();
return new WritableShreddedDocument(
id,
txID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,23 @@ public class InsertOperationTest extends OperationTestBase {
private final ColumnDefinitions COLUMNS_APPLIED =
buildColumnDefs(TestColumn.ofBoolean("[applied]"));

private final ColumnDefinitions COLUMNS_APPLIED_FAILURE =
buildColumnDefs(TestColumn.ofBoolean("[applied]"), TestColumn.ofUuid("tx_id"));

@Inject Shredder shredder;
@Inject ObjectMapper objectMapper;

static final String INSERT_CQL =
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";

static final String INSERT_VECTOR_CQL =
"INSERT INTO \"%s\".\"%s\""
+ " (key, tx_id, doc_json, exist_keys, array_size, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values, query_timestamp_values, query_vector_value)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";

@Nested
class InsertNonVector {
Expand Down Expand Up @@ -268,6 +271,58 @@ public void insertManyOrdered() throws Exception {
assertThat(result.errors()).isNull();
}

@Test
public void insertOneRetryLWTCheck() throws Exception {
String document =
"""
{
"_id": "doc1",
"text": "user1",
"number" : 10,
"boolean": true,
"nullval" : null,
"array" : ["a", "b"],
"sub_doc" : {"col": "val"},
"date_val" : {"$date": 1672531200000 }
}
""";

JsonNode jsonNode = objectMapper.readTree(document);
WritableShreddedDocument shredDocument = shredder.shred(jsonNode);

SimpleStatement insertStmt = nonVectorInsertStatement(shredDocument);
List<Row> rows =
Arrays.asList(resultRow(COLUMNS_APPLIED_FAILURE, 0, Boolean.FALSE, shredDocument.txID()));
AsyncResultSet results = new MockAsyncResultSet(COLUMNS_APPLIED, rows, null);
final AtomicInteger callCount = new AtomicInteger();
QueryExecutor queryExecutor = mock(QueryExecutor.class);

when(queryExecutor.executeWrite(eq(insertStmt)))
.then(
invocation -> {
callCount.incrementAndGet();
return Uni.createFrom().item(results);
});

Supplier<CommandResult> execute =
new InsertOperation(COMMAND_CONTEXT_NON_VECTOR, shredDocument)
.execute(queryExecutor)
.subscribe()
.withSubscriber(UniAssertSubscriber.create())
.awaitItem()
.getItem();

// assert query execution
assertThat(callCount.get()).isEqualTo(1);

// then result
CommandResult result = execute.get();
assertThat(result.status())
.hasSize(1)
.containsEntry(CommandStatus.INSERTED_IDS, List.of(new DocumentId.StringId("doc1")));
assertThat(result.errors()).isNull();
}

@Test
public void insertManyUnordered() throws Exception {
String document1 =
Expand Down Expand Up @@ -819,6 +874,7 @@ private SimpleStatement nonVectorInsertStatement(WritableShreddedDocument shredD
return SimpleStatement.newInstance(
insertCql,
CQLBindValues.getDocumentIdValue(shredDocument.id()),
shredDocument.txID(),
shredDocument.docJson(),
CQLBindValues.getSetValue(shredDocument.existKeys()),
CQLBindValues.getIntegerMapValues(shredDocument.arraySize()),
Expand All @@ -835,6 +891,7 @@ private SimpleStatement vectorInsertStatement(WritableShreddedDocument shredDocu
return SimpleStatement.newInstance(
insertCql,
CQLBindValues.getDocumentIdValue(shredDocument.id()),
shredDocument.txID(),
shredDocument.docJson(),
CQLBindValues.getSetValue(shredDocument.existKeys()),
CQLBindValues.getIntegerMapValues(shredDocument.arraySize()),
Expand Down
Loading