Skip to content

Commit

Permalink
Consider supporting Pageable for query methods
Browse files Browse the repository at this point in the history
Closes gh-30
  • Loading branch information
evgeniycheban committed Jan 2, 2025
1 parent 3c2ae6f commit a5680f2
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import ru.rt.restream.reindexer.Query;

import org.springframework.data.domain.Sort;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.Repository;

/**
Expand All @@ -29,7 +31,7 @@
* @author Evgeniy Cheban
*/
@NoRepositoryBean
public interface ReindexerRepository<T, ID> extends CrudRepository<T, ID> {
public interface ReindexerRepository<T, ID> extends CrudRepository<T, ID>, PagingAndSortingRepository<T, ID> {

@Override
<S extends T> List<S> saveAll(Iterable<S> entities);
Expand All @@ -40,6 +42,9 @@ public interface ReindexerRepository<T, ID> extends CrudRepository<T, ID> {
@Override
List<T> findAllById(Iterable<ID> ids);

@Override
List<T> findAll(Sort sort);

/**
* Returns a new {@link Query} instance for further customizations.
* @see Query for more information regarding supported conditions and result types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import ru.rt.restream.reindexer.AggregationResult;

import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.mapping.PreferredConstructor;
import org.springframework.data.mapping.model.PreferredConstructorDiscoverer;
import org.springframework.data.reindexer.repository.support.TransactionalNamespace;
Expand All @@ -53,6 +54,7 @@
import org.springframework.data.repository.query.parser.Part;
import org.springframework.data.repository.query.parser.PartTree;
import org.springframework.data.repository.query.parser.PartTree.OrPart;
import org.springframework.data.support.PageableExecutionUtils;
import org.springframework.util.Assert;

/**
Expand All @@ -62,8 +64,6 @@
*/
public class ReindexerRepositoryQuery implements RepositoryQuery {

private final Map<Class<?>, Constructor<?>> preferredConstructors = new ConcurrentHashMap<>();

private final ReindexerQueryMethod queryMethod;

private final Namespace<?> namespace;
Expand All @@ -72,6 +72,8 @@ public class ReindexerRepositoryQuery implements RepositoryQuery {

private final Map<String, ReindexerIndex> indexes;

private final QueryExecution queryExecution;

/**
* Creates an instance.
*
Expand All @@ -89,37 +91,14 @@ public ReindexerRepositoryQuery(ReindexerQueryMethod queryMethod, ReindexerEntit
for (ReindexerIndex index : namespace.getIndexes()) {
this.indexes.put(index.getName(), index);
}
this.queryExecution = new DelegatingQueryExecution(QueryMethodExecution.values());
}

@Override
public Object execute(Object[] parameters) {
ReturnedType projectionType = getProjectionType(parameters);
Query<?> query = createQuery(projectionType, parameters);
if (this.queryMethod.isCollectionQuery()) {
return toCollection(query, projectionType);
}
if (this.queryMethod.isStreamQuery()) {
return toStream(query, projectionType);
}
if (this.queryMethod.isIteratorQuery()) {
return new ProjectingResultIterator(query, projectionType);
}
if (this.queryMethod.isQueryForEntity()) {
Object entity = toEntity(query, projectionType);
Assert.state(entity != null, "Exactly one item expected, but there is zero");
return entity;
}
if (this.tree.isExistsProjection()) {
return query.exists();
}
if (this.tree.isCountProjection()) {
return query.count();
}
if (this.tree.isDelete()) {
query.delete();
return null;
}
return Optional.ofNullable(toEntity(query, projectionType));
return this.queryExecution.execute(query, this, projectionType, parameters);
}

private ReturnedType getProjectionType(Object[] parameters) {
Expand Down Expand Up @@ -154,6 +133,15 @@ private Query<?> createQuery(ReturnedType projectionType, Object[] parameters) {
base = base.sort(order.getProperty(), order.isDescending());
}
}
if (this.queryMethod.getParameters().hasPageableParameter()) {
Pageable pageable = (Pageable) parameters[this.queryMethod.getParameters().getPageableIndex()];
if (pageable.isPaged()) {
for (Order order : pageable.getSort()) {
base = base.sort(order.getProperty(), order.isDescending());
}
base = base.limit(pageable.getPageSize()).offset((int) pageable.getOffset()).reqTotal();
}
}
return base;
}

Expand Down Expand Up @@ -220,51 +208,176 @@ private Object getParameterValue(String indexName, Object value) {
return value;
}

private Collection<?> toCollection(Query<?> query, ReturnedType projectionType) {
try (ResultIterator<?> iterator = new ProjectingResultIterator(query, projectionType)) {
Collection<Object> result = CollectionFactory.createCollection(this.queryMethod.getReturnType(),
this.queryMethod.getReturnedObjectType(), (int) iterator.size());
while (iterator.hasNext()) {
result.add(iterator.next());
}
return result;
}
@Override
public ReindexerQueryMethod getQueryMethod() {
return this.queryMethod;
}

private Stream<?> toStream(Query<?> query, ReturnedType projectionType) {
ResultIterator<?> iterator = new ProjectingResultIterator(query, projectionType);
Spliterator<?> spliterator = Spliterators.spliterator(iterator, iterator.size(), Spliterator.NONNULL);
return StreamSupport.stream(spliterator, false).onClose(iterator::close);
private interface QueryExecution {
Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters);
}

private Object toEntity(Query<?> query, ReturnedType projectionType) {
try (ResultIterator<?> iterator = new ProjectingResultIterator(query, projectionType)) {
Object item = null;
if (iterator.hasNext()) {
item = iterator.next();
private static final class DelegatingQueryExecution implements QueryExecution {

private final List<QueryMethodExecution> executions;

private DelegatingQueryExecution(QueryMethodExecution... executions) {
this.executions = List.of(executions);
}

@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
for (QueryMethodExecution execution : this.executions) {
if (execution.supports(repositoryQuery)) {
return execution.execute(query, repositoryQuery, projectionType, parameters);
}
}
if (iterator.hasNext()) {
throw new IllegalStateException("Exactly one item expected, but there are more");
return fallbackToSingleResultQuery(query, repositoryQuery.queryMethod, projectionType);
}

private Object fallbackToSingleResultQuery(Query<?> query, ReindexerQueryMethod queryMethod, ReturnedType projectionType) {
Object entity = toEntity(query, queryMethod, projectionType);
if (queryMethod.isOptionalQuery()) {
return Optional.ofNullable(entity);
}
Assert.state(entity != null, "Exactly one item expected, but there is zero");
return entity;
}

private Object toEntity(Query<?> query, ReindexerQueryMethod queryMethod, ReturnedType projectionType) {
try (ResultIterator<?> iterator = new ProjectingResultIterator(query, queryMethod, projectionType)) {
Object item = null;
if (iterator.hasNext()) {
item = iterator.next();
}
if (iterator.hasNext()) {
throw new IllegalStateException("Exactly one item expected, but there are more");
}
return item;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
return item;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

@Override
public ReindexerQueryMethod getQueryMethod() {
return this.queryMethod;
private enum QueryMethodExecution implements QueryExecution {
COLLECTION {
@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
try (ResultIterator<?> iterator = new ProjectingResultIterator(query, repositoryQuery.queryMethod, projectionType)) {
Collection<Object> result = CollectionFactory.createCollection(repositoryQuery.queryMethod.getReturnType(),
repositoryQuery.queryMethod.getReturnedObjectType(), (int) iterator.size());
while (iterator.hasNext()) {
result.add(iterator.next());
}
return result;
}
}

@Override
public boolean supports(ReindexerRepositoryQuery repositoryQuery) {
return repositoryQuery.queryMethod.isCollectionQuery() && !repositoryQuery.queryMethod.getParameters().hasPageableParameter();
}
},
STREAM {
@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
ResultIterator<?> iterator = new ProjectingResultIterator(query, repositoryQuery.queryMethod, projectionType);
Spliterator<?> spliterator = Spliterators.spliterator(iterator, iterator.size(), Spliterator.NONNULL);
return StreamSupport.stream(spliterator, false).onClose(iterator::close);
}

@Override
public boolean supports(ReindexerRepositoryQuery repositoryQuery) {
return repositoryQuery.queryMethod.isStreamQuery();
}
},
ITERATOR {
@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
return new ProjectingResultIterator(query, repositoryQuery.queryMethod, projectionType);
}

@Override
public boolean supports(ReindexerRepositoryQuery repositoryQuery) {
return repositoryQuery.queryMethod.isIteratorQuery();
}
},
PAGEABLE {
@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
Pageable pageable = (Pageable) parameters[repositoryQuery.queryMethod.getParameters().getPageableIndex()];
try (ProjectingResultIterator iterator = new ProjectingResultIterator(query, repositoryQuery.queryMethod, projectionType)) {
List<Object> content = new ArrayList<>();
while (iterator.hasNext()) {
content.add(iterator.next());
}
if (repositoryQuery.queryMethod.isPageQuery()) {
return pageable.isPaged() ? PageableExecutionUtils.getPage(content, pageable, iterator::getTotalCount)
: new PageImpl<>(content);
}
if (repositoryQuery.queryMethod.isListQuery()) {
return content;
}
throw new UnsupportedOperationException("Unsupported return type for Pageable query " + repositoryQuery.queryMethod.getReturnType());
}
}

@Override
public boolean supports(ReindexerRepositoryQuery repositoryQuery) {
return repositoryQuery.queryMethod.getParameters().hasPageableParameter();
}
},
COUNT {
@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
return query.count();
}

@Override
boolean supports(ReindexerRepositoryQuery repositoryQuery) {
return repositoryQuery.tree.isCountProjection();
}
},
EXISTS {
@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
return query.exists();
}

@Override
boolean supports(ReindexerRepositoryQuery repositoryQuery) {
return repositoryQuery.tree.isExistsProjection();
}
},
DELETE {
@Override
public Object execute(Query<?> query, ReindexerRepositoryQuery repositoryQuery, ReturnedType projectionType, Object[] parameters) {
query.delete();
return null;
}

@Override
boolean supports(ReindexerRepositoryQuery repositoryQuery) {
return repositoryQuery.tree.isDelete();
}
};
abstract boolean supports(ReindexerRepositoryQuery repositoryQuery);
}

private final class ProjectingResultIterator implements ResultIterator<Object> {
private static final class ProjectingResultIterator implements ResultIterator<Object> {

private static final Map<Class<?>, Constructor<?>> cache = new ConcurrentHashMap<>();

private final ResultIterator<?> delegate;

private final ReindexerQueryMethod queryMethod;

private final ReturnedType projectionType;

private ProjectingResultIterator(Query<?> query, ReturnedType projectionType) {
private ProjectingResultIterator(Query<?> query, ReindexerQueryMethod queryMethod, ReturnedType projectionType) {
this.delegate = query.execute();
this.queryMethod = queryMethod;
this.projectionType = projectionType;
}

Expand Down Expand Up @@ -298,14 +411,14 @@ public Object next() {
Object item = this.delegate.next();
if (this.projectionType != null) {
if (this.projectionType.getReturnedType().isInterface()) {
return ReindexerRepositoryQuery.this.queryMethod.getFactory().createProjection(this.projectionType.getReturnedType(), item);
return this.queryMethod.getFactory().createProjection(this.projectionType.getReturnedType(), item);
}
List<String> properties = this.projectionType.getInputProperties();
Object[] values = new Object[properties.size()];
for (int i = 0; i < properties.size(); i++) {
values[i] = BeanPropertyUtils.getProperty(item, properties.get(i));
}
Constructor<?> constructor = ReindexerRepositoryQuery.this.preferredConstructors.computeIfAbsent(this.projectionType.getReturnedType(), (type) -> {
Constructor<?> constructor = cache.computeIfAbsent(this.projectionType.getReturnedType(), (type) -> {
PreferredConstructor<?, ?> preferredConstructor = PreferredConstructorDiscoverer.discover(type);
Assert.state(preferredConstructor != null, () -> "No preferred constructor found for " + type);
return preferredConstructor.getConstructor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@
import ru.rt.restream.reindexer.Namespace;
import ru.rt.restream.reindexer.Query;
import ru.rt.restream.reindexer.Reindexer;
import ru.rt.restream.reindexer.ResultIterator;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.reindexer.repository.ReindexerRepository;
import org.springframework.data.reindexer.repository.query.ReindexerEntityInformation;
import org.springframework.data.support.PageableExecutionUtils;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -96,6 +103,34 @@ public List<T> findAll() {
return this.namespace.query().toList();
}

@Override
public List<T> findAll(Sort sort) {
Query<T> query = this.namespace.query();
for (Order order : sort) {
query = query.sort(order.getProperty(), order.isDescending());
}
return query.toList();
}

@Override
public Page<T> findAll(Pageable pageable) {
if (pageable.isUnpaged()) {
return new PageImpl<>(findAll());
}
Query<T> query = this.namespace.query();
for (Order order : pageable.getSort()) {
query = query.sort(order.getProperty(), order.isDescending());
}
query = query.limit(pageable.getPageSize()).offset((int) pageable.getOffset()).reqTotal();
try (ResultIterator<T> iterator = query.execute()) {
List<T> content = new ArrayList<>();
while (iterator.hasNext()) {
content.add(iterator.next());
}
return PageableExecutionUtils.getPage(content, pageable, iterator::getTotalCount);
}
}

@Override
public List<T> findAllById(Iterable<ID> ids) {
Assert.notNull(ids, "The given Ids of entities not be null!");
Expand Down
Loading

0 comments on commit a5680f2

Please sign in to comment.