Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relates to #178: insert commands per spec, added oerdered/unordered, better tests #216

Merged
merged 9 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions src/main/java/io/stargate/sgv2/jsonapi/StargateJsonApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,24 +196,29 @@
"""
{
"insertMany": {
"documents": [{
"_id": "1",
"location": "London",
"race": {
"competitors": 100,
"start_date": "2022-08-15"
},
"tags" : [ "eu" ]
},
{
"_id": "2",
"location": "New York",
"race": {
"competitors": 150,
"start_date": "2022-09-15"
},
"tags": [ "us" ]
}]
"documents": [
{
"_id": "1",
"location": "London",
"race": {
"competitors": 100,
"start_date": "2022-08-15"
},
"tags" : [ "eu" ]
},
{
"_id": "2",
"location": "New York",
"race": {
"competitors": 150,
"start_date": "2022-09-15"
},
"tags": [ "us" ]
}
],
"options": {
"ordered": true
}
}
}
"""),
Expand Down Expand Up @@ -269,7 +274,7 @@
"""),
@ExampleObject(
name = "resultCount",
summary = "countDocuments command result",
summary = "`countDocuments` command result",
value =
"""
{
Expand Down Expand Up @@ -369,7 +374,7 @@
"""),
@ExampleObject(
name = "resultInsert",
summary = "Insert command result",
summary = "`insertOne` & `insertMany` command result",
value =
"""
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,35 @@
import io.stargate.sgv2.jsonapi.api.model.command.ModifyCommand;
import java.util.List;
import javax.annotation.Nullable;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

/**
* Representation of the insertMany API {@link Command}.
*
* @param document The document to insert.
* @param documents The document to insert.
* @param options Options for this command.
*/
@Schema(description = "Command that inserts multiple JSON document to a collection.")
@JsonTypeName("insertMany")
public record InsertManyCommand(
@NotNull
@NotEmpty
@Schema(
description = "JSON document to insert.",
implementation = Object.class,
type = SchemaType.ARRAY)
List<JsonNode> documents,
@Nullable Options options)
implements ModifyCommand {
public record Options() {}

@Schema(name = "InsertManyCommand.Options", description = "Options for inserting many documents.")
public record Options(
@Schema(
description =
"When `true` the server will insert the documents in sequential order, otherwise when `false` the server is free to re-order the inserts and parallelize them for performance. See specifications for more info on failure modes.",
defaultValue = "true")
Boolean ordered) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.stargate.sgv2.jsonapi.api.model.command.Command;
import io.stargate.sgv2.jsonapi.api.model.command.ModifyCommand;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
import org.eclipse.microprofile.openapi.annotations.media.Schema;
Expand All @@ -22,8 +21,5 @@ public record InsertOneCommand(
description = "JSON document to insert.",
implementation = Object.class,
type = SchemaType.OBJECT)
JsonNode document,
@Nullable Options options)
implements ModifyCommand {
public record Options() {}
}
JsonNode document)
implements ModifyCommand {}
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ public CollectionResource(CommandProcessor commandProcessor) {
schema = @Schema(implementation = CommandResult.class),
examples = {
@ExampleObject(ref = "resultCount"),
@ExampleObject(ref = "resultDeleteOne"),
@ExampleObject(ref = "resultDeleteMany"),
@ExampleObject(ref = "resultRead"),
@ExampleObject(ref = "resultFindOneAndUpdate"),
@ExampleObject(ref = "resultInsert"),
@ExampleObject(ref = "resultError"),
@ExampleObject(ref = "resultDeleteOne"),
@ExampleObject(ref = "resultDeleteMany"),
@ExampleObject(ref = "resultUpdateMany"),
@ExampleObject(ref = "resultUpdateOne"),
@ExampleObject(ref = "resultError"),
})))
@POST
public Uni<RestResponse<CommandResult>> postCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public enum ErrorCode {

CONCURRENCY_FAILURE("Unable to complete transaction due to concurrent transactions"),

DOCUMENT_ALREADY_EXISTS("Document already exists with the id"),
DOCUMENT_ALREADY_EXISTS("Document already exists with the given _id"),

DOCUMENT_UNPARSEABLE("Unable to parse the document"),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public CommandResult get() {
return new CommandResult(List.of(error, causeError));
}
}

public ErrorCode getErrorCode() {
return errorCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
Expand All @@ -16,58 +17,140 @@
import java.util.List;
import java.util.function.Supplier;

/** Operation that inserts one or more documents. */
/**
* Operation that inserts one or more documents.
*
* @param commandContext Context that defines namespace and database.
* @param documents Documents to insert.
* @param ordered If insert should be ordered.
*/
public record InsertOperation(
CommandContext commandContext, List<WritableShreddedDocument> documents)
CommandContext commandContext, List<WritableShreddedDocument> documents, boolean ordered)
implements ModifyOperation {

public InsertOperation(CommandContext commandContext, WritableShreddedDocument document) {
this(commandContext, List.of(document));
this(commandContext, List.of(document), true);
}

/** {@inheritDoc} */
@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
if (ordered) {
return insertOrdered(queryExecutor);
} else {
return insertUnordered(queryExecutor);
}
}

// implementation for the ordered insert
private Uni<Supplier<CommandResult>> insertOrdered(QueryExecutor queryExecutor) {
// build query once
QueryOuterClass.Query query = buildInsertQuery();
final Uni<List<DocumentId>> ids =
Multi.createFrom()
.items(documents.stream())
.onItem()
.transformToUniAndConcatenate(doc -> insertDocument(queryExecutor, query, doc))
.collect()
.asList();
return ids.onItem().transform(insertedIds -> new InsertOperationPage(insertedIds, documents));

return Multi.createFrom()
.iterable(documents)

// concatenate to respect ordered
.onItem()
.transformToUni(
doc ->
insertDocument(queryExecutor, query, doc)

// wrap item and failure
// the collection can decide how to react on failure
.onItemOrFailure()
.transform((id, t) -> Tuple2.of(doc, t)))
.concatenate(false)

// if no failures reduce to the op page
.collect()
.in(
InsertOperationPage::new,
(agg, in) -> {
Throwable failure = in.getItem2();
agg.aggregate(in.getItem1().id(), failure);

if (failure != null) {
throw new FailFastInsertException(agg, failure);
}
})

// in case upstream propagated FailFastInsertException
// return collected result
.onFailure(FailFastInsertException.class)
.recoverWithItem(
e -> {
// safe to cast, asserted class in onFailure
FailFastInsertException failFastInsertException = (FailFastInsertException) e;
return failFastInsertException.result;
})

// use object identity to resolve to Supplier<CommandResult>
.map(i -> i);
ivansenic marked this conversation as resolved.
Show resolved Hide resolved
}

// implementation for the unordered insert
private Uni<Supplier<CommandResult>> insertUnordered(QueryExecutor queryExecutor) {
// build query once
QueryOuterClass.Query query = buildInsertQuery();

return Multi.createFrom()
.iterable(documents)

// merge to make it parallel
.onItem()
.transformToUniAndMerge(
doc ->
insertDocument(queryExecutor, query, doc)

// handle errors fail silent mode
.onItemOrFailure()
.transform((id, t) -> Tuple2.of(doc, t)))

// then reduce here
.collect()
.in(InsertOperationPage::new, (agg, in) -> agg.aggregate(in.getItem1().id(), in.getItem2()))
ivansenic marked this conversation as resolved.
Show resolved Hide resolved

// use object identity to resolve to Supplier<CommandResult>
.map(i -> i);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed in() would have already return Uni< InsertOperationPage>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not compile. The method signature must be

Uni<? extends Supplier<CommandResult>>

so that we can return the Uni<InsertOperationPage>.

I wanted to refactor, but it's not a small change, should be changed in all sub-classes and tests.. Do you think it's worth it?

}

// inserts a single document
private static Uni<DocumentId> insertDocument(
QueryExecutor queryExecutor, QueryOuterClass.Query query, WritableShreddedDocument doc) {
query = bindInsertValues(query, doc);
// bind and execute
QueryOuterClass.Query bindedQuery = bindInsertValues(query, doc);

return queryExecutor
.executeWrite(query)
.executeWrite(bindedQuery)

// ensure document was written, if no applied continue with error
.onItem()
.transform(
.transformToUni(
result -> {
if (result.getRows(0).getValues(0).getBoolean()) {
return doc.id();
return Uni.createFrom().item(doc.id());
} else {
throw new JsonApiException(
ErrorCode.DOCUMENT_ALREADY_EXISTS,
String.format("Document already exists with the _id: %s", doc.id()));
Exception failure = new JsonApiException(ErrorCode.DOCUMENT_ALREADY_EXISTS);
return Uni.createFrom().failure(failure);
}
});
}

// utility for building the insert query
private QueryOuterClass.Query buildInsertQuery() {
String insert =
"INSERT INTO %s.%s"
+ " (key, tx_id, doc_json, exist_keys, sub_doc_equals, array_size, array_equals, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";
+ " (key, tx_id, doc_json, exist_keys, sub_doc_equals, array_size, array_equals, array_contains, query_bool_values, query_dbl_values , query_text_values, query_null_values)"
+ " VALUES"
+ " (?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS";

return QueryOuterClass.Query.newBuilder()
.setCql(String.format(insert, commandContext.namespace(), commandContext.collection()))
.build();
}

// utility for query binding
private static QueryOuterClass.Query bindInsertValues(
QueryOuterClass.Query builtQuery, WritableShreddedDocument doc) {
// respect the order in the DocsApiConstants.ALL_COLUMNS_NAMES
Expand All @@ -87,4 +170,15 @@ private static QueryOuterClass.Query bindInsertValues(
.addValues(Values.of(CustomValueSerializers.getSetValue(doc.queryNullValues())));
return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build();
}

// simple exception to propagate fail fast
private static class FailFastInsertException extends RuntimeException {

private final InsertOperationPage result;

public FailFastInsertException(InsertOperationPage result, Throwable cause) {
super(cause);
this.result = result;
}
}
}
Loading