Skip to content

Commit

Permalink
Merged to work with the CQL order
Browse files Browse the repository at this point in the history
and some refactoring to simplify the code paths.
  • Loading branch information
amorton committed Nov 3, 2024
1 parent fd070e8 commit 14b10b7
Show file tree
Hide file tree
Showing 46 changed files with 1,670 additions and 1,132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,28 @@ public enum CommandStatus {
PRIMARY_KEY_SCHEMA,

/**
* The element contains the schema that describes the schema of the columns that were requested in
* the projection.
* The schema of the columns that were requested in the projection.
*
* <p>When doing a read, the result of the read by default does not columns that have null values.
* Additionaly in the default JSON representation multiple Column types may be represented as a
* Additionally, in the default JSON representation multiple Column types may be represented as a
* single JSON type, such as dates, timestamps, duration all represented as a string. Or all
* numeric types as a JSON number.
*
* <p>Clients can use the schema returned here to understand where null values were ommitted and
* what the actual type of the column is.
* <p>Clients can use the schema returned here to understand where null values were omitted and
* what the database type of the column is.
*/
@JsonProperty("projectionSchema")
PROJECTION_SCHEMA;
PROJECTION_SCHEMA,

/**
* The count of the number of rows that were read from the database and sorted in memory.
*
* <p>Sorting in memory is done when a sort clause uses columns that are not from the partition
* sort key. This type of sorting usually needs to read the entire table to sort the rows, and
* needs to keep in memory the skip + limit number of rows. Check documentation for ways to avoid
* in memory sorting.
*/
@JsonProperty("sortedRowCount")
SORTED_ROW_COUNT,
;
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public SortClause deserialize(JsonParser parser, DeserializationContext ctxt) th
// TODO: HACK: quick support for tables, if the value is an array we will assume the column
// is a vector then need to check on table pathway that the sort is correct.
// NOTE: does not check if there are more than one sort expression, the
// TableSortClauseResolver will take care of that so we can get proper ApiExceptions
// TableCqlSortClauseResolver will take care of that so we can get proper ApiExceptions
// this is also why we do not break the look here
sortExpressions.add(
SortExpression.tableVectorSort(path, arrayNodeToVector((ArrayNode) inner.getValue())));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.stargate.sgv2.jsonapi.service.cqldriver;

import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.stargate.sgv2.jsonapi.exception.SortException;
import io.stargate.sgv2.jsonapi.service.cqldriver.executor.RowAccumulator;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import org.apache.commons.lang3.NotImplementedException;

/**
* Accumulates rows from multiple {@link AsyncResultSet} instances into a single page of rows using
* a {@link RowAccumulator}.
*
* <p>Used when we want to pull rows from multiple result set pages and sort them in memory. Call
* {@link #accumulate(AsyncResultSet)} everytime you have a result set to add to the accumulator.
*/
public class AccumulatingAsyncResultSet implements AsyncResultSet {

private final RowAccumulator rowAccumulator;
private ColumnDefinitions columnDefinitions;

private int returnedPages = 0;

public AccumulatingAsyncResultSet(RowAccumulator rowAccumulator) {
this.rowAccumulator = Objects.requireNonNull(rowAccumulator, "rowAccumulator must not be null");
}

/**
* Adds the rows from the current page to the accumulator.
*
* @param resultSet The result set to add to the accumulator.
*/
public void accumulate(AsyncResultSet resultSet) {
if (columnDefinitions == null) {
columnDefinitions = resultSet.getColumnDefinitions();
}
resultSet
.currentPage()
.forEach(
row -> {
if (!rowAccumulator.accumulate(row)) {
// TODO: change so we are not throwing an application exception here, would be
// better to
// return false or throw a checked so the application can generate a better error
// message
throw SortException.Code.CANNOT_SORT_TOO_MUCH_DATA.get();
}
});
}

@Override
@NonNull
public ColumnDefinitions getColumnDefinitions() {
if (columnDefinitions == null) {
throw new IllegalStateException("No column definitions available");
}
return columnDefinitions;
}

@Override
@NonNull
public ExecutionInfo getExecutionInfo() {
throw new NotImplementedException();
}

@Override
@NonNull
public Iterable<Row> currentPage() {
returnedPages++;
return rowAccumulator.getPage();
}

@Override
public int remaining() {
throw new NotImplementedException();
}

@Override
public boolean hasMorePages() {
return returnedPages == 0;
}

@Override
@NonNull
public CompletionStage<AsyncResultSet> fetchNextPage() throws IllegalStateException {
throw new NotImplementedException();
}

@Override
public boolean wasApplied() {
return true;
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 14b10b7

Please sign in to comment.