diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertAttempt.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertAttempt.java index 7185e81271..fffab72a05 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertAttempt.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertAttempt.java @@ -1,5 +1,7 @@ package io.stargate.sgv2.jsonapi.service.operation.model; +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; +import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow; import java.util.Optional; // TODO AARON COMMENTS, comparable so we can re-order the inserts when building the results @@ -7,10 +9,16 @@ public interface InsertAttempt extends Comparable { int position(); - WritableDocRow docRow(); + // AARON comments this is here because we may not have the row if we fail to shred. + // But what if we dont have enough to get the row id + Optional docRowID(); + + Optional docRow(); Optional failure(); + InsertAttempt maybeAddFailure(Throwable failure); + @Override default int compareTo(InsertAttempt o) { return Integer.compare(position(), o.position()); diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertOperationPage.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertOperationPage.java index 77f02bcad0..fad3569524 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertOperationPage.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/InsertOperationPage.java @@ -5,11 +5,8 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus; import io.stargate.sgv2.jsonapi.exception.mappers.ThrowableToErrorMapper; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; +import java.util.*; import java.util.function.Supplier; /** @@ -76,8 +73,8 @@ public CommandResult get() { // Note: See DocRowIdentifer, it has an attribute that will be called for JSON serialization List insertedIds = successfulInsertions.stream() - .map(InsertAttempt::docRow) - .map(WritableDocRow::docRowID) + .map(InsertAttempt::docRowID) + .map(Optional::orElseThrow) .toList(); return new CommandResult(null, Map.of(CommandStatus.INSERTED_IDS, insertedIds), errors); } @@ -91,7 +88,7 @@ public CommandResult get() { // Results array filled in order: first successful insertions for (InsertAttempt okInsertion : successfulInsertions) { results[okInsertion.position()] = - new InsertionResult(okInsertion.docRow().docRowID(), InsertionStatus.OK, null); + new InsertionResult(okInsertion.docRowID().orElseThrow(), InsertionStatus.OK, null); } // Second: failed insertions; output in order of insertion for (InsertAttempt failedInsertion : failedInsertions) { @@ -106,7 +103,8 @@ public CommandResult get() { errors.add(error); } results[failedInsertion.position()] = - new InsertionResult(failedInsertion.docRow().docRowID(), InsertionStatus.ERROR, errorIdx); + new InsertionResult( + failedInsertion.docRowID().orElseThrow(), InsertionStatus.ERROR, errorIdx); } // And third, if any, skipped insertions; those that were not attempted (f.ex due @@ -115,7 +113,7 @@ public CommandResult get() { if (null == results[i]) { results[i] = new InsertionResult( - allInsertions.get(i).docRow().docRowID(), InsertionStatus.SKIPPED, null); + allInsertions.get(i).docRowID().orElseThrow(), InsertionStatus.SKIPPED, null); } } return new CommandResult( @@ -126,7 +124,7 @@ private static CommandResult.Error getOldStyleError(InsertAttempt insertAttempt) String message = "Failed to insert document with _id %s: %s" .formatted( - insertAttempt.docRow().docRowID(), + insertAttempt.docRowID().orElseThrow(), insertAttempt .failure() .map(Throwable::getMessage) diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/CollectionInsertAttempt.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/CollectionInsertAttempt.java index ecff746d8a..a764ad2b4e 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/CollectionInsertAttempt.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/CollectionInsertAttempt.java @@ -1,7 +1,8 @@ package io.stargate.sgv2.jsonapi.service.operation.model.collections; import io.stargate.sgv2.jsonapi.service.operation.model.InsertAttempt; -import io.stargate.sgv2.jsonapi.service.operation.model.WritableDocRow; +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; +import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow; import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId; import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument; import java.util.ArrayList; @@ -21,9 +22,7 @@ public class CollectionInsertAttempt implements InsertAttempt { private final int position; public final WritableShreddedDocument document; - // TODO Aaron - once work out why the document can be null, work out if can remove the documentID - // and get it from the doc - public final DocumentId documentId; + private final DocumentId documentId; private Throwable failure; @@ -60,8 +59,13 @@ public int position() { } @Override - public WritableDocRow docRow() { - return document; + public Optional docRowID() { + return Optional.ofNullable(documentId); + } + + @Override + public Optional docRow() { + return Optional.ofNullable(document); } @Override @@ -69,8 +73,9 @@ public Optional failure() { return Optional.ofNullable(failure); } - public CollectionInsertAttempt addFailure(Throwable failure) { - if (failure != null) { + @Override + public InsertAttempt maybeAddFailure(Throwable failure) { + if (this.failure == null) { this.failure = failure; } return this; diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/InsertOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/InsertOperation.java index 80afaf7400..f6b2c87568 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/InsertOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/collections/InsertOperation.java @@ -120,7 +120,7 @@ private Uni> insertOrdered( // wrap item and failure // the collection can decide how to react on failure .onItemOrFailure() - .transform((id, t) -> insertion.addFailure(t))) + .transform((id, t) -> insertion.maybeAddFailure(t))) .concatenate(false) // if no failures reduce to the op page @@ -175,7 +175,7 @@ private Uni> insertUnordered( offlineMode) // handle errors fail silent mode .onItemOrFailure() - .transform((id, t) -> insertion.addFailure(t))) + .transform((id, t) -> insertion.maybeAddFailure(t))) // then reduce here .collect() .in( diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/InsertTableOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/InsertTableOperation.java new file mode 100644 index 0000000000..22860900a5 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/InsertTableOperation.java @@ -0,0 +1,94 @@ +package io.stargate.sgv2.jsonapi.service.operation.model.tables; + +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.querybuilder.term.Term; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; +import io.stargate.sgv2.jsonapi.api.model.command.CommandResult; +import io.stargate.sgv2.jsonapi.api.request.DataApiRequestInfo; +import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor; +import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject; +import io.stargate.sgv2.jsonapi.service.operation.model.InsertOperationPage; +import io.stargate.sgv2.jsonapi.service.shredding.tables.WriteableTableRow; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class InsertTableOperation extends TableMutationOperation { + + private final List insertAttempts; + + // TODO AARON JSON to start with, need a document object + public InsertTableOperation( + CommandContext commandContext, List insertAttempts) { + super(commandContext); + this.insertAttempts = List.copyOf(insertAttempts); + } + + @Override + public Uni> execute( + DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor) { + + // TODO AARON - this is for unordered, copy from Collection InsertOperation insertUnordered + return Multi.createFrom() + .iterable(insertAttempts) + // merge to make it parallel + .onItem() + .transformToUniAndMerge( + insertion -> insertRow(dataApiRequestInfo, queryExecutor, insertion)) + // then reduce here + .collect() + .in(() -> new InsertOperationPage(insertAttempts, false), InsertOperationPage::aggregate) + // use object identity to resolve to Supplier + // TODO AARON - not sure what this is doing, original was .map(i -> i) + .map(Function.identity()); + } + + private Uni insertRow( + DataApiRequestInfo dataApiRequestInfo, + QueryExecutor queryExecutor, + TableInsertAttempt insertAttempt) { + + // First things first: did we already fail? If so, propagate + if (insertAttempt.failure().isPresent()) { + return Uni.createFrom().failure(insertAttempt.failure().get()); + } + + // bind and execute + var boundStatement = buildInsertStatement(queryExecutor, insertAttempt.row().orElseThrow()); + + // TODO: AARON What happens to errors here? + return queryExecutor + .executeWrite(dataApiRequestInfo, boundStatement) + .onItemOrFailure() + .transform( + (result, t) -> { + if (t != null) { + return (TableInsertAttempt) insertAttempt.maybeAddFailure(t); + } + // This is where to check result.wasApplied() if this was a LWT + return insertAttempt; + }) + .onItemOrFailure() + .transform((ia, throwable) -> (TableInsertAttempt) ia.maybeAddFailure(throwable)); + } + + private SimpleStatement buildInsertStatement(QueryExecutor queryExecutor, WriteableTableRow row) { + + Map colValues = + row.allColumnValues().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> literal(e.getValue()))); + + return insertInto( + commandContext.schemaObject().name.keyspace(), + commandContext.schemaObject().name.table()) + .valuesByIds(colValues) + .build(); + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableInsertAttempt.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableInsertAttempt.java new file mode 100644 index 0000000000..7db07d4984 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableInsertAttempt.java @@ -0,0 +1,84 @@ +package io.stargate.sgv2.jsonapi.service.operation.model.tables; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.stargate.sgv2.jsonapi.service.operation.model.InsertAttempt; +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; +import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow; +import io.stargate.sgv2.jsonapi.service.shredding.tables.RowId; +import io.stargate.sgv2.jsonapi.service.shredding.tables.RowShredder; +import io.stargate.sgv2.jsonapi.service.shredding.tables.WriteableTableRow; +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +public class TableInsertAttempt implements InsertAttempt { + + private final int position; + private final RowId rowId; + private final WriteableTableRow row; + private Throwable failure; + + private TableInsertAttempt(int position, RowId rowId, WriteableTableRow row) { + this.position = position; + this.rowId = rowId; + this.row = row; + } + + public static List create(RowShredder shredder, JsonNode document) { + return create(shredder, List.of(document)); + } + + public static List create(RowShredder shredder, List documents) { + Preconditions.checkNotNull(shredder, "shredder cannot be null"); + Preconditions.checkNotNull(documents, "documents cannot be null"); + + return IntStream.range(0, documents.size()) + .mapToObj( + i -> { + WriteableTableRow row; + try { + row = shredder.shred(documents.get(i)); + } catch (Exception e) { + // TODO: need a shredding base excpetion to catch + // TODO: we need to get the row id, so we can return it in the response + return (TableInsertAttempt) + new TableInsertAttempt(i, null, null).maybeAddFailure(e); + } + return new TableInsertAttempt(i, row.id(), row); + }) + .toList(); + } + + public Optional row() { + return Optional.ofNullable(row); + } + + @Override + public int position() { + return position; + } + + @Override + public Optional docRowID() { + return Optional.ofNullable(rowId); + } + + @Override + public Optional docRow() { + return Optional.ofNullable(row); + } + + @Override + public Optional failure() { + return Optional.ofNullable(failure); + } + + @Override + public InsertAttempt maybeAddFailure(Throwable failure) { + if (this.failure == null) { + this.failure = failure; + } + return this; + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableMutationOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableMutationOperation.java index 18edd2f0eb..16a7c104a1 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableMutationOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableMutationOperation.java @@ -1,4 +1,12 @@ package io.stargate.sgv2.jsonapi.service.operation.model.tables; +import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; +import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject; + /** For now, a marker class / interface for operations that modify data in a table. */ -abstract class TableMutationOperation extends TableOperation {} +abstract class TableMutationOperation extends TableOperation { + + protected TableMutationOperation(CommandContext commandContext) { + super(commandContext); + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableOperation.java index d67ff8d18d..b34216c405 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableOperation.java @@ -1,9 +1,20 @@ package io.stargate.sgv2.jsonapi.service.operation.model.tables; +import com.google.common.base.Preconditions; +import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; +import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject; import io.stargate.sgv2.jsonapi.service.operation.model.Operation; /** * Base for any operations that works with CQL Tables with rows, rather than Collections of * Documents */ -abstract class TableOperation implements Operation {} +abstract class TableOperation implements Operation { + + protected final CommandContext commandContext; + + protected TableOperation(CommandContext commandContext) { + Preconditions.checkNotNull(commandContext, "commandContext cannot be null"); + this.commandContext = commandContext; + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableReadOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableReadOperation.java index 581c1f50ec..0f2ea64196 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableReadOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableReadOperation.java @@ -8,15 +8,12 @@ /** For now, a marker class / interface for operations that read data in a table. */ abstract class TableReadOperation extends TableOperation { - protected final CommandContext commandContext; protected final LogicalExpression logicalExpression; public TableReadOperation( CommandContext commandContext, LogicalExpression logicalExpression) { - - Preconditions.checkNotNull(commandContext, "commandContext cannot be null"); + super(commandContext); Preconditions.checkNotNull(logicalExpression, "logicalExpression cannot be null"); - this.commandContext = commandContext; this.logicalExpression = logicalExpression; } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableSchemaOperation.java b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableSchemaOperation.java index 686234f061..cf746c258c 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableSchemaOperation.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/tables/TableSchemaOperation.java @@ -1,4 +1,12 @@ package io.stargate.sgv2.jsonapi.service.operation.model.tables; +import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; +import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject; + /** For now, a marker class / interface for operations that modify schema */ -abstract class TableSchemaOperation extends TableOperation {} +abstract class TableSchemaOperation extends TableOperation { + + protected TableSchemaOperation(CommandContext commandContext) { + super(commandContext); + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertManyCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertManyCommandResolver.java index 22c487f3cc..ff26de5f05 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertManyCommandResolver.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertManyCommandResolver.java @@ -4,13 +4,17 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertManyCommand; import io.stargate.sgv2.jsonapi.service.cqldriver.executor.CollectionSchemaObject; +import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject; import io.stargate.sgv2.jsonapi.service.operation.model.Operation; import io.stargate.sgv2.jsonapi.service.operation.model.collections.CollectionInsertAttempt; import io.stargate.sgv2.jsonapi.service.operation.model.collections.InsertOperation; +import io.stargate.sgv2.jsonapi.service.operation.model.tables.InsertTableOperation; +import io.stargate.sgv2.jsonapi.service.operation.model.tables.TableInsertAttempt; import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver; import io.stargate.sgv2.jsonapi.service.shredding.Shredder; import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId; import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument; +import io.stargate.sgv2.jsonapi.service.shredding.tables.RowShredder; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.ArrayList; @@ -21,11 +25,13 @@ @ApplicationScoped public class InsertManyCommandResolver implements CommandResolver { - private final Shredder shredder; + private final Shredder documentShredder; + private final RowShredder rowShredder; @Inject - public InsertManyCommandResolver(Shredder shredder) { - this.shredder = shredder; + public InsertManyCommandResolver(Shredder documentShredder, RowShredder rowShredder) { + this.documentShredder = documentShredder; + this.rowShredder = rowShredder; } @Override @@ -51,13 +57,22 @@ public Operation resolveCollectionCommand( AtomicReference idRef = new AtomicReference<>(); try { final WritableShreddedDocument shredded = - shredder.shred(ctx, inputDocs.get(pos), null, idRef); + documentShredder.shred(ctx, inputDocs.get(pos), null, idRef); attempt = CollectionInsertAttempt.from(pos, shredded); } catch (Exception e) { + // TODO: need a base Shredding exception to catch attempt = new CollectionInsertAttempt(pos, idRef.get(), e); } insertions.add(attempt); } return new InsertOperation(ctx, insertions, ordered, false, returnDocumentResponses); } + + @Override + public Operation resolveTableCommand( + CommandContext ctx, InsertManyCommand command) { + + return new InsertTableOperation( + ctx, TableInsertAttempt.create(rowShredder, command.documents())); + } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertOneCommandResolver.java b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertOneCommandResolver.java index 607046168e..0017e5c8ee 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertOneCommandResolver.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/resolver/model/impl/InsertOneCommandResolver.java @@ -3,11 +3,15 @@ import io.stargate.sgv2.jsonapi.api.model.command.CommandContext; import io.stargate.sgv2.jsonapi.api.model.command.impl.InsertOneCommand; import io.stargate.sgv2.jsonapi.service.cqldriver.executor.CollectionSchemaObject; +import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject; import io.stargate.sgv2.jsonapi.service.operation.model.Operation; import io.stargate.sgv2.jsonapi.service.operation.model.collections.InsertOperation; +import io.stargate.sgv2.jsonapi.service.operation.model.tables.InsertTableOperation; +import io.stargate.sgv2.jsonapi.service.operation.model.tables.TableInsertAttempt; import io.stargate.sgv2.jsonapi.service.resolver.model.CommandResolver; import io.stargate.sgv2.jsonapi.service.shredding.Shredder; import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument; +import io.stargate.sgv2.jsonapi.service.shredding.tables.RowShredder; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -15,11 +19,13 @@ @ApplicationScoped public class InsertOneCommandResolver implements CommandResolver { - private final Shredder shredder; + private final Shredder documentShredder; + private final RowShredder rowShredder; @Inject - public InsertOneCommandResolver(Shredder shredder) { - this.shredder = shredder; + public InsertOneCommandResolver(Shredder documentShredder, RowShredder rowShredder) { + this.documentShredder = documentShredder; + this.rowShredder = rowShredder; } @Override @@ -31,7 +37,7 @@ public Class getCommandClass() { public Operation resolveCollectionCommand( CommandContext ctx, InsertOneCommand command) { WritableShreddedDocument shreddedDocument = - shredder.shred( + documentShredder.shred( command.document(), null, ctx.schemaObject().indexingProjector(), @@ -40,4 +46,12 @@ public Operation resolveCollectionCommand( null); return InsertOperation.create(ctx, shreddedDocument); } + + @Override + public Operation resolveTableCommand( + CommandContext ctx, InsertOneCommand command) { + + return new InsertTableOperation( + ctx, TableInsertAttempt.create(rowShredder, command.document())); + } } diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/DocRowIdentifer.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/DocRowIdentifer.java similarity index 81% rename from src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/DocRowIdentifer.java rename to src/main/java/io/stargate/sgv2/jsonapi/service/shredding/DocRowIdentifer.java index ae0a5cee06..48371b899c 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/DocRowIdentifer.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/DocRowIdentifer.java @@ -1,4 +1,4 @@ -package io.stargate.sgv2.jsonapi.service.operation.model; +package io.stargate.sgv2.jsonapi.service.shredding; import com.fasterxml.jackson.annotation.JsonValue; diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/WritableDocRow.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/WritableDocRow.java similarity index 53% rename from src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/WritableDocRow.java rename to src/main/java/io/stargate/sgv2/jsonapi/service/shredding/WritableDocRow.java index d2eb5222e0..ee6e62a590 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/operation/model/WritableDocRow.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/WritableDocRow.java @@ -1,4 +1,4 @@ -package io.stargate.sgv2.jsonapi.service.operation.model; +package io.stargate.sgv2.jsonapi.service.shredding; public interface WritableDocRow { diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java index 8184960d72..617dfbddc9 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/DocumentId.java @@ -8,7 +8,7 @@ import io.stargate.sgv2.jsonapi.config.constants.DocumentConstants; import io.stargate.sgv2.jsonapi.exception.ErrorCode; import io.stargate.sgv2.jsonapi.exception.JsonApiException; -import io.stargate.sgv2.jsonapi.service.operation.model.DocRowIdentifer; +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; import io.stargate.sgv2.jsonapi.util.JsonUtil; import java.math.BigDecimal; import java.util.Date; diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/WritableShreddedDocument.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/WritableShreddedDocument.java index 28ffe16f77..c892feab1f 100644 --- a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/WritableShreddedDocument.java +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/model/WritableShreddedDocument.java @@ -6,10 +6,10 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.stargate.sgv2.jsonapi.exception.ErrorCode; import io.stargate.sgv2.jsonapi.exception.JsonApiException; -import io.stargate.sgv2.jsonapi.service.operation.model.DocRowIdentifer; -import io.stargate.sgv2.jsonapi.service.operation.model.WritableDocRow; +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; import io.stargate.sgv2.jsonapi.service.shredding.JsonPath; import io.stargate.sgv2.jsonapi.service.shredding.ShredListener; +import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow; import io.stargate.sgv2.jsonapi.util.JsonUtil; import java.math.BigDecimal; import java.util.Arrays; diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/RowId.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/RowId.java new file mode 100644 index 0000000000..90e9e3cbc9 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/RowId.java @@ -0,0 +1,12 @@ +package io.stargate.sgv2.jsonapi.service.shredding.tables; + +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; + +// TODO: AARON needs to hold all the values we identify as the parts of the PK for the row +public record RowId(Object[] values) implements DocRowIdentifer { + + @Override + public Object value() { + return values; + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/RowShredder.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/RowShredder.java new file mode 100644 index 0000000000..005246ed63 --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/RowShredder.java @@ -0,0 +1,68 @@ +package io.stargate.sgv2.jsonapi.service.shredding.tables; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.stargate.sgv2.jsonapi.api.v1.metrics.JsonProcessingMetricsReporter; +import io.stargate.sgv2.jsonapi.config.DocumentLimitsConfig; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.util.HashMap; +import java.util.Map; + +/** AATON TODO shreds docs for rows */ +@ApplicationScoped +public class RowShredder { + + private final ObjectMapper objectMapper; + + private final DocumentLimitsConfig documentLimits; + + private final JsonProcessingMetricsReporter jsonProcessingMetricsReporter; + + @Inject + public RowShredder( + ObjectMapper objectMapper, + DocumentLimitsConfig documentLimits, + JsonProcessingMetricsReporter jsonProcessingMetricsReporter) { + this.objectMapper = objectMapper; + this.documentLimits = documentLimits; + this.jsonProcessingMetricsReporter = jsonProcessingMetricsReporter; + } + + /** + * Shreds the document to get it ready for the database, we need to know the table schema so we + * can work out the primary key and the columns to insert + * + * @param document + * @return + */ + public WriteableTableRow shred(JsonNode document) { + + // HACK for now we assume the primary is a field called primary key. + + Object keyObject; + try { + keyObject = objectMapper.treeToValue(document.get("key"), Object.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + Map columnValues = new HashMap<>(); + document + .fields() + .forEachRemaining( + entry -> { + // using fromCQL so it is case sensitive + try { + columnValues.put( + CqlIdentifier.fromCql(entry.getKey()), + objectMapper.treeToValue(entry.getValue(), Object.class)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + return new WriteableTableRow(new RowId(new Object[] {keyObject}), columnValues); + } +} diff --git a/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/WriteableTableRow.java b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/WriteableTableRow.java new file mode 100644 index 0000000000..b3de9f1c4b --- /dev/null +++ b/src/main/java/io/stargate/sgv2/jsonapi/service/shredding/tables/WriteableTableRow.java @@ -0,0 +1,15 @@ +package io.stargate.sgv2.jsonapi.service.shredding.tables; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer; +import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow; +import java.util.Map; + +public record WriteableTableRow(RowId id, Map allColumnValues) + implements WritableDocRow { + + @Override + public DocRowIdentifer docRowID() { + return id(); + } +}