Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce AggregationStage API #4309

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
Expand Down Expand Up @@ -296,6 +297,19 @@ default MongoCollection<Document> createView(String name, Class<?> source, Aggre
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ public MongoCollection<Document> createView(String name, Class<?> source, Aggreg
@Nullable ViewOptions options) {

return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}

Expand All @@ -657,7 +657,7 @@ public MongoCollection<Document> createView(String name, String source, Aggregat
@Nullable ViewOptions options) {

return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.data.geo.GeoResult;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
Expand Down Expand Up @@ -256,6 +256,19 @@ default Mono<MongoCollection<Document>> createView(String name, Class<?> source,
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}

/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public Mono<MongoCollection<Document>> createView(String name, Class<?> source,
@Nullable ViewOptions options) {

return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}

Expand All @@ -685,7 +685,7 @@ public Mono<MongoCollection<Document>> createView(String name, String source, Ag
@Nullable ViewOptions options) {

return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ public class Aggregation {
private final AggregationOptions options;

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
* Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
*
* @param operations must not be {@literal null} or empty.
*/
public static Aggregation newAggregation(List<? extends AggregationOperation> operations) {
return newAggregation(operations.toArray(new AggregationOperation[operations.size()]));
public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
return newAggregation(operations.toArray(AggregationStage[]::new));
}

/**
Expand All @@ -119,6 +119,16 @@ public static Aggregation newAggregation(AggregationOperation... operations) {
return new Aggregation(operations);
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static Aggregation newAggregation(AggregationStage... stages) {
return new Aggregation(stages);
}

/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
Expand All @@ -130,6 +140,17 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
return AggregationUpdate.from(Arrays.asList(operations));
}

/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
* @param operations can be {@literal empty} but must not be {@literal null}.
* @return new instance of {@link AggregationUpdate}.
* @since 4.1
*/
public static AggregationUpdate newUpdate(AggregationStage... operations) {
return AggregationUpdate.updateFrom(Arrays.asList(operations));
}

/**
* Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
* supported in MongoDB version 2.6+.
Expand All @@ -141,7 +162,7 @@ public static AggregationUpdate newUpdate(AggregationOperation... operations) {
public Aggregation withOptions(AggregationOptions options) {

Assert.notNull(options, "AggregationOptions must not be null");
return new Aggregation(this.pipeline.getOperations(), options);
return new Aggregation(this.pipeline.getStages(), options);
}

/**
Expand All @@ -150,8 +171,8 @@ public Aggregation withOptions(AggregationOptions options) {
* @param type must not be {@literal null}.
* @param operations must not be {@literal null} or empty.
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) {
return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()]));
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
return newAggregation(type, operations.toArray(AggregationStage[]::new));
}

/**
Expand All @@ -164,6 +185,17 @@ public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationO
return new TypedAggregation<T>(type, operations);
}

/**
* Creates a new {@link TypedAggregation} for the given type and {@link AggregationOperation}s.
*
* @param type must not be {@literal null}.
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
return new TypedAggregation<>(type, stages);
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
Expand All @@ -173,6 +205,15 @@ protected Aggregation(AggregationOperation... aggregationOperations) {
this(asAggregationList(aggregationOperations));
}

/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(AggregationStage... aggregationOperations) {
this(Arrays.asList(aggregationOperations));
}

/**
* @param aggregationOperations must not be {@literal null} or empty.
* @return
Expand All @@ -189,7 +230,7 @@ protected static List<AggregationOperation> asAggregationList(AggregationOperati
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
this(aggregationOperations, DEFAULT_OPTIONS);
}

Expand All @@ -199,7 +240,7 @@ protected Aggregation(List<AggregationOperation> aggregationOperations) {
* @param aggregationOperations must not be {@literal null}.
* @param options must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {

Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
Assert.notNull(options, "AggregationOptions must not be null");
Expand Down Expand Up @@ -638,6 +679,17 @@ public static FacetOperationBuilder facet(AggregationOperation... aggregationOpe
return facet().and(aggregationOperations);
}

/**
* Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
*
* @param stages the sub-pipeline, must not be {@literal null}.
* @return new instance of {@link FacetOperation}.
* @since 4.1
*/
public static FacetOperationBuilder facet(AggregationStage... stages) {
return facet().and(stages);
}

/**
* Creates a new {@link LookupOperation}.
*
Expand Down Expand Up @@ -668,14 +720,14 @@ public static LookupOperation lookup(Field from, Field localField, Field foreign

/**
* Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
*
* <pre class="code">
* Aggregation.lookup().from("restaurants")
* .localField("restaurant_name")
* .foreignField("name")
* .let(newVariable("orders_drink").forField("drink"))
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
* .let(newVariable("orders_drink").forField("drink"))
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* </pre>
*
* @return new instance of {@link LookupOperationBuilder}.
* @since 4.1
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package org.springframework.data.mongodb.core.aggregation;

import java.util.Collections;
import java.util.List;

import org.bson.Document;
import org.springframework.util.CollectionUtils;

/**
* Represents one single operation in an aggregation pipeline.
Expand All @@ -29,30 +29,24 @@
* @author Christoph Strobl
* @since 1.3
*/
public interface AggregationOperation {
public interface AggregationOperation extends MultiOperationAggregationStage {

/**
* Turns the {@link AggregationOperation} into a {@link Document} by using the given
* {@link AggregationOperationContext}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the Document
* @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
* @return
*/
@Deprecated
@Override
Document toDocument(AggregationOperationContext context);

/**
* Turns the {@link AggregationOperation} into list of {@link Document stages} by using the given
* {@link AggregationOperationContext}. This allows a single {@link AggregationOptions} to add additional stages for
* eg. {@code $sort} or {@code $limit}.
* More the exception than the default.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the pipeline stages to run through. Never {@literal null}.
* @since 2.2
* @return never {@literal null}.
*/
@Override
default List<Document> toPipelineStages(AggregationOperationContext context) {
return Collections.singletonList(toDocument(context));
return List.of(toDocument(context));
}

/**
Expand All @@ -63,6 +57,6 @@ default List<Document> toPipelineStages(AggregationOperationContext context) {
* @since 3.0.2
*/
default String getOperator() {
return toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next();
return CollectionUtils.lastElement(toPipelineStages(Aggregation.DEFAULT_CONTEXT)).keySet().iterator().next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,19 @@ class AggregationOperationRenderer {
* @param rootContext must not be {@literal null}.
* @return the {@link List} of {@link Document}.
*/
static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) {
static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {

List<Document> operationDocuments = new ArrayList<Document>(operations.size());

AggregationOperationContext contextToUse = rootContext;

for (AggregationOperation operation : operations) {
for (AggregationStage operation : operations) {

operationDocuments.addAll(operation.toPipelineStages(contextToUse));
if(operation instanceof MultiOperationAggregationStage mops) {
operationDocuments.addAll(mops.toPipelineStages(contextToUse));
} else {
operationDocuments.add(operation.toDocument(contextToUse));
}

if (operation instanceof FieldsExposingAggregationOperation) {

Expand Down
Loading