Changes for findOne command #27

Merged: 20 commits, Jan 17, 2023
Changes from 3 commits
10 changes: 10 additions & 0 deletions pom.xml
@@ -97,6 +97,16 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
@@ -6,6 +6,8 @@ public enum ErrorCode {
/** Command error codes. */
COMMAND_NOT_IMPLEMENTED("The provided command is not implemented."),

DOCUMENT_UNPARSEABLE("Unable to parse the document"),

FILTER_UNRESOLVABLE("Unable to resolve the filter"),

SHRED_BAD_DOCUMENT_TYPE("Bad document type to shred"),
@@ -18,7 +20,9 @@ public enum ErrorCode {

SHRED_UNRECOGNIZED_NODE_TYPE("Unrecognized JSON node type in input document"),

- UNSUPPORTED_FILTER_DATA_TYPE("Unsupported filter data type");
+ UNSUPPORTED_FILTER_DATA_TYPE("Unsupported filter data type"),

UNSUPPORTED_FILTER_OPERATION("Unsupported filter operator");

private final String message;

@@ -26,9 +26,24 @@
@ConfigMapping(prefix = "stargate.document")
public interface DocumentConfig {

- /** @return Defines the maximum document page size, defaults to <code>20</code>. */
- @Max(100)
+ /** @return Defines the maximum document page size, defaults to <code>100</code>. */
+ @Max(500)
@Positive
@WithDefault("100")
int maxPageSize();

/** @return Defines the default document page size, defaults to <code>20</code>. */
@Max(500)
@Positive
@WithDefault("20")
- int pageSize();
+ int defaultPageSize();

/**
* @return Defines the maximum number of documents that can be returned for a request, defaults to
* <code>1000</code>.
*/
@Max(Integer.MAX_VALUE)
@Positive
@WithDefault("1000")
int maxLimit();
Comment on lines +41 to +48
Contributor

How is maxLimit() different from maxPageSize()? If we store one document per row, why do we need this limit?

This was different in previous versions of the Docs API: documents were stored across multiple rows, so we needed to define both a limit on the documents returned and a limit on the page size of the executed queries. I don't see that situation here.

Contributor Author

maxPageSize is the maximum number of records that can be fetched in a single query iteration, which can be paginated. maxLimit is the maximum number of documents that can be returned for a command. If a find command (yet to be implemented) reads all the documents in a collection, then even when the read is paginated it will return at most maxLimit documents.

Contributor @ivansenic, Jan 13, 2023

Ah, so this is because of the possible in-memory filtering, right? 👍

But a default of 1000 seems wrong; 20 is the default "page" size in many implementations.

Contributor Author

It's not in that context. Say the query is "select * from table": maxLimit defines how many records we return irrespective of pagination. If maxLimit is 1000 and the limit value cannot be resolved from the command, the query will be run as "select * from table limit 1000".

Contributor

Aha, OK, but shouldn't the limit always be the same as the page size? 😕

Contributor Author

It need not be. For example, if the user wants to show any 200 documents on the screen (maybe sorted), the limit will be 200. To keep the VM in check in Stargate, we will have a max page size of 100. The driver will paginate and return 200 records.

}
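
The distinction drawn in the thread above (a cap on the total documents returned versus a cap on the rows fetched per query iteration) can be sketched in isolation. This is only an illustration: `PagingPlan`, `effectiveLimit` and `pagesNeeded` are hypothetical names that do not appear in the PR, and the values 1000 and 100 simply mirror the defaults shown in the diff.

```java
import java.util.OptionalInt;

/** Hypothetical helper, not part of the PR: illustrates limit vs. page size. */
public class PagingPlan {

  /** Caps the requested limit at maxLimit; falls back to maxLimit when no limit is given. */
  public static int effectiveLimit(OptionalInt requestedLimit, int maxLimit) {
    return requestedLimit.isPresent()
        ? Math.min(requestedLimit.getAsInt(), maxLimit)
        : maxLimit;
  }

  /** Number of page fetches needed to deliver `limit` rows in pages of `pageSize` rows. */
  public static int pagesNeeded(int limit, int pageSize) {
    return (limit + pageSize - 1) / pageSize; // ceiling division
  }

  public static void main(String[] args) {
    // The example from the thread: the user asks for 200 documents, pages hold 100 rows.
    int limit = effectiveLimit(OptionalInt.of(200), 1000);
    System.out.println(limit + " documents in " + pagesNeeded(limit, 100) + " pages");
  }
}
```

With a requested limit of 200 and a page size of 100, the sketch reports two page fetches, which matches the "driver will paginate and return 200 records" case described by the author.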
@@ -52,10 +52,10 @@ public Uni<QueryOuterClass.ResultSet> executeRead(
}

if (pageSize.isPresent()) {
- int page = Math.min(pageSize.get(), documentConfig.pageSize());
+ int page = Math.min(pageSize.get(), documentConfig.maxPageSize());
params.setPageSize(Int32Value.of(page));
} else {
- params.setPageSize(Int32Value.of(documentConfig.pageSize()));
+ params.setPageSize(Int32Value.of(documentConfig.defaultPageSize()));
}
return queryBridge(
QueryOuterClass.Query.newBuilder(query).setParameters(params).buildPartial());
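
The page-size rule in this hunk (clamp a caller-supplied size to the maximum, otherwise use the default) can be shown as a stand-alone function. This is a minimal sketch: `PageSizeResolver` and `resolve` are hypothetical names, and plain `int` parameters stand in for the `DocumentConfig` values.

```java
import java.util.Optional;

/** Hypothetical stand-alone version of the page-size resolution shown in the hunk. */
public class PageSizeResolver {

  /** Clamps a requested page size to maxPageSize; uses defaultPageSize when none is given. */
  public static int resolve(Optional<Integer> requested, int defaultPageSize, int maxPageSize) {
    return requested.map(size -> Math.min(size, maxPageSize)).orElse(defaultPageSize);
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.of(250), 20, 100)); // clamped to the maximum
    System.out.println(resolve(Optional.empty(), 20, 100)); // falls back to the default
  }
}
```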
@@ -1,7 +1,6 @@
package io.stargate.sgv3.docsapi.service.operation.model;

import io.smallrye.mutiny.Uni;
- import io.stargate.sgv3.docsapi.api.model.command.CommandContext;
import io.stargate.sgv3.docsapi.api.model.command.CommandResult;
import io.stargate.sgv3.docsapi.service.bridge.executor.QueryExecutor;
import java.util.function.Supplier;
@@ -25,9 +24,5 @@
* OperationExecutor}
*/
public interface Operation {

- /** @return The context of the command responsible for this operation. */
- CommandContext commandContext();

Uni<Supplier<CommandResult>> execute(QueryExecutor queryExecutor);
}
@@ -0,0 +1,98 @@
package io.stargate.sgv3.docsapi.service.operation.model;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import io.smallrye.mutiny.Uni;
import io.stargate.bridge.grpc.Values;
import io.stargate.bridge.proto.QueryOuterClass;
import io.stargate.sgv3.docsapi.exception.DocsException;
import io.stargate.sgv3.docsapi.exception.ErrorCode;
import io.stargate.sgv3.docsapi.service.bridge.executor.QueryExecutor;
import io.stargate.sgv3.docsapi.service.operation.model.impl.ReadDocument;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/**
* ReadOperation interface which all find command operations will use. It also provides the
* implementation to execute a query and parse the result set as {@link FindResponse}.
*/
public interface ReadOperation extends Operation {
static String[] documentColumns = {"key", "tx_id", "doc_json"};
static String[] documentKeyColumns = {"key", "tx_id"};

/**
* Default implementation to query and parse the result set
*
* @param queryExecutor
* @param query
* @param pagingState
* @param readDocument This flag is set to false if the read is done to just identify the document
* id and tx_id to perform another DML operation
* @param objectMapper
* @return
*/
default Uni<FindResponse> findDocument(
QueryExecutor queryExecutor,
QueryOuterClass.Query query,
String pagingState,
boolean readDocument,
ObjectMapper objectMapper) {
return queryExecutor
.executeRead(query, Optional.ofNullable(pagingState), Optional.empty())
.onItem()
.transform(
rSet -> {
int remaining = rSet.getRowsCount();
int colCount = rSet.getColumnsCount();
List<ReadDocument> documents = new ArrayList<>(remaining);
Iterator<QueryOuterClass.Row> rowIterator = rSet.getRowsList().stream().iterator();
while (--remaining >= 0 && rowIterator.hasNext()) {
QueryOuterClass.Row row = rowIterator.next();
ReadDocument document = null;
try {
document =
new ReadDocument(
Values.string(row.getValues(0)), // key
Optional.of(Values.uuid(row.getValues(1))), // tx_id
readDocument
? objectMapper.readTree(Values.string(row.getValues(2)))
: null);
} catch (JsonProcessingException e) {
throw new DocsException(ErrorCode.DOCUMENT_UNPARSEABLE);
Contributor

This means that if parsing any one of the rows fails, the whole read ends in an exception. Is this the wanted behavior?

Contributor Author

objectMapper.readTree throws JsonProcessingException, which needs to be handled, but in theory this will never happen because the stored JSON document is validated during insert.

}
documents.add(document);
}
Comment on lines +48 to +67
Contributor

Do we want something similar to what we have in the docs v3 -> io.stargate.sgv2.docsapi.service.common.model.RowWrapper?

Imo this is a very nice utility that helps you read row columns by name. I see you used column indexes here; is this what we want to do?

Contributor Author

There are at most 3 static columns read from the table as per the design. I don't think we need it.
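
The fail-fast behavior questioned earlier in this file (one unparseable row aborts the whole read) can be sketched without the bridge or Jackson types. This is an illustration under assumptions: `FailFastParsing`, `RowParser` and `parseAll` are hypothetical names, `IllegalStateException` stands in for `DocsException`, and `Integer::parseInt` stands in for `objectMapper.readTree`. Unlike the code in the diff, the sketch passes the original exception along as the cause, which is a design choice of the example rather than something the PR does.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not part of the PR: fail-fast parsing of a batch of rows. */
public class FailFastParsing {

  /** Stand-in for a parser that throws a checked exception, as Jackson's readTree does. */
  @FunctionalInterface
  public interface RowParser<T> {
    T parse(String raw) throws Exception;
  }

  /** Parses every row; the first failure aborts the whole batch and keeps the original cause. */
  public static <T> List<T> parseAll(List<String> rows, RowParser<T> parser) {
    List<T> parsed = new ArrayList<>(rows.size());
    for (int i = 0; i < rows.size(); i++) {
      try {
        parsed.add(parser.parse(rows.get(i)));
      } catch (Exception e) {
        // Unchecked wrapper, so the method can be called from inside a lambda.
        throw new IllegalStateException("Unable to parse the document at row " + i, e);
      }
    }
    return parsed;
  }

  public static void main(String[] args) {
    System.out.println(parseAll(List.of("1", "2"), Integer::parseInt));
    try {
      parseAll(List.of("1", "x"), Integer::parseInt);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage() + " (cause: " + e.getCause().getClass().getSimpleName() + ")");
    }
  }
}
```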

return new FindResponse(documents, extractPagingStateFromResultSet(rSet));
});
}

private String extractPagingStateFromResultSet(QueryOuterClass.ResultSet rSet) {
BytesValue pagingStateOut = rSet.getPagingState();
if (pagingStateOut.isInitialized()) {
ByteString rawPS = pagingStateOut.getValue();
if (!rawPS.isEmpty()) {
byte[] b = rawPS.toByteArray();
// Could almost use "ByteBufferUtils.toBase64" but need variant that takes 'byte[]'
return Base64.getEncoder().encodeToString(b);
}
}
return null;
}

public static record FindResponse(List<ReadDocument> docs, String pagingState) {}

/**
* This method will not be needed once Tatu implements the new shredder for documents
*
* @param coll
* @return
*/
private int verifyMapLength(QueryOuterClass.Collection coll) {
return 0;
}
}
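
The paging-state handling in `extractPagingStateFromResultSet` boils down to Base64-encoding the raw bytes and returning `null` when there is nothing to encode. The sketch below shows that round trip with plain `byte[]` input in place of the protobuf `BytesValue`. `PagingStateCodec` and its `decode` method are hypothetical additions for illustration; the PR itself only shows the encoding side.

```java
import java.util.Arrays;
import java.util.Base64;

/** Hypothetical sketch, not part of the PR: Base64 round trip for a paging state. */
public class PagingStateCodec {

  /** Returns null for an absent or empty paging state, mirroring the method in the diff. */
  public static String encode(byte[] rawPagingState) {
    if (rawPagingState == null || rawPagingState.length == 0) {
      return null;
    }
    return Base64.getEncoder().encodeToString(rawPagingState);
  }

  /** Assumed inverse of encode: turns the client-supplied string back into raw bytes. */
  public static byte[] decode(String pagingState) {
    return pagingState == null ? new byte[0] : Base64.getDecoder().decode(pagingState);
  }

  public static void main(String[] args) {
    byte[] raw = {1, 2, 3};
    String encoded = encode(raw);
    System.out.println(encoded); // prints "AQID"
    System.out.println(Arrays.equals(raw, decode(encoded))); // prints "true"
  }
}
```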