Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failure modes for update and delete operations #237

Merged
merged 14 commits into from
Mar 9, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple3;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
Expand Down Expand Up @@ -99,20 +100,15 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
// because it's already run twice before this
// check.
.atMost(retryLimit - 1);
});
})
.onItemOrFailure()
.transform((deleted, error) -> Tuple3.of(deleted, error, document.id()));
})
.collect()

// Count the successful deletes
.in(
AtomicInteger::new,
(atomicCounter, flag) -> {
if (flag) {
atomicCounter.incrementAndGet();
}
})
.asList()
.onItem()
.transform(deletedCounter -> new DeleteOperationPage(deletedCounter.get(), moreData.get()));
.transform(
deletedInformation -> new DeleteOperationPage(deletedInformation, moreData.get()));
}

private QueryOuterClass.Query buildDeleteQuery() {
Expand Down Expand Up @@ -167,10 +163,7 @@ private Uni<Boolean> deleteDocument(
} else {
// In case of successful document delete

throw new LWTException(
ErrorCode.CONCURRENCY_FAILURE,
"Delete failed for document with id %s because of concurrent transaction"
.formatted(document.id().value()));
throw new LWTException(ErrorCode.CONCURRENCY_FAILURE);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,91 @@
package io.stargate.sgv2.jsonapi.service.operation.model.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.smallrye.mutiny.tuples.Tuple3;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* This represents the response for a delete operation.
*
* @param deletedCount - Count of documents deleted
* @param deletedInformation - List of Tuple3, each tuple 3 corresponds to document tried for
* deletion. Item1 boolean states if document is deleted, item2 contains the throwable and item3
* has the document id
* @param moreData - if `true` means more documents available in DB for the provided condition
*/
public record DeleteOperationPage(Integer deletedCount, boolean moreData)
public record DeleteOperationPage(
List<Tuple3<Boolean, Throwable, DocumentId>> deletedInformation, boolean moreData)
implements Supplier<CommandResult> {
@Override
public CommandResult get() {
int deletedCount =
(int)
deletedInformation.stream()
.filter(
deletedDocument ->
deletedDocument.getItem1() != null && deletedDocument.getItem1())
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
.count();

// aggregate the errors by error code or error class
Multimap<String, Tuple3<Boolean, Throwable, DocumentId>> groupedErrorDeletes =
ArrayListMultimap.create();
deletedInformation.forEach(
deletedData -> {
if (deletedData.getItem2() != null) {
String key = deletedData.getItem2().getClass().getSimpleName();
if (deletedData.getItem2() instanceof JsonApiException jae)
key = jae.getErrorCode().name();
groupedErrorDeletes.put(key, deletedData);
}
});

// Create error by error code or error class
List<CommandResult.Error> errors = new ArrayList<>(groupedErrorDeletes.size());
groupedErrorDeletes
.keySet()
.forEach(
key -> {
final Collection<Tuple3<Boolean, Throwable, DocumentId>> deletedDocuments =
groupedErrorDeletes.get(key);
final List<DocumentId> documentIds =
deletedDocuments.stream()
.map(deletes -> deletes.getItem3())
.collect(Collectors.toList());
errors.add(
getError(documentIds, deletedDocuments.stream().findFirst().get().getItem2()));
});

if (moreData)
return new CommandResult(
Map.of(CommandStatus.DELETED_COUNT, deletedCount, CommandStatus.MORE_DATA, true));
else return new CommandResult(Map.of(CommandStatus.DELETED_COUNT, deletedCount));
null,
Map.of(CommandStatus.DELETED_COUNT, deletedCount, CommandStatus.MORE_DATA, true),
errors.isEmpty() ? null : errors);
else
return new CommandResult(
null,
Map.of(CommandStatus.DELETED_COUNT, deletedCount),
errors.isEmpty() ? null : errors);
}

private CommandResult.Error getError(List<DocumentId> documentIds, Throwable throwable) {
String message =
"Failed to delete documents with _id %s: %s".formatted(documentIds, throwable.getMessage());

Map<String, Object> fields = new HashMap<>();
fields.put("exceptionClass", throwable.getClass().getSimpleName());
if (throwable instanceof JsonApiException jae) {
fields.put("errorCode", jae.getErrorCode().name());
}
return new CommandResult.Error(message, fields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,4 @@ public class LWTException extends JsonApiException {
public LWTException(ErrorCode errorCode) {
super(errorCode);
}

public LWTException(ErrorCode errorCode, String message) {
super(errorCode, message);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package io.stargate.sgv2.jsonapi.service.operation.model.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public record UpdateOperationPage(
int matchedCount,
Expand All @@ -23,13 +27,34 @@ public record UpdateOperationPage(
public CommandResult get() {
final DocumentId[] upsertedId = new DocumentId[1];
List<JsonNode> updatedDocs = new ArrayList<>(updatedDocuments().size());
List<CommandResult.Error> errors = new ArrayList<>();

// aggregate the errors by error code or error class
Multimap<String, ReadAndUpdateOperation.UpdatedDocument> groupedErrorUpdates =
ArrayListMultimap.create();
updatedDocuments.forEach(
update -> {
if (update.upserted()) upsertedId[0] = update.id();
if (returnDocs) updatedDocs.add(update.document());
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
if (update.error() != null) errors.add(getError(update.id(), update.error()));
//
if (update.error() != null) {
String key = update.error().getClass().getSimpleName();
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
if (update.error() instanceof JsonApiException jae) key = jae.getErrorCode().name();
groupedErrorUpdates.put(key, update);
}
});
// Create error by error code or error class
List<CommandResult.Error> errors = new ArrayList<>(groupedErrorUpdates.size());
groupedErrorUpdates
.keySet()
.forEach(
key -> {
final Collection<ReadAndUpdateOperation.UpdatedDocument> updatedDocuments =
groupedErrorUpdates.get(key);
final List<DocumentId> documentIds =
updatedDocuments.stream().map(update -> update.id()).collect(Collectors.toList());
errors.add(
getError(documentIds, updatedDocuments.stream().findFirst().get().error()));
});
EnumMap<CommandStatus, Object> updateStatus = new EnumMap<>(CommandStatus.class);
if (upsertedId[0] != null) updateStatus.put(CommandStatus.UPSERTED_ID, upsertedId[0]);
updateStatus.put(CommandStatus.MATCHED_COUNT, matchedCount());
Expand All @@ -46,9 +71,9 @@ public CommandResult get() {
}
}

private CommandResult.Error getError(DocumentId documentId, Throwable throwable) {
private CommandResult.Error getError(List<DocumentId> documentIds, Throwable throwable) {
String message =
"Failed to update document with _id %s: %s".formatted(documentId, throwable.getMessage());
"Failed to update documents with _id %s: %s".formatted(documentIds, throwable.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that similar to exception key, formatting of documentIds probably should be a shared utility method since we may want to add quoting/escaping at some point -- start with just List.toString() for now, but I have found that it is good to distinguish f.ex between number 1 and String "1"... especially as String keys can have all kind of content.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Come to think of this, since the only value where simple "value.toString()" may not be sufficient is String (where we'd want to surround value in quotes), we could easily change toString() method for DocumentId.StringId and the message would be changed?

Downside is that there are lots of tests that compare exact exception message.

I can do a PR after merging this one so as not to extend scope here.


Map<String, Object> fields = new HashMap<>();
fields.put("exceptionClass", throwable.getClass().getSimpleName());
Expand Down
Loading