Skip to content

Commit

Permalink
Insert working, one and many - some Test failing
Browse files Browse the repository at this point in the history
  • Loading branch information
amorton committed Jul 10, 2024
1 parent 7e6e99d commit 611282a
Show file tree
Hide file tree
Showing 19 changed files with 379 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package io.stargate.sgv2.jsonapi.service.operation.model;

import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer;
import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow;
import java.util.Optional;

// TODO AARON COMMENTS, comparable so we can re-order the inserts when building the results
public interface InsertAttempt extends Comparable<InsertAttempt> {

int position();

WritableDocRow docRow();
// AARON comments this is here because we may not have the row if we fail to shred.
// But what if we dont have enough to get the row id
Optional<DocRowIdentifer> docRowID();

Optional<WritableDocRow> docRow();

Optional<Throwable> failure();

InsertAttempt maybeAddFailure(Throwable failure);

@Override
default int compareTo(InsertAttempt o) {
return Integer.compare(position(), o.position());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.model.command.CommandStatus;
import io.stargate.sgv2.jsonapi.exception.mappers.ThrowableToErrorMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer;
import java.util.*;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -76,8 +73,8 @@ public CommandResult get() {
// Note: See DocRowIdentifer, it has an attribute that will be called for JSON serialization
List<DocRowIdentifer> insertedIds =
successfulInsertions.stream()
.map(InsertAttempt::docRow)
.map(WritableDocRow::docRowID)
.map(InsertAttempt::docRowID)
.map(Optional::orElseThrow)
.toList();
return new CommandResult(null, Map.of(CommandStatus.INSERTED_IDS, insertedIds), errors);
}
Expand All @@ -91,7 +88,7 @@ public CommandResult get() {
// Results array filled in order: first successful insertions
for (InsertAttempt okInsertion : successfulInsertions) {
results[okInsertion.position()] =
new InsertionResult(okInsertion.docRow().docRowID(), InsertionStatus.OK, null);
new InsertionResult(okInsertion.docRowID().orElseThrow(), InsertionStatus.OK, null);
}
// Second: failed insertions; output in order of insertion
for (InsertAttempt failedInsertion : failedInsertions) {
Expand All @@ -106,7 +103,8 @@ public CommandResult get() {
errors.add(error);
}
results[failedInsertion.position()] =
new InsertionResult(failedInsertion.docRow().docRowID(), InsertionStatus.ERROR, errorIdx);
new InsertionResult(
failedInsertion.docRowID().orElseThrow(), InsertionStatus.ERROR, errorIdx);
}

// And third, if any, skipped insertions; those that were not attempted (f.ex due
Expand All @@ -115,7 +113,7 @@ public CommandResult get() {
if (null == results[i]) {
results[i] =
new InsertionResult(
allInsertions.get(i).docRow().docRowID(), InsertionStatus.SKIPPED, null);
allInsertions.get(i).docRowID().orElseThrow(), InsertionStatus.SKIPPED, null);
}
}
return new CommandResult(
Expand All @@ -126,7 +124,7 @@ private static CommandResult.Error getOldStyleError(InsertAttempt insertAttempt)
String message =
"Failed to insert document with _id %s: %s"
.formatted(
insertAttempt.docRow().docRowID(),
insertAttempt.docRowID().orElseThrow(),
insertAttempt
.failure()
.map(Throwable::getMessage)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.stargate.sgv2.jsonapi.service.operation.model.collections;

import io.stargate.sgv2.jsonapi.service.operation.model.InsertAttempt;
import io.stargate.sgv2.jsonapi.service.operation.model.WritableDocRow;
import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer;
import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow;
import io.stargate.sgv2.jsonapi.service.shredding.model.DocumentId;
import io.stargate.sgv2.jsonapi.service.shredding.model.WritableShreddedDocument;
import java.util.ArrayList;
Expand All @@ -21,9 +22,7 @@ public class CollectionInsertAttempt implements InsertAttempt {

private final int position;
public final WritableShreddedDocument document;
// TODO Aaron - once work out why the document can be null, work out if can remove the documentID
// and get it from the doc
public final DocumentId documentId;
private final DocumentId documentId;

private Throwable failure;

Expand Down Expand Up @@ -60,17 +59,23 @@ public int position() {
}

@Override
public WritableDocRow docRow() {
return document;
public Optional<DocRowIdentifer> docRowID() {
return Optional.ofNullable(documentId);
}

@Override
public Optional<WritableDocRow> docRow() {
return Optional.ofNullable(document);
}

@Override
public Optional<Throwable> failure() {
return Optional.ofNullable(failure);
}

public CollectionInsertAttempt addFailure(Throwable failure) {
if (failure != null) {
@Override
public InsertAttempt maybeAddFailure(Throwable failure) {
if (this.failure == null) {
this.failure = failure;
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private Uni<Supplier<CommandResult>> insertOrdered(
// wrap item and failure
// the collection can decide how to react on failure
.onItemOrFailure()
.transform((id, t) -> insertion.addFailure(t)))
.transform((id, t) -> insertion.maybeAddFailure(t)))
.concatenate(false)

// if no failures reduce to the op page
Expand Down Expand Up @@ -175,7 +175,7 @@ private Uni<Supplier<CommandResult>> insertUnordered(
offlineMode)
// handle errors fail silent mode
.onItemOrFailure()
.transform((id, t) -> insertion.addFailure(t)))
.transform((id, t) -> insertion.maybeAddFailure(t)))
// then reduce here
.collect()
.in(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.stargate.sgv2.jsonapi.service.operation.model.tables;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.querybuilder.term.Term;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.api.model.command.CommandResult;
import io.stargate.sgv2.jsonapi.api.request.DataApiRequestInfo;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.QueryExecutor;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject;
import io.stargate.sgv2.jsonapi.service.operation.model.InsertOperationPage;
import io.stargate.sgv2.jsonapi.service.shredding.tables.WriteableTableRow;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class InsertTableOperation extends TableMutationOperation {

private final List<TableInsertAttempt> insertAttempts;

// TODO AARON JSON to start with, need a document object
public InsertTableOperation(
CommandContext<TableSchemaObject> commandContext, List<TableInsertAttempt> insertAttempts) {
super(commandContext);
this.insertAttempts = List.copyOf(insertAttempts);
}

@Override
public Uni<Supplier<CommandResult>> execute(
DataApiRequestInfo dataApiRequestInfo, QueryExecutor queryExecutor) {

// TODO AARON - this is for unordered, copy from Collection InsertOperation insertUnordered
return Multi.createFrom()
.iterable(insertAttempts)
// merge to make it parallel
.onItem()
.transformToUniAndMerge(
insertion -> insertRow(dataApiRequestInfo, queryExecutor, insertion))
// then reduce here
.collect()
.in(() -> new InsertOperationPage(insertAttempts, false), InsertOperationPage::aggregate)
// use object identity to resolve to Supplier<CommandResult>
// TODO AARON - not sure what this is doing, original was .map(i -> i)
.map(Function.identity());
}

private Uni<TableInsertAttempt> insertRow(
DataApiRequestInfo dataApiRequestInfo,
QueryExecutor queryExecutor,
TableInsertAttempt insertAttempt) {

// First things first: did we already fail? If so, propagate
if (insertAttempt.failure().isPresent()) {
return Uni.createFrom().failure(insertAttempt.failure().get());
}

// bind and execute
var boundStatement = buildInsertStatement(queryExecutor, insertAttempt.row().orElseThrow());

// TODO: AARON What happens to errors here?
return queryExecutor
.executeWrite(dataApiRequestInfo, boundStatement)
.onItemOrFailure()
.transform(
(result, t) -> {
if (t != null) {
return (TableInsertAttempt) insertAttempt.maybeAddFailure(t);
}
// This is where to check result.wasApplied() if this was a LWT
return insertAttempt;
})
.onItemOrFailure()
.transform((ia, throwable) -> (TableInsertAttempt) ia.maybeAddFailure(throwable));
}

private SimpleStatement buildInsertStatement(QueryExecutor queryExecutor, WriteableTableRow row) {

Map<CqlIdentifier, Term> colValues =
row.allColumnValues().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> literal(e.getValue())));

return insertInto(
commandContext.schemaObject().name.keyspace(),
commandContext.schemaObject().name.table())
.valuesByIds(colValues)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.stargate.sgv2.jsonapi.service.operation.model.tables;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.stargate.sgv2.jsonapi.service.operation.model.InsertAttempt;
import io.stargate.sgv2.jsonapi.service.shredding.DocRowIdentifer;
import io.stargate.sgv2.jsonapi.service.shredding.WritableDocRow;
import io.stargate.sgv2.jsonapi.service.shredding.tables.RowId;
import io.stargate.sgv2.jsonapi.service.shredding.tables.RowShredder;
import io.stargate.sgv2.jsonapi.service.shredding.tables.WriteableTableRow;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

public class TableInsertAttempt implements InsertAttempt {

private final int position;
private final RowId rowId;
private final WriteableTableRow row;
private Throwable failure;

private TableInsertAttempt(int position, RowId rowId, WriteableTableRow row) {
this.position = position;
this.rowId = rowId;
this.row = row;
}

public static List<TableInsertAttempt> create(RowShredder shredder, JsonNode document) {
return create(shredder, List.of(document));
}

public static List<TableInsertAttempt> create(RowShredder shredder, List<JsonNode> documents) {
Preconditions.checkNotNull(shredder, "shredder cannot be null");
Preconditions.checkNotNull(documents, "documents cannot be null");

return IntStream.range(0, documents.size())
.mapToObj(
i -> {
WriteableTableRow row;
try {
row = shredder.shred(documents.get(i));
} catch (Exception e) {
// TODO: need a shredding base excpetion to catch
// TODO: we need to get the row id, so we can return it in the response
return (TableInsertAttempt)
new TableInsertAttempt(i, null, null).maybeAddFailure(e);
}
return new TableInsertAttempt(i, row.id(), row);
})
.toList();
}

public Optional<WriteableTableRow> row() {
return Optional.ofNullable(row);
}

@Override
public int position() {
return position;
}

@Override
public Optional<DocRowIdentifer> docRowID() {
return Optional.ofNullable(rowId);
}

@Override
public Optional<WritableDocRow> docRow() {
return Optional.ofNullable(row);
}

@Override
public Optional<Throwable> failure() {
return Optional.ofNullable(failure);
}

@Override
public InsertAttempt maybeAddFailure(Throwable failure) {
if (this.failure == null) {
this.failure = failure;
}
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
package io.stargate.sgv2.jsonapi.service.operation.model.tables;

import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject;

/** For now, a marker class / interface for operations that modify data in a table. */
abstract class TableMutationOperation extends TableOperation {}
abstract class TableMutationOperation extends TableOperation {

protected TableMutationOperation(CommandContext<TableSchemaObject> commandContext) {
super(commandContext);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
package io.stargate.sgv2.jsonapi.service.operation.model.tables;

import com.google.common.base.Preconditions;
import io.stargate.sgv2.jsonapi.api.model.command.CommandContext;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.TableSchemaObject;
import io.stargate.sgv2.jsonapi.service.operation.model.Operation;

/**
* Base for any operations that works with CQL Tables with rows, rather than Collections of
* Documents
*/
abstract class TableOperation implements Operation {}
abstract class TableOperation implements Operation {

protected final CommandContext<TableSchemaObject> commandContext;

protected TableOperation(CommandContext<TableSchemaObject> commandContext) {
Preconditions.checkNotNull(commandContext, "commandContext cannot be null");
this.commandContext = commandContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@
/** For now, a marker class / interface for operations that read data in a table. */
abstract class TableReadOperation extends TableOperation {

protected final CommandContext<TableSchemaObject> commandContext;
protected final LogicalExpression logicalExpression;

public TableReadOperation(
CommandContext<TableSchemaObject> commandContext, LogicalExpression logicalExpression) {

Preconditions.checkNotNull(commandContext, "commandContext cannot be null");
super(commandContext);
Preconditions.checkNotNull(logicalExpression, "logicalExpression cannot be null");
this.commandContext = commandContext;
this.logicalExpression = logicalExpression;
}
}
Loading

0 comments on commit 611282a

Please sign in to comment.