Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failure modes for update and delete operations #237

Merged
merged 14 commits into from
Mar 9, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple3;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
Expand Down Expand Up @@ -99,20 +100,15 @@ public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
// because it's already run twice before this
// check.
.atMost(retryLimit - 1);
});
})
.onItemOrFailure()
.transform((deleted, error) -> Tuple3.of(deleted, error, document.id()));
})
.collect()

// Count the successful deletes
.in(
AtomicInteger::new,
(atomicCounter, flag) -> {
if (flag) {
atomicCounter.incrementAndGet();
}
})
.asList()
.onItem()
.transform(deletedCounter -> new DeleteOperationPage(deletedCounter.get(), moreData.get()));
.transform(
deletedInformation -> new DeleteOperationPage(deletedInformation, moreData.get()));
}

private QueryOuterClass.Query buildDeleteQuery() {
Expand Down Expand Up @@ -167,10 +163,7 @@ private Uni<Boolean> deleteDocument(
} else {
// In case of successful document delete

throw new LWTException(
ErrorCode.CONCURRENCY_FAILURE,
"Delete failed for document with id %s because of concurrent transaction"
.formatted(document.id().value()));
throw new LWTException(ErrorCode.CONCURRENCY_FAILURE);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,77 @@
package io.stargate.sgv2.jsonapi.service.operation.model.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.smallrye.mutiny.tuples.Tuple3;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import io.stargate.sgv2.jsonapi.util.ExceptionUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* This represents the response for a delete operation.
*
* @param deletedCount - Count of documents deleted
* @param deletedInformation - List of Tuple3, each tuple 3 corresponds to document tried for
* deletion. Item1 boolean states if document is deleted, item2 contains the throwable and item3
* has the document id
* @param moreData - if `true` means more documents available in DB for the provided condition
*/
public record DeleteOperationPage(Integer deletedCount, boolean moreData)
public record DeleteOperationPage(
List<Tuple3<Boolean, Throwable, DocumentId>> deletedInformation, boolean moreData)
implements Supplier<CommandResult> {
private static final String ERROR = "Failed to delete documents with _id %s: %s";

@Override
public CommandResult get() {
int deletedCount =
(int)
deletedInformation.stream()
.filter(deletedDocument -> Boolean.TRUE.equals(deletedDocument.getItem1()))
.count();

// aggregate the errors by error code or error class
Multimap<String, Tuple3<Boolean, Throwable, DocumentId>> groupedErrorDeletes =
ArrayListMultimap.create();
deletedInformation.forEach(
deletedData -> {
if (deletedData.getItem2() != null) {
String key = ExceptionUtil.getThrowableGroupingKey(deletedData.getItem2());
groupedErrorDeletes.put(key, deletedData);
}
});

// Create error by error code or error class
List<CommandResult.Error> errors = new ArrayList<>(groupedErrorDeletes.size());
groupedErrorDeletes
.keySet()
.forEach(
key -> {
final Collection<Tuple3<Boolean, Throwable, DocumentId>> deletedDocuments =
groupedErrorDeletes.get(key);
final List<DocumentId> documentIds =
deletedDocuments.stream()
.map(deletes -> deletes.getItem3())
.collect(Collectors.toList());
errors.add(
ExceptionUtil.getError(
ERROR, documentIds, deletedDocuments.stream().findFirst().get().getItem2()));
});

if (moreData)
return new CommandResult(
Map.of(CommandStatus.DELETED_COUNT, deletedCount, CommandStatus.MORE_DATA, true));
else return new CommandResult(Map.of(CommandStatus.DELETED_COUNT, deletedCount));
null,
Map.of(CommandStatus.DELETED_COUNT, deletedCount, CommandStatus.MORE_DATA, true),
errors.isEmpty() ? null : errors);
else
return new CommandResult(
null,
Map.of(CommandStatus.DELETED_COUNT, deletedCount),
errors.isEmpty() ? null : errors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,4 @@ public class LWTException extends JsonApiException {
public LWTException(ErrorCode errorCode) {
super(errorCode);
}

public LWTException(ErrorCode errorCode, String message) {
super(errorCode, message);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package io.stargate.sgv2.jsonapi.service.operation.model.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import io.stargate.sgv2.jsonapi.util.ExceptionUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public record UpdateOperationPage(
int matchedCount,
Expand All @@ -19,17 +21,41 @@ public record UpdateOperationPage(
boolean returnDocs,
boolean moreDataFlag)
implements Supplier<CommandResult> {

private static final String ERROR = "Failed to update documents with _id %s: %s";

@Override
public CommandResult get() {
final DocumentId[] upsertedId = new DocumentId[1];
List<JsonNode> updatedDocs = new ArrayList<>(updatedDocuments().size());
List<CommandResult.Error> errors = new ArrayList<>();

// aggregate the errors by error code or error class
Multimap<String, ReadAndUpdateOperation.UpdatedDocument> groupedErrorUpdates =
ArrayListMultimap.create();
updatedDocuments.forEach(
update -> {
if (update.upserted()) upsertedId[0] = update.id();
if (returnDocs) updatedDocs.add(update.document());
maheshrajamani marked this conversation as resolved.
Show resolved Hide resolved
if (update.error() != null) errors.add(getError(update.id(), update.error()));
//
if (update.error() != null) {
String key = ExceptionUtil.getThrowableGroupingKey(update.error());
groupedErrorUpdates.put(key, update);
}
});
// Create error by error code or error class
List<CommandResult.Error> errors = new ArrayList<>(groupedErrorUpdates.size());
groupedErrorUpdates
.keySet()
.forEach(
key -> {
final Collection<ReadAndUpdateOperation.UpdatedDocument> updatedDocuments =
groupedErrorUpdates.get(key);
final List<DocumentId> documentIds =
updatedDocuments.stream().map(update -> update.id()).collect(Collectors.toList());
errors.add(
ExceptionUtil.getError(
ERROR, documentIds, updatedDocuments.stream().findFirst().get().error()));
});
EnumMap<CommandStatus, Object> updateStatus = new EnumMap<>(CommandStatus.class);
if (upsertedId[0] != null) updateStatus.put(CommandStatus.UPSERTED_ID, upsertedId[0]);
updateStatus.put(CommandStatus.MATCHED_COUNT, matchedCount());
Expand All @@ -45,16 +71,4 @@ public CommandResult get() {
return new CommandResult(null, updateStatus, errors.isEmpty() ? null : errors);
}
}

private CommandResult.Error getError(DocumentId documentId, Throwable throwable) {
String message =
"Failed to update document with _id %s: %s".formatted(documentId, throwable.getMessage());

Map<String, Object> fields = new HashMap<>();
fields.put("exceptionClass", throwable.getClass().getSimpleName());
if (throwable instanceof JsonApiException jae) {
fields.put("errorCode", jae.getErrorCode().name());
}
return new CommandResult.Error(message, fields);
}
}
27 changes: 27 additions & 0 deletions src/main/java/io/stargate/sgv2/jsonapi/util/ExceptionUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.stargate.sgv2.jsonapi.util;

import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.exception.JsonApiException;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExceptionUtil {
public static String getThrowableGroupingKey(Throwable error) {
String key = error.getClass().getSimpleName();
if (error instanceof JsonApiException jae) key = jae.getErrorCode().name();
return key;
}

public static CommandResult.Error getError(
String messageTemplate, List<DocumentId> documentIds, Throwable throwable) {
String message = messageTemplate.formatted(documentIds, throwable.getMessage());
Map<String, Object> fields = new HashMap<>();
fields.put("exceptionClass", throwable.getClass().getSimpleName());
if (throwable instanceof JsonApiException jae) {
fields.put("errorCode", jae.getErrorCode().name());
}
return new CommandResult.Error(message, fields);
}
}
Loading