Skip to content

Commit

Permalink
relates to #178: insert commands per spec, added oerdered/unordered, …
Browse files Browse the repository at this point in the history
…better tests (#216)
  • Loading branch information
Ivan Senic authored Mar 8, 2023
1 parent a37700f commit e36f2c3
Show file tree
Hide file tree
Showing 16 changed files with 1,784 additions and 311 deletions.
45 changes: 25 additions & 20 deletions src/main/java/io/stargate/sgv2/jsonapi/StargateJsonApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,24 +196,29 @@
"""
{
"insertMany": {
"documents": [{
"_id": "1",
"location": "London",
"race": {
"competitors": 100,
"start_date": "2022-08-15"
},
"tags" : [ "eu" ]
},
{
"_id": "2",
"location": "New York",
"race": {
"competitors": 150,
"start_date": "2022-09-15"
},
"tags": [ "us" ]
}]
"documents": [
{
"_id": "1",
"location": "London",
"race": {
"competitors": 100,
"start_date": "2022-08-15"
},
"tags" : [ "eu" ]
},
{
"_id": "2",
"location": "New York",
"race": {
"competitors": 150,
"start_date": "2022-09-15"
},
"tags": [ "us" ]
}
],
"options": {
"ordered": true
}
}
}
"""),
Expand Down Expand Up @@ -269,7 +274,7 @@
"""),
@ExampleObject(
name = "resultCount",
summary = "countDocuments command result",
summary = "`countDocuments` command result",
value =
"""
{
Expand Down Expand Up @@ -369,7 +374,7 @@
"""),
@ExampleObject(
name = "resultInsert",
summary = "Insert command result",
summary = "`insertOne` & `insertMany` command result",
value =
"""
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,35 @@
import io.stargate.sgv2.jsonapi.api.model.command.ModifyCommand;
import java.util.List;
import javax.annotation.Nullable;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

/**
* Representation of the insertMany API {@link Command}.
*
* @param document The document to insert.
* @param documents The document to insert.
* @param options Options for this command.
*/
@Schema(description = "Command that inserts multiple JSON document to a collection.")
@JsonTypeName("insertMany")
public record InsertManyCommand(
@NotNull
@NotEmpty
@Schema(
description = "JSON document to insert.",
implementation = Object.class,
type = SchemaType.ARRAY)
List<JsonNode> documents,
@Nullable Options options)
implements ModifyCommand {
public record Options() {}

@Schema(name = "InsertManyCommand.Options", description = "Options for inserting many documents.")
public record Options(
@Schema(
description =
"When `true` the server will insert the documents in sequential order, otherwise when `false` the server is free to re-order the inserts and parallelize them for performance. See specifications for more info on failure modes.",
defaultValue = "true")
Boolean ordered) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.stargate.sgv2.jsonapi.api.model.command.Command;
import io.stargate.sgv2.jsonapi.api.model.command.ModifyCommand;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
import org.eclipse.microprofile.openapi.annotations.media.Schema;
Expand All @@ -22,8 +21,5 @@ public record InsertOneCommand(
description = "JSON document to insert.",
implementation = Object.class,
type = SchemaType.OBJECT)
JsonNode document,
@Nullable Options options)
implements ModifyCommand {
public record Options() {}
}
JsonNode document)
implements ModifyCommand {}
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ public CollectionResource(CommandProcessor commandProcessor) {
schema = @Schema(implementation = CommandResult.class),
examples = {
@ExampleObject(ref = "resultCount"),
@ExampleObject(ref = "resultDeleteOne"),
@ExampleObject(ref = "resultDeleteMany"),
@ExampleObject(ref = "resultRead"),
@ExampleObject(ref = "resultFindOneAndUpdate"),
@ExampleObject(ref = "resultInsert"),
@ExampleObject(ref = "resultError"),
@ExampleObject(ref = "resultDeleteOne"),
@ExampleObject(ref = "resultDeleteMany"),
@ExampleObject(ref = "resultUpdateMany"),
@ExampleObject(ref = "resultUpdateOne"),
@ExampleObject(ref = "resultError"),
})))
@POST
public Uni<RestResponse<CommandResult>> postCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public enum ErrorCode {

CONCURRENCY_FAILURE("Unable to complete transaction due to concurrent transactions"),

DOCUMENT_ALREADY_EXISTS("Document already exists with the id"),
DOCUMENT_ALREADY_EXISTS("Document already exists with the given _id"),

DOCUMENT_UNPARSEABLE("Unable to parse the document"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public CommandResult get() {
return new CommandResult(List.of(error, causeError));
}
}

public ErrorCode getErrorCode() {
return errorCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
Expand All @@ -16,58 +17,140 @@
import java.util.List;
import java.util.function.Supplier;

/** Operation that inserts one or more documents. */
/**
* Operation that inserts one or more documents.
*
* @param commandContext Context that defines namespace and database.
* @param documents Documents to insert.
* @param ordered If insert should be ordered.
*/
public record InsertOperation(
CommandContext commandContext, List<WritableShreddedDocument> documents)
CommandContext commandContext, List<WritableShreddedDocument> documents, boolean ordered)
implements ModifyOperation {

public InsertOperation(CommandContext commandContext, WritableShreddedDocument document) {
this(commandContext, List.of(document));
this(commandContext, List.of(document), true);
}

/** {@inheritDoc} */
@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
if (ordered) {
return insertOrdered(queryExecutor);
} else {
return insertUnordered(queryExecutor);
}
}

// implementation for the ordered insert
private Uni<Supplier<CommandResult>> insertOrdered(QueryExecutor queryExecutor) {
// build query once
QueryOuterClass.Query query = buildInsertQuery();
final Uni<List<DocumentId>> ids =
Multi.createFrom()
.items(documents.stream())
.onItem()
.transformToUniAndConcatenate(doc -> insertDocument(queryExecutor, query, doc))
.collect()
.asList();
return ids.onItem().transform(insertedIds -> new InsertOperationPage(insertedIds, documents));

return Multi.createFrom()
.iterable(documents)

// concatenate to respect ordered
.onItem()
.transformToUni(
doc ->
insertDocument(queryExecutor, query, doc)

// wrap item and failure
// the collection can decide how to react on failure
.onItemOrFailure()
.transform((id, t) -> Tuple2.of(doc, t)))
.concatenate(false)

// if no failures reduce to the op page
.collect()
.in(
InsertOperationPage::new,
(agg, in) -> {
Throwable failure = in.getItem2();
agg.aggregate(in.getItem1().id(), failure);

if (failure != null) {
throw new FailFastInsertException(agg, failure);
}
})

// in case upstream propagated FailFastInsertException
// return collected result
.onFailure(FailFastInsertException.class)
.recoverWithItem(
e -> {
// safe to cast, asserted class in onFailure
FailFastInsertException failFastInsertException = (FailFastInsertException) e;
return failFastInsertException.result;
})

// use object identity to resolve to Supplier<CommandResult>
.map(i -> i);
}

// implementation for the unordered insert
private Uni<Supplier<CommandResult>> insertUnordered(QueryExecutor queryExecutor) {
// build query once
QueryOuterClass.Query query = buildInsertQuery();

return Multi.createFrom()
.iterable(documents)

// merge to make it parallel
.onItem()
.transformToUniAndMerge(
doc ->
insertDocument(queryExecutor, query, doc)

// handle errors fail silent mode
.onItemOrFailure()
.transform((id, t) -> Tuple2.of(doc, t)))

// then reduce here
.collect()
.in(InsertOperationPage::new, (agg, in) -> agg.aggregate(in.getItem1().id(), in.getItem2()))

// use object identity to resolve to Supplier<CommandResult>
.map(i -> i);
}

// inserts a single document
private static Uni<DocumentId> insertDocument(
QueryExecutor queryExecutor, QueryOuterClass.Query query, WritableShreddedDocument doc) {
query = bindInsertValues(query, doc);
// bind and execute
QueryOuterClass.Query bindedQuery = bindInsertValues(query, doc);

return queryExecutor
.executeWrite(query)
.executeWrite(bindedQuery)

// ensure document was written, if no applied continue with error
.onItem()
.transform(
.transformToUni(
result -> {
if (result.getRows(0).getValues(0).getBoolean()) {
return doc.id();
return Uni.createFrom().item(doc.id());
} else {
throw new JsonApiException(
ErrorCode.DOCUMENT_ALREADY_EXISTS,
String.format("Document already exists with the _id: %s", doc.id()));
Exception failure = new JsonApiException(ErrorCode.DOCUMENT_ALREADY_EXISTS);
return Uni.createFrom().failure(failure);
}
});
}

// utility for building the insert query
private QueryOuterClass.Query buildInsertQuery() {
String insert =
"INSERT INTO %s.%s"
+ " (key, tx_id, doc_json, exist_keys, sub_doc_equals, array_size, array_equals, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (key, tx_id, doc_json, exist_keys, sub_doc_equals, array_size, array_equals, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";

return QueryOuterClass.Query.newBuilder()
.setCql(String.format(insert, commandContext.namespace(), commandContext.collection()))
.build();
}

// utility for query binding
private static QueryOuterClass.Query bindInsertValues(
QueryOuterClass.Query builtQuery, WritableShreddedDocument doc) {
// respect the order in the DocsApiConstants.ALL_COLUMNS_NAMES
Expand All @@ -87,4 +170,15 @@ private static QueryOuterClass.Query bindInsertValues(
.addValues(Values.of(CustomValueSerializers.getSetValue(doc.queryNullValues())));
return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build();
}

// simple exception to propagate fail fast
private static class FailFastInsertException extends RuntimeException {

private final InsertOperationPage result;

public FailFastInsertException(InsertOperationPage result, Throwable cause) {
super(cause);
this.result = result;
}
}
}
Loading

0 comments on commit e36f2c3

Please sign in to comment.