Skip to content

Commit

Permalink
Update Many command implementation (#192)
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshrajamani authored Feb 24, 2023
1 parent a6bc2a2 commit 29d23f7
Show file tree
Hide file tree
Showing 22 changed files with 1,350 additions and 166 deletions.
41 changes: 39 additions & 2 deletions src/main/java/io/stargate/sgv2/jsonapi/StargateJsonApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,24 @@
}
}
"""),
@ExampleObject(
name = "updateMany",
summary = "`updateMany` command",
value =
"""
{
"updateMany": {
"filter": {"location": "London"},
"update": {
"$set": {"location": "New York"},
"$push": {"tags": "marathon"}
},
"options" : {
"upsert" : true
}
}
}
"""),
@ExampleObject(
name = "deleteOne",
summary = "`deleteOne` command",
Expand Down Expand Up @@ -313,7 +331,9 @@
],
"count": 1,
"status": {
"updatedIds": ["1"]
"upsertedId": "1",
"matchedCount": 0,
"modifiedCount": 1,
}
}
}
Expand All @@ -325,11 +345,28 @@
"""
{
"status": {
"updatedIds": ["1"]
"upsertedId": "1",
"matchedCount": 0,
"modifiedCount": 1,
}
}
}
"""),
@ExampleObject(
name = "resultUpdateMany",
summary = "`updateMany` command result",
value =
"""
{
"status": {
"upsertedId": "1",
"matchedCount": 0,
"modifiedCount": 1,
"moreData" : true
}
}
}
"""),
@ExampleObject(
name = "resultInsert",
summary = "Insert command result",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.impl.FindOneCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertManyCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertOneCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.UpdateManyCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.UpdateOneCommand;
import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver;

Expand Down Expand Up @@ -48,6 +49,7 @@
@JsonSubTypes.Type(value = FindOneAndUpdateCommand.class),
@JsonSubTypes.Type(value = InsertOneCommand.class),
@JsonSubTypes.Type(value = InsertManyCommand.class),
@JsonSubTypes.Type(value = UpdateManyCommand.class),
@JsonSubTypes.Type(value = UpdateOneCommand.class),
})
public interface Command {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ public enum CommandStatus {
/** The element has the list of inserted ids */
@JsonProperty("insertedIds")
INSERTED_IDS,
/** The element has the count of document read for the update operation */
@JsonProperty("matchedCount")
MATCHED_COUNT,

/** The element has the count of document modified for the update operation */
@JsonProperty("modifiedCount")
MODIFIED_COUNT,
/**
* The element with boolean 'true' represents if more document to be processed for updateMany and
* deleteMany commands
Expand All @@ -22,7 +29,10 @@ public enum CommandStatus {
/** The element has value 1 if collection is created */
@JsonProperty("ok")
OK,
/** The element has the list of updated ids */
@JsonProperty("updatedIds")
UPDATED_IDS;
/**
* The element has the document id of newly inserted document part of update, when upserted option
* is 'true' and no document available in DB for matching condition
*/
@JsonProperty("upsertedId")
UPSERTED_ID;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.stargate.sgv2.jsonapi.api.model.command.impl;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.stargate.sgv2.jsonapi.api.model.command.Filterable;
import io.stargate.sgv2.jsonapi.api.model.command.ReadCommand;
import io.stargate.sgv2.jsonapi.api.model.command.clause.filter.FilterClause;
import io.stargate.sgv2.jsonapi.api.model.command.clause.update.UpdateClause;
import javax.annotation.Nullable;
import javax.validation.Valid;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

@Schema(
description =
"Command that finds documents from a collection and updates it with the values provided in the update clause.")
@JsonTypeName("updateMany")
public record UpdateManyCommand(
@Valid @JsonProperty("filter") FilterClause filterClause,
@Valid @JsonProperty("update") UpdateClause updateClause,
@Nullable Options options)
implements ReadCommand, Filterable {
public record Options(boolean upsert) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.stargate.sgv2.jsonapi.api.model.command.impl.FindOneCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertManyCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertOneCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.UpdateManyCommand;
import io.stargate.sgv2.jsonapi.api.model.command.impl.UpdateOneCommand;
import io.stargate.sgv2.jsonapi.config.constants.OpenApiConstants;
import io.stargate.sgv2.jsonapi.service.processor.CommandProcessor;
Expand Down Expand Up @@ -76,6 +77,7 @@ public CollectionResource(CommandProcessor commandProcessor) {
FindOneAndUpdateCommand.class,
InsertOneCommand.class,
InsertManyCommand.class,
UpdateManyCommand.class,
UpdateOneCommand.class
}),
examples = {
Expand All @@ -87,6 +89,7 @@ public CollectionResource(CommandProcessor commandProcessor) {
@ExampleObject(ref = "findOneAndUpdate"),
@ExampleObject(ref = "insertOne"),
@ExampleObject(ref = "insertMany"),
@ExampleObject(ref = "updateMany"),
@ExampleObject(ref = "updateOne"),
}))
@APIResponses(
Expand All @@ -105,6 +108,7 @@ public CollectionResource(CommandProcessor commandProcessor) {
@ExampleObject(ref = "resultInsert"),
@ExampleObject(ref = "resultError"),
@ExampleObject(ref = "resultDelete"),
@ExampleObject(ref = "resultUpdateMany"),
@ExampleObject(ref = "resultUpdateOne"),
})))
@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,20 @@ public interface DocumentConfig {
int maxLimit();

/**
* @return Defines the maximum limit of document that can be returned for a request, defaults to
* @return Defines the maximum limit of document that can be deleted for a request, defaults to
* <code>20</code>.
*/
@Max(100)
@Positive
@WithDefault("20")
int maxDocumentDeleteCount();

/**
* @return Defines the maximum limit of document that can be updated for a request, defaults to
* <code>20</code>.
*/
@Max(100)
@Positive
@WithDefault("20")
int maxDocumentUpdateCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String paging
}

@Override
public ReadDocument getEmptyDocuments() {
public ReadDocument getNewDocument() {
throw new JsonApiException(ErrorCode.UNSUPPORTED_OPERATION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ default Uni<CountResponse> countDocuments(
*
* @return
*/
ReadDocument getEmptyDocuments();
ReadDocument getNewDocument();

record FindResponse(List<ReadDocument> docs, String pagingState) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Uni<FindResponse> getDocuments(QueryExecutor queryExecutor, String paging
}

@Override
public ReadDocument getEmptyDocuments() {
public ReadDocument getNewDocument() {
ObjectNode rootNode = objectMapper().createObjectNode();
DocumentId documentId = null;
for (DBFilterBase filter : filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,118 @@
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
import io.stargate.sgv2.jsonapi.service.updater.DocumentUpdater;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
* This operation method is used for 3 commands findOneAndUpdate, updateOne and updateMany
*
* @param commandContext
* @param readOperation
* @param documentUpdater
* @param returnDocumentInResponse
* @param returnUpdatedDocument
* @param upsert
* @param shredder
* @param updateLimit
*/
public record ReadAndUpdateOperation(
CommandContext commandContext,
ReadOperation readOperation,
DocumentUpdater documentUpdater,
boolean returnDocumentInResponse,
boolean returnUpdatedDocument,
boolean upsert,
Shredder shredder)
Shredder shredder,
int updateLimit)
implements ModifyOperation {

@Override
public Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor) {
Uni<ReadOperation.FindResponse> docsToUpate = readOperation().getDocuments(queryExecutor, null);
final AtomicBoolean moreDataFlag = new AtomicBoolean(false);
final Multi<ReadOperation.FindResponse> findResponses =
Multi.createBy()
.repeating()
.uni(
() -> new AtomicReference<String>(null),
stateRef -> {
Uni<ReadOperation.FindResponse> docsToUpdate =
readOperation().getDocuments(queryExecutor, stateRef.get());
return docsToUpdate
.onItem()
.invoke(findResponse -> stateRef.set(findResponse.pagingState()));
})
.whilst(findResponse -> findResponse.pagingState() != null);
final AtomicInteger matchedCount = new AtomicInteger(0);
final AtomicInteger modifiedCount = new AtomicInteger(0);

final Uni<List<UpdatedDocument>> updatedDocuments =
docsToUpate
findResponses
.onItem()
.transformToMulti(
findResponse -> {
if (findResponse.docs().isEmpty()) {
if (upsert) {
return Multi.createFrom().item(readOperation().getEmptyDocuments());
final List<ReadDocument> docs = findResponse.docs();
if (upsert() && docs.size() == 0 && matchedCount.get() == 0) {
return Multi.createFrom().item(readOperation().getNewDocument());
} else {
// Below conditionality is because we read up to deleteLimit +1 record.
if (matchedCount.get() + docs.size() <= updateLimit) {
matchedCount.addAndGet(docs.size());
return Multi.createFrom().items(docs.stream());
} else {
return Multi.createFrom().items(Stream.empty());
int needed = updateLimit - matchedCount.get();
matchedCount.addAndGet(needed);

moreDataFlag.set(true);
return Multi.createFrom()
.items(findResponse.docs().subList(0, needed).stream());
}
} else {
return Multi.createFrom().items(findResponse.docs().stream());
}
})
.concatenate()
.onItem()
.transformToUniAndConcatenate(
readDocument -> {
JsonNode updatedDocument =
DocumentUpdater.DocumentUpdaterResponse documentUpdaterResponse =
documentUpdater().applyUpdates(readDocument.document().deepCopy());
WritableShreddedDocument writableShreddedDocument =
shredder().shred(updatedDocument, readDocument.txnId());
final JsonNode originalDocument =
readDocument.txnId() == null ? null : readDocument.document();
JsonNode updatedDocument = documentUpdaterResponse.document();
Uni<DocumentId> updated = Uni.createFrom().nullItem();
if (documentUpdaterResponse.modified()) {
WritableShreddedDocument writableShreddedDocument =
shredder().shred(updatedDocument, readDocument.txnId());
updated = updatedDocument(queryExecutor, writableShreddedDocument);
}
final JsonNode documentToReturn =
returnUpdatedDocument ? updatedDocument : originalDocument;
return updatedDocument(queryExecutor, writableShreddedDocument)
return updated
.onItem()
.transform(v -> new UpdatedDocument(readDocument.id(), documentToReturn));
.ifNotNull()
.transform(
v -> {
if (readDocument.txnId() != null) modifiedCount.incrementAndGet();
return new UpdatedDocument(
readDocument.id(),
readDocument.txnId() == null,
returnDocumentInResponse ? documentToReturn : null);
});
})
.collect()
.asList();

return updatedDocuments
.onItem()
.transform(updates -> new UpdateOperationPage(updates, returnDocumentInResponse()));
.transform(
updates ->
new UpdateOperationPage(
matchedCount.get(),
modifiedCount.get(),
updates,
returnDocumentInResponse(),
moreDataFlag.get()));
}

private Uni<DocumentId> updatedDocument(
Expand Down Expand Up @@ -133,5 +194,5 @@ protected static QueryOuterClass.Query bindUpdateValues(
return QueryOuterClass.Query.newBuilder(builtQuery).setValues(values).build();
}

record UpdatedDocument(DocumentId id, JsonNode document) {}
record UpdatedDocument(DocumentId id, boolean upserted, JsonNode document) {}
}
Loading

0 comments on commit 29d23f7

Please sign in to comment.