Skip to content

Commit

Permalink
Add 'returnDocumentResponses' for "insertMany" (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
tatu-at-datastax authored Jun 18, 2024
1 parent ccae978 commit 69e93fa
Show file tree
Hide file tree
Showing 12 changed files with 410 additions and 232 deletions.
2 changes: 1 addition & 1 deletion docker-compose/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Docker Compose scripts for JSONAPI with DSE-6.9
# Docker Compose scripts for Data API with DSE-6.9

This directory provides two ways to start the Data API with DSE-6.9 or HCD using `docker compose`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public record CommandResult(
nullable = true)
})
Map<CommandStatus, Object> status,
@Schema(nullable = true) List<Error> errors) {
@JsonInclude(JsonInclude.Include.NON_EMPTY) @Schema(nullable = true) List<Error> errors) {

/**
* Constructor for only specifying the {@link MultiResponseData}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ public enum CommandStatus {
/** Status for reporting existing collections. */
@JsonProperty("collections")
EXISTING_COLLECTIONS,
/**
* List of response entries, one for each document we tried to insert with {@code insertMany}
* command. Each entry has 2 mandatory fields: {@code _id} (document id), and {@code status} (one
* of {@code OK}, {@code ERROR} or {@code SKIP}; {@code ERROR} entries also have {@code errorsIdx}
* field that refers to position of the error in the root level {@code errors} List.
*/
@JsonProperty("documentResponses")
DOCUMENT_RESPONSES,
/** The element has the list of inserted ids */
@JsonProperty("insertedIds")
INSERTED_IDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,14 @@ public record Options(
description =
"When `true` the server will insert the documents in sequential order, ensuring each document is successfully inserted before starting the next. Additionally the command will \"fail fast\", failing the first document that fails to insert. When `false` the server is free to re-order the inserts and parallelize them for performance. In this mode more than one document may fail to be inserted (aka \"fail silently\" mode).",
defaultValue = "false")
boolean ordered) {}
boolean ordered,
@Schema(
description =
"When `true`, response will contain an additional field: 'documentResponses'"
+ " with is an array of Document Response Objects. Each Document Response Object"
+ " contains the `_id` of the document and the `status` of the operation (one of"
+ " `OK`, `ERROR` or `SKIPPED`). Additional `errorsIdx` field is present when the"
+ " status is `ERROR` and contains the index of the error in the main `errors` array.",
defaultValue = "false")
boolean returnDocumentResponses) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.stargate.sgv2.jsonapi.service.operation.model.ModifyOperation;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
Expand All @@ -29,16 +30,28 @@ public record InsertOperation(
CommandContext commandContext,
List<WritableShreddedDocument> documents,
boolean ordered,
boolean offlineMode)
boolean offlineMode,
boolean returnDocumentResponses)
implements ModifyOperation {
public record WritableDocAndPosition(int position, WritableShreddedDocument document)
implements Comparable<WritableDocAndPosition> {
@Override
public int compareTo(InsertOperation.WritableDocAndPosition o) {
// Order by position (only), ascending
return Integer.compare(position, o.position);
}
}

public InsertOperation(
CommandContext commandContext, List<WritableShreddedDocument> documents, boolean ordered) {
this(commandContext, documents, ordered, false);
CommandContext commandContext,
List<WritableShreddedDocument> documents,
boolean ordered,
boolean returnDocumentResponses) {
this(commandContext, documents, ordered, false, returnDocumentResponses);
}

public InsertOperation(CommandContext commandContext, WritableShreddedDocument document) {
this(commandContext, List.of(document), false, false);
this(commandContext, List.of(document), false, false, false);
}

/** {@inheritDoc} */
Expand All @@ -57,22 +70,30 @@ public Uni<Supplier<CommandResult>> execute(
.jsonProcessingMetricsReporter()
.reportJsonWrittenDocsMetrics(commandContext().commandName(), documents.size());
}
final List<WritableDocAndPosition> docsWithPositions = new ArrayList<>(documents.size());
int pos = 0;
for (WritableShreddedDocument doc : documents) {
docsWithPositions.add(new WritableDocAndPosition(pos++, doc));
}
if (ordered) {
return insertOrdered(dataApiRequestInfo, queryExecutor, vectorEnabled);
return insertOrdered(dataApiRequestInfo, queryExecutor, vectorEnabled, docsWithPositions);
} else {
return insertUnordered(dataApiRequestInfo, queryExecutor, vectorEnabled);
return insertUnordered(dataApiRequestInfo, queryExecutor, vectorEnabled, docsWithPositions);
}
}

// implementation for the ordered insert
private Uni<Supplier<CommandResult>> insertOrdered(
DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor, boolean vectorEnabled) {
DataApiRequestInfo dataApiRequestInfo,
QueryExecutor queryExecutor,
boolean vectorEnabled,
List<WritableDocAndPosition> docsWithPositions) {

// build query once
final String query = buildInsertQuery(vectorEnabled);

return Multi.createFrom()
.iterable(documents)
.iterable(docsWithPositions)

// concatenate to respect ordered
.onItem()
Expand All @@ -89,10 +110,10 @@ private Uni<Supplier<CommandResult>> insertOrdered(
// if no failures reduce to the op page
.collect()
.in(
InsertOperationPage::new,
() -> new InsertOperationPage(docsWithPositions, returnDocumentResponses()),
(agg, in) -> {
Throwable failure = in.getItem2();
agg.aggregate(in.getItem1().id(), failure);
agg.aggregate(in.getItem1(), failure);

if (failure != null) {
throw new FailFastInsertException(agg, failure);
Expand All @@ -115,11 +136,14 @@ private Uni<Supplier<CommandResult>> insertOrdered(

// implementation for the unordered insert
private Uni<Supplier<CommandResult>> insertUnordered(
DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor, boolean vectorEnabled) {
DataApiRequestInfo dataApiRequestInfo,
QueryExecutor queryExecutor,
boolean vectorEnabled,
List<WritableDocAndPosition> docsWithPositions) {
// build query once
String query = buildInsertQuery(vectorEnabled);
return Multi.createFrom()
.iterable(documents)
.iterable(docsWithPositions)

// merge to make it parallel
.onItem()
Expand All @@ -133,7 +157,9 @@ private Uni<Supplier<CommandResult>> insertUnordered(

// then reduce here
.collect()
.in(InsertOperationPage::new, (agg, in) -> agg.aggregate(in.getItem1().id(), in.getItem2()))
.in(
() -> new InsertOperationPage(docsWithPositions, returnDocumentResponses()),
(agg, in) -> agg.aggregate(in.getItem1(), in.getItem2()))

// use object identity to resolve to Supplier<CommandResult>
.map(i -> i);
Expand All @@ -144,10 +170,11 @@ private static Uni<DocumentId> insertDocument(
DataApiRequestInfo dataApiRequestInfo,
QueryExecutor queryExecutor,
String query,
WritableShreddedDocument doc,
WritableDocAndPosition docWithPosition,
boolean vectorEnabled,
boolean offlineMode) {
// bind and execute
final WritableShreddedDocument doc = docWithPosition.document();
SimpleStatement boundStatement = bindInsertValues(query, doc, vectorEnabled, offlineMode);
return queryExecutor
.executeWrite(dataApiRequestInfo, boundStatement)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package io.stargate.sgv2.jsonapi.service.operation.model.impl;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.smallrye.mutiny.tuples.Tuple2;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.exception.mappers.ThrowableToErrorMapper;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -15,59 +19,121 @@
* The internal to insert operation results, keeping ids of successfully and not-successfully
* inserted documents.
*
* <p>Can serve as an aggregator, using the {@link #aggregate(DocumentId, Throwable)} function.
* <p>Can serve as an aggregator, using the {@link #aggregate} function.
*
* @param insertedIds Documents IDs that we successfully inserted.
* @param failedIds Document IDs that failed to be inserted.
* @param successfulInsertions Documents that we successfully inserted.
* @param failedInsertions Documents that failed to be inserted, along with failure reason.
*/
public record InsertOperationPage(
List<DocumentId> insertedIds, Map<DocumentId, Throwable> failedIds)
List<InsertOperation.WritableDocAndPosition> allAttemptedInsertions,
boolean returnDocumentResponses,
List<InsertOperation.WritableDocAndPosition> successfulInsertions,
List<Tuple2<InsertOperation.WritableDocAndPosition, Throwable>> failedInsertions)
implements Supplier<CommandResult> {
enum InsertionStatus {
OK,
ERROR,
SKIPPED
}

@JsonPropertyOrder({"_id", "status", "errorsIdx"})
@JsonInclude(JsonInclude.Include.NON_NULL)
record InsertionResult(DocumentId _id, InsertionStatus status, Integer errorsIdx) {}

/** No-arg constructor, usually used for aggregation. */
public InsertOperationPage() {
this(new ArrayList<>(), new HashMap<>());
public InsertOperationPage(
List<InsertOperation.WritableDocAndPosition> allAttemptedInsertions,
boolean returnDocumentResponses) {
this(allAttemptedInsertions, returnDocumentResponses, new ArrayList<>(), new ArrayList<>());
}

/** {@inheritDoc} */
@Override
public CommandResult get() {
// if we have errors, transform
if (null != failedIds && !failedIds().isEmpty()) {

List<CommandResult.Error> errors = new ArrayList<>(failedIds.size());
failedIds.forEach((documentId, throwable) -> errors.add(getError(documentId, throwable)));
// Ensure insertions and errors are in the input order (wrt unordered insertions),
// regardless of output format
Collections.sort(successfulInsertions);
if (!failedInsertions().isEmpty()) {
Collections.sort(
failedInsertions, Comparator.comparing(tuple -> tuple.getItem1().position()));
}

if (!returnDocumentResponses()) { // legacy output, limited to ids, error messages
List<CommandResult.Error> errors;
if (failedInsertions().isEmpty()) {
errors = null;
} else {
errors =
failedInsertions.stream()
.map(tuple -> getError(tuple.getItem1().document().id(), tuple.getItem2()))
.toList();
}
// Old style, simple ids:
List<DocumentId> insertedIds =
successfulInsertions.stream().map(docAndPos -> docAndPos.document().id()).toList();
return new CommandResult(null, Map.of(CommandStatus.INSERTED_IDS, insertedIds), errors);
}

// id no errors, just inserted ids
return new CommandResult(Map.of(CommandStatus.INSERTED_IDS, insertedIds));
// New style output: detailed responses.
InsertionResult[] results = new InsertionResult[allAttemptedInsertions().size()];
List<CommandResult.Error> errors = new ArrayList<>();

// Results array filled in order: first successful insertions
for (InsertOperation.WritableDocAndPosition docAndPos : successfulInsertions) {
results[docAndPos.position()] =
new InsertionResult(docAndPos.document().id(), InsertionStatus.OK, null);
}
// Second: failed insertions
for (Tuple2<InsertOperation.WritableDocAndPosition, Throwable> failed : failedInsertions) {
InsertOperation.WritableDocAndPosition docAndPos = failed.getItem1();
Throwable throwable = failed.getItem2();
CommandResult.Error error = getError(throwable);

// We want to avoid adding the same error multiple times, so we keep track of the index:
// either one exists, use it; or if not, add it and use the new index.
int errorIdx = errors.indexOf(error);
if (errorIdx < 0) { // new non-dup error; add it
errorIdx = errors.size(); // will be appended at the end
errors.add(error);
}
results[docAndPos.position()] =
new InsertionResult(docAndPos.document().id(), InsertionStatus.ERROR, errorIdx);
}
// And third, if any, skipped insertions; those that were not attempted (f.ex due
// to failure for ordered inserts)
for (int i = 0; i < results.length; i++) {
if (null == results[i]) {
results[i] =
new InsertionResult(
allAttemptedInsertions.get(i).document().id(), InsertionStatus.SKIPPED, null);
}
}
return new CommandResult(
null, Map.of(CommandStatus.DOCUMENT_RESPONSES, Arrays.asList(results)), errors);
}

private static CommandResult.Error getError(DocumentId documentId, Throwable throwable) {
String message =
"Failed to insert document with _id %s: %s".formatted(documentId, throwable.getMessage());

Map<String, Object> fields = new HashMap<>();
fields.put("exceptionClass", throwable.getClass().getSimpleName());
if (throwable instanceof JsonApiException jae) {
fields.put("errorCode", jae.getErrorCode().name());
}
return ThrowableToErrorMapper.getMapperWithMessageFunction().apply(throwable, message);
}

private static CommandResult.Error getError(Throwable throwable) {
return ThrowableToErrorMapper.getMapperWithMessageFunction()
.apply(throwable, throwable.getMessage());
}

/**
* Aggregates the result of the insert operation into this object.
*
* @param id ID of the document that was inserted written.
* @param failure If not null, means an error occurred during the write.
* @param docWithPosition Document that was inserted (or failed to be inserted)
* @param failure If not null, means an error occurred during attempted insertion
*/
public void aggregate(DocumentId id, Throwable failure) {
if (null != failure) {
failedIds.put(id, failure);
public void aggregate(InsertOperation.WritableDocAndPosition docWithPosition, Throwable failure) {
if (null == failure) {
successfulInsertions.add(docWithPosition);
} else {
insertedIds.add(id);
failedInsertions.add(Tuple2.of(docWithPosition, failure));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package io.stargate.sgv2.jsonapi.service.resolver.model.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertManyCommand;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.InsertOperation;
import io.stargate.sgv2.jsonapi.service.projection.IndexingProjector;
import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver;
import io.stargate.sgv2.jsonapi.service.shredding.Shredder;
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
Expand All @@ -18,12 +16,10 @@
public class InsertManyCommandResolver implements CommandResolver<InsertManyCommand> {

private final Shredder shredder;
private final ObjectMapper objectMapper;

@Inject
public InsertManyCommandResolver(Shredder shredder, ObjectMapper objectMapper) {
public InsertManyCommandResolver(Shredder shredder) {
this.shredder = shredder;
this.objectMapper = objectMapper;
}

@Override
Expand All @@ -33,15 +29,13 @@ public Class<InsertManyCommand> getCommandClass() {

@Override
public Operation resolveCommand(CommandContext ctx, InsertManyCommand command) {
final IndexingProjector projection = ctx.indexingProjector();
final List<WritableShreddedDocument> shreddedDocuments =
command.documents().stream().map(doc -> shredder.shred(ctx, doc, null)).toList();

// resolve ordered
InsertManyCommand.Options options = command.options();
boolean ordered = (null != options) && options.ordered();
boolean returnDocumentResponses = (null != options) && options.returnDocumentResponses();

boolean ordered = null != options && Boolean.TRUE.equals(options.ordered());

return new InsertOperation(ctx, shreddedDocuments, ordered);
return new InsertOperation(ctx, shreddedDocuments, ordered, false, returnDocumentResponses);
}
}
Loading

0 comments on commit 69e93fa

Please sign in to comment.