Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'returnDocumentResponses' for "insertMany" #1161

Merged
merged 32 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5d19a40
Start adding wiring for 'returnDocumentPositions'
tatu-at-datastax Jun 11, 2024
f3b0df9
mvn fmt:format
tatu-at-datastax Jun 11, 2024
73645ca
...
tatu-at-datastax Jun 11, 2024
2468222
Fix test signatures
tatu-at-datastax Jun 11, 2024
c02673a
More plumbing, get "returnDocumentPositions" where needed
tatu-at-datastax Jun 11, 2024
f741b3e
Test refactoring
tatu-at-datastax Jun 11, 2024
aa0423c
More test refactoring
tatu-at-datastax Jun 11, 2024
c9975d8
...
tatu-at-datastax Jun 11, 2024
74b07df
Start piping through position of document
tatu-at-datastax Jun 11, 2024
a780120
Add new status keys, clean up ITs
tatu-at-datastax Jun 12, 2024
7526ad2
More plumbing, order failed/ok insertions by input position
tatu-at-datastax Jun 12, 2024
28fcb2b
...
tatu-at-datastax Jun 12, 2024
d99d503
Merge branch 'main' into tatu/c2-3382-return-doc-positions
tatu-at-datastax Jun 12, 2024
616b34a
Fix ITs not to rely on insertion/fail order for unordered case
tatu-at-datastax Jun 12, 2024
499c2e6
Merge branch 'main' into tatu/c2-3382-return-doc-positions
tatu-at-datastax Jun 12, 2024
63451c1
Merge branch 'main' into tatu/c2-3382-return-doc-positions
tatu-at-datastax Jun 12, 2024
009878a
Implement functionality; need tests next
tatu-at-datastax Jun 12, 2024
ca553ea
Add the first test for actual return-on-dups
tatu-at-datastax Jun 12, 2024
6aae747
Refactoring
tatu-at-datastax Jun 12, 2024
60b4c6b
Add test verifying returning of auto-generated UUIDs
tatu-at-datastax Jun 12, 2024
bf45bfa
Bit more testing
tatu-at-datastax Jun 12, 2024
c96cdb6
Merge branch 'main' into tatu/c2-3382-return-doc-positions
tatu-at-datastax Jun 13, 2024
2aa4593
Merge branch 'main' into tatu/c2-3382-return-doc-positions
tatu-at-datastax Jun 17, 2024
6949204
Renaming
tatu-at-datastax Jun 17, 2024
91854a3
Rename setting in integration tests too (returnDocumentPositions->ret…
tatu-at-datastax Jun 17, 2024
8d7d0ea
Merge branch 'main' into tatu/c2-3382-return-doc-positions
tatu-at-datastax Jun 18, 2024
28b4fd0
Some scaffolding for rewrite to new spec
tatu-at-datastax Jun 18, 2024
4f1db6f
Finish initial implementation
tatu-at-datastax Jun 18, 2024
0c655e4
Fix 2 ITs wrt output structure changes
tatu-at-datastax Jun 18, 2024
adfd9fa
comment improvement
tatu-at-datastax Jun 18, 2024
da1aa4d
Comment improvement
tatu-at-datastax Jun 18, 2024
f18572c
Add comments suggested by code review
tatu-at-datastax Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Docker Compose scripts for JSONAPI with DSE-6.9
# Docker Compose scripts for Data API with DSE-6.9

This directory provides two ways to start the Data API with DSE-6.9 or HCD using `docker compose`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public record CommandResult(
nullable = true)
})
Map<CommandStatus, Object> status,
@Schema(nullable = true) List<Error> errors) {
@JsonInclude(JsonInclude.Include.NON_EMPTY) @Schema(nullable = true) List<Error> errors) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this so that both null and [ ] values are skipped (not just nulls)


/**
* Constructor for only specifying the {@link MultiResponseData}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ public enum CommandStatus {
/** Status for reporting existing collections. */
@JsonProperty("collections")
EXISTING_COLLECTIONS,
/**
* List of response entries, one for each document we tried to insert with {@code insertMany}
* command. Each entry has 2 mandatory fields: {@code _id} (document id), and {@code status} (one
* of {@code OK}, {@code ERROR} or {@code SKIP}; {@code ERROR} entries also have {@code errorsIdx}
* field that refers to position of the error in the root level {@code errors} List.
*/
@JsonProperty("documentResponses")
DOCUMENT_RESPONSES,
/** The element has the list of inserted ids */
@JsonProperty("insertedIds")
INSERTED_IDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,14 @@ public record Options(
description =
"When `true` the server will insert the documents in sequential order, ensuring each document is successfully inserted before starting the next. Additionally the command will \"fail fast\", failing the first document that fails to insert. When `false` the server is free to re-order the inserts and parallelize them for performance. In this mode more than one document may fail to be inserted (aka \"fail silently\" mode).",
defaultValue = "false")
boolean ordered) {}
boolean ordered,
@Schema(
description =
"When `true`, response will contain an additional field: 'documentResponses'"
+ " with is an array of Document Response Objects. Each Document Response Object"
+ " contains the `_id` of the document and the `status` of the operation (one of"
+ " `OK`, `ERROR` or `SKIPPED`). Additional `errorsIdx` field is present when the"
+ " status is `ERROR` and contains the index of the error in the main `errors` array.",
defaultValue = "false")
boolean returnDocumentResponses) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.stargate.sgv2.jsonapi.service.operation.model.ModifyOperation;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
Expand All @@ -29,16 +30,28 @@ public record InsertOperation(
CommandContext commandContext,
List<WritableShreddedDocument> documents,
boolean ordered,
boolean offlineMode)
boolean offlineMode,
boolean returnDocumentResponses)
implements ModifyOperation {
public record WritableDocAndPosition(int position, WritableShreddedDocument document)
tatu-at-datastax marked this conversation as resolved.
Show resolved Hide resolved
implements Comparable<WritableDocAndPosition> {
@Override
public int compareTo(InsertOperation.WritableDocAndPosition o) {
// Order by position (only), ascending
return Integer.compare(position, o.position);
}
}

public InsertOperation(
CommandContext commandContext, List<WritableShreddedDocument> documents, boolean ordered) {
this(commandContext, documents, ordered, false);
CommandContext commandContext,
List<WritableShreddedDocument> documents,
boolean ordered,
boolean returnDocumentResponses) {
this(commandContext, documents, ordered, false, returnDocumentResponses);
}

public InsertOperation(CommandContext commandContext, WritableShreddedDocument document) {
this(commandContext, List.of(document), false, false);
this(commandContext, List.of(document), false, false, false);
}

/** {@inheritDoc} */
Expand All @@ -57,22 +70,30 @@ public Uni<Supplier<CommandResult>> execute(
.jsonProcessingMetricsReporter()
.reportJsonWrittenDocsMetrics(commandContext().commandName(), documents.size());
}
final List<WritableDocAndPosition> docsWithPositions = new ArrayList<>(documents.size());
int pos = 0;
for (WritableShreddedDocument doc : documents) {
docsWithPositions.add(new WritableDocAndPosition(pos++, doc));
}
if (ordered) {
return insertOrdered(dataApiRequestInfo, queryExecutor, vectorEnabled);
return insertOrdered(dataApiRequestInfo, queryExecutor, vectorEnabled, docsWithPositions);
} else {
return insertUnordered(dataApiRequestInfo, queryExecutor, vectorEnabled);
return insertUnordered(dataApiRequestInfo, queryExecutor, vectorEnabled, docsWithPositions);
}
}

// implementation for the ordered insert
private Uni<Supplier<CommandResult>> insertOrdered(
DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor, boolean vectorEnabled) {
DataApiRequestInfo dataApiRequestInfo,
QueryExecutor queryExecutor,
boolean vectorEnabled,
List<WritableDocAndPosition> docsWithPositions) {

// build query once
final String query = buildInsertQuery(vectorEnabled);

return Multi.createFrom()
.iterable(documents)
.iterable(docsWithPositions)

// concatenate to respect ordered
.onItem()
Expand All @@ -89,10 +110,10 @@ private Uni<Supplier<CommandResult>> insertOrdered(
// if no failures reduce to the op page
.collect()
.in(
InsertOperationPage::new,
() -> new InsertOperationPage(docsWithPositions, returnDocumentResponses()),
(agg, in) -> {
Throwable failure = in.getItem2();
agg.aggregate(in.getItem1().id(), failure);
agg.aggregate(in.getItem1(), failure);

if (failure != null) {
throw new FailFastInsertException(agg, failure);
Expand All @@ -115,11 +136,14 @@ private Uni<Supplier<CommandResult>> insertOrdered(

// implementation for the unordered insert
private Uni<Supplier<CommandResult>> insertUnordered(
DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor, boolean vectorEnabled) {
DataApiRequestInfo dataApiRequestInfo,
QueryExecutor queryExecutor,
boolean vectorEnabled,
List<WritableDocAndPosition> docsWithPositions) {
// build query once
String query = buildInsertQuery(vectorEnabled);
return Multi.createFrom()
.iterable(documents)
.iterable(docsWithPositions)

// merge to make it parallel
.onItem()
Expand All @@ -133,7 +157,9 @@ private Uni<Supplier<CommandResult>> insertUnordered(

// then reduce here
.collect()
.in(InsertOperationPage::new, (agg, in) -> agg.aggregate(in.getItem1().id(), in.getItem2()))
.in(
() -> new InsertOperationPage(docsWithPositions, returnDocumentResponses()),
(agg, in) -> agg.aggregate(in.getItem1(), in.getItem2()))

// use object identity to resolve to Supplier<CommandResult>
.map(i -> i);
Expand All @@ -144,10 +170,11 @@ private static Uni<DocumentId> insertDocument(
DataApiRequestInfo dataApiRequestInfo,
QueryExecutor queryExecutor,
String query,
WritableShreddedDocument doc,
WritableDocAndPosition docWithPosition,
boolean vectorEnabled,
boolean offlineMode) {
// bind and execute
final WritableShreddedDocument doc = docWithPosition.document();
SimpleStatement boundStatement = bindInsertValues(query, doc, vectorEnabled, offlineMode);
return queryExecutor
.executeWrite(dataApiRequestInfo, boundStatement)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package io.stargate.sgv2.jsonapi.service.operation.model.impl;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.smallrye.mutiny.tuples.Tuple2;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.exception.mappers.ThrowableToErrorMapper;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
Expand All @@ -15,59 +19,118 @@
* The internal to insert operation results, keeping ids of successfully and not-successfully
* inserted documents.
*
* <p>Can serve as an aggregator, using the {@link #aggregate(DocumentId, Throwable)} function.
* <p>Can serve as an aggregator, using the {@link #aggregate} function.
*
* @param insertedIds Documents IDs that we successfully inserted.
* @param failedIds Document IDs that failed to be inserted.
* @param successfulInsertions Documents that we successfully inserted.
* @param failedInsertions Documents that failed to be inserted, along with failure reason.
*/
public record InsertOperationPage(
List<DocumentId> insertedIds, Map<DocumentId, Throwable> failedIds)
List<InsertOperation.WritableDocAndPosition> allAttemptedInsertions,
boolean returnDocumentResponses,
List<InsertOperation.WritableDocAndPosition> successfulInsertions,
List<Tuple2<InsertOperation.WritableDocAndPosition, Throwable>> failedInsertions)
implements Supplier<CommandResult> {
enum InsertionStatus {
OK,
ERROR,
SKIPPED
}

@JsonPropertyOrder({"_id", "status", "errorsIdx"})
@JsonInclude(JsonInclude.Include.NON_NULL)
record InsertionResult(DocumentId _id, InsertionStatus status, Integer errorsIdx) {}

/** No-arg constructor, usually used for aggregation. */
public InsertOperationPage() {
this(new ArrayList<>(), new HashMap<>());
public InsertOperationPage(
List<InsertOperation.WritableDocAndPosition> allAttemptedInsertions,
boolean returnDocumentResponses) {
this(allAttemptedInsertions, returnDocumentResponses, new ArrayList<>(), new ArrayList<>());
}

/** {@inheritDoc} */
@Override
public CommandResult get() {
// if we have errors, transform
if (null != failedIds && !failedIds().isEmpty()) {

List<CommandResult.Error> errors = new ArrayList<>(failedIds.size());
failedIds.forEach((documentId, throwable) -> errors.add(getError(documentId, throwable)));
// Ensure insertions and errors are in the input order (wrt unordered insertions),
// regardless of output format
Collections.sort(successfulInsertions);
if (!failedInsertions().isEmpty()) {
Collections.sort(
failedInsertions, Comparator.comparing(tuple -> tuple.getItem1().position()));
}

if (!returnDocumentResponses()) { // legacy output, limited to ids, error messages
List<CommandResult.Error> errors;
if (failedInsertions().isEmpty()) {
errors = null;
} else {
errors =
failedInsertions.stream()
.map(tuple -> getError(tuple.getItem1().document().id(), tuple.getItem2()))
.toList();
}
// Old style, simple ids:
List<DocumentId> insertedIds =
successfulInsertions.stream().map(docAndPos -> docAndPos.document().id()).toList();
return new CommandResult(null, Map.of(CommandStatus.INSERTED_IDS, insertedIds), errors);
}

// id no errors, just inserted ids
return new CommandResult(Map.of(CommandStatus.INSERTED_IDS, insertedIds));
// New style output: detailed responses.
InsertionResult[] results = new InsertionResult[allAttemptedInsertions().size()];
List<CommandResult.Error> errors = new ArrayList<>();

// Results array filled in order: first successful insertions
for (InsertOperation.WritableDocAndPosition docAndPos : successfulInsertions) {
results[docAndPos.position()] =
new InsertionResult(docAndPos.document().id(), InsertionStatus.OK, null);
}
// Second: failed insertions
for (Tuple2<InsertOperation.WritableDocAndPosition, Throwable> failed : failedInsertions) {
InsertOperation.WritableDocAndPosition docAndPos = failed.getItem1();
Throwable throwable = failed.getItem2();
CommandResult.Error error = getError(throwable);
int errorIdx = errors.indexOf(error);
if (errorIdx < 0) { // not yet added, add
tatu-at-datastax marked this conversation as resolved.
Show resolved Hide resolved
errorIdx = errors.size();
errors.add(error);
}
int idx = docAndPos.position();
results[idx] =
new InsertionResult(docAndPos.document().id(), InsertionStatus.ERROR, errorIdx);
}
// And third, if any, skipped
for (int i = 0; i < results.length; i++) {
if (null == results[i]) {
tatu-at-datastax marked this conversation as resolved.
Show resolved Hide resolved
results[i] =
new InsertionResult(
allAttemptedInsertions.get(i).document().id(), InsertionStatus.SKIPPED, null);
}
}
return new CommandResult(
null, Map.of(CommandStatus.DOCUMENT_RESPONSES, Arrays.asList(results)), errors);
}

private static CommandResult.Error getError(DocumentId documentId, Throwable throwable) {
String message =
"Failed to insert document with _id %s: %s".formatted(documentId, throwable.getMessage());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was not being used, not sure why it was there (ThrowableToErrorMapper handles this logic)

Map<String, Object> fields = new HashMap<>();
fields.put("exceptionClass", throwable.getClass().getSimpleName());
if (throwable instanceof JsonApiException jae) {
fields.put("errorCode", jae.getErrorCode().name());
}
return ThrowableToErrorMapper.getMapperWithMessageFunction().apply(throwable, message);
}

private static CommandResult.Error getError(Throwable throwable) {
return ThrowableToErrorMapper.getMapperWithMessageFunction()
.apply(throwable, throwable.getMessage());
}

/**
* Aggregates the result of the insert operation into this object.
*
* @param id ID of the document that was inserted written.
* @param failure If not null, means an error occurred during the write.
* @param docWithPosition Document that was inserted (or failed to be inserted)
* @param failure If not null, means an error occurred during attempted insertion
*/
public void aggregate(DocumentId id, Throwable failure) {
if (null != failure) {
failedIds.put(id, failure);
public void aggregate(InsertOperation.WritableDocAndPosition docWithPosition, Throwable failure) {
if (null == failure) {
successfulInsertions.add(docWithPosition);
} else {
insertedIds.add(id);
failedInsertions.add(Tuple2.of(docWithPosition, failure));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package io.stargate.sgv2.jsonapi.service.resolver.model.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertManyCommand;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;
import io.stargate.sgv2.jsonapi.service.operation.model.impl.InsertOperation;
import io.stargate.sgv2.jsonapi.service.projection.IndexingProjector;
import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver;
import io.stargate.sgv2.jsonapi.service.shredding.Shredder;
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
Expand All @@ -18,12 +16,10 @@
public class InsertManyCommandResolver implements CommandResolver<InsertManyCommand> {

private final Shredder shredder;
private final ObjectMapper objectMapper;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was not being used, removed.


@Inject
public InsertManyCommandResolver(Shredder shredder, ObjectMapper objectMapper) {
public InsertManyCommandResolver(Shredder shredder) {
this.shredder = shredder;
this.objectMapper = objectMapper;
}

@Override
Expand All @@ -33,15 +29,13 @@ public Class<InsertManyCommand> getCommandClass() {

@Override
public Operation resolveCommand(CommandContext ctx, InsertManyCommand command) {
final IndexingProjector projection = ctx.indexingProjector();
final List<WritableShreddedDocument> shreddedDocuments =
command.documents().stream().map(doc -> shredder.shred(ctx, doc, null)).toList();

// resolve ordered
InsertManyCommand.Options options = command.options();
boolean ordered = (null != options) && options.ordered();
boolean returnDocumentResponses = (null != options) && options.returnDocumentResponses();

boolean ordered = null != options && Boolean.TRUE.equals(options.ordered());

return new InsertOperation(ctx, shreddedDocuments, ordered);
return new InsertOperation(ctx, shreddedDocuments, ordered, false, returnDocumentResponses);
}
}
Loading