From b6e186cdea45e0987082ba4328e38f93898c8f39 Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Sun, 30 Sep 2018 22:35:17 +0200 Subject: [PATCH 1/7] First steps for adding Parent-Aggregation to parent-join module --- .../client/RestHighLevelClientTests.java | 4 +- .../elasticsearch/join/ParentJoinPlugin.java | 10 +- .../ChildrenToParentAggregator.java | 216 +++++++++++ .../join/aggregations/InternalParent.java | 56 +++ .../aggregations/JoinAggregationBuilders.java | 7 + .../join/aggregations/Parent.java | 28 ++ .../ParentAggregationBuilder.java | 176 +++++++++ .../aggregations/ParentAggregatorFactory.java | 73 ++++ .../join/aggregations/ParsedParent.java | 36 ++ .../spi/ParentJoinNamedXContentProvider.java | 16 +- .../aggregations/AbstractParentChildIT.java | 132 +++++++ .../join/aggregations/ChildrenIT.java | 85 +---- .../ChildrenToParentAggregatorTests.java | 337 ++++++++++++++++++ .../aggregations/InternalParentTests.java | 65 ++++ .../join/aggregations/ParentIT.java | 211 +++++++++++ .../join/aggregations/ParentTests.java | 42 +++ 16 files changed, 1400 insertions(+), 94 deletions(-) create mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java create mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/InternalParent.java create mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/Parent.java create mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregationBuilder.java create mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java create mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParsedParent.java create mode 100644 modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildIT.java create mode 100644 
modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java create mode 100644 modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/InternalParentTests.java create mode 100644 modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java create mode 100644 modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentTests.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index acdfc50b5a13a..102da6ced007c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -618,7 +618,7 @@ public void testDefaultNamedXContents() { public void testProvidedNamedXContents() { List namedXContents = RestHighLevelClient.getProvidedNamedXContents(); - assertEquals(10, namedXContents.size()); + assertEquals(11, namedXContents.size()); Map, Integer> categories = new HashMap<>(); List names = new ArrayList<>(); for (NamedXContentRegistry.Entry namedXContent : namedXContents) { @@ -629,7 +629,7 @@ public void testProvidedNamedXContents() { } } assertEquals(3, categories.size()); - assertEquals(Integer.valueOf(2), categories.get(Aggregation.class)); + assertEquals(Integer.valueOf(3), categories.get(Aggregation.class)); assertTrue(names.contains(ChildrenAggregationBuilder.NAME)); assertTrue(names.contains(MatrixStatsAggregationBuilder.NAME)); assertEquals(Integer.valueOf(4), categories.get(EvaluationMetric.class)); diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/ParentJoinPlugin.java b/modules/parent-join/src/main/java/org/elasticsearch/join/ParentJoinPlugin.java index 2236662b5d54b..e2b84333447b4 100644 --- 
a/modules/parent-join/src/main/java/org/elasticsearch/join/ParentJoinPlugin.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/ParentJoinPlugin.java @@ -22,6 +22,8 @@ import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder; import org.elasticsearch.join.aggregations.InternalChildren; +import org.elasticsearch.join.aggregations.InternalParent; +import org.elasticsearch.join.aggregations.ParentAggregationBuilder; import org.elasticsearch.join.mapper.ParentJoinFieldMapper; import org.elasticsearch.join.query.HasChildQueryBuilder; import org.elasticsearch.join.query.HasParentQueryBuilder; @@ -51,9 +53,11 @@ public List> getQueries() { @Override public List getAggregations() { - return Collections.singletonList( - new AggregationSpec(ChildrenAggregationBuilder.NAME, ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse) - .addResultReader(InternalChildren::new) + return Arrays.asList( + new AggregationSpec(ChildrenAggregationBuilder.NAME, ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse) + .addResultReader(InternalChildren::new), + new AggregationSpec(ParentAggregationBuilder.NAME, ParentAggregationBuilder::new, ParentAggregationBuilder::parse) + .addResultReader(InternalParent::new) ); } diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java new file mode 100644 index 0000000000000..aba71946dd15c --- /dev/null +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java @@ -0,0 +1,216 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.join.aggregations; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Bits; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.LongObjectPagedHashMap; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import 
java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A {@link BucketsAggregator} which resolves to the matching parent documents. + * + * It ensures that each parent only matches once per bucket. + */ +public class ChildrenToParentAggregator extends BucketsAggregator implements SingleBucketAggregator { + + static final ParseField TYPE_FIELD = new ParseField("type"); + + private final Weight childFilter; + private final Weight parentFilter; + private final ValuesSource.Bytes.WithOrdinals valuesSource; + + // Maybe use PagedGrowableWriter? This will be less wasteful than LongArray, + // but then we don't have the reuse feature of BigArrays. + // Also if we know the highest possible value that a parent agg will create + // then we store multiple values into one slot + private final LongArray childrenOrdToBuckets; + + // Only pay the extra storage price if the a parentOrd has multiple buckets + // Most of the times a parent doesn't have multiple buckets, since there is + // only one document per parent ord, + // only in the case of terms agg if a parent doc has multiple terms per + // field this is needed: + private final LongObjectPagedHashMap childrenOrdToOtherBuckets; + private boolean multipleBucketsPerChildrenOrd = false; + + public ChildrenToParentAggregator(String name, AggregatorFactories factories, + SearchContext context, Aggregator parent, Query childFilter, + Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource, + long maxOrd, List pipelineAggregators, Map metaData) + throws IOException { + super(name, factories, context, parent, pipelineAggregators, metaData); + // these two filters are cached in the parser + this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); + this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), 
ScoreMode.COMPLETE_NO_SCORES, 1f); + this.childrenOrdToBuckets = context.bigArrays().newLongArray(maxOrd, false); + this.childrenOrdToBuckets.fill(0, maxOrd, -1); + this.childrenOrdToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays()); + this.valuesSource = valuesSource; + } + + @Override + public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException { + return new InternalParent(name, bucketDocCount(owningBucketOrdinal), + bucketAggregations(owningBucketOrdinal), pipelineAggregators(), metaData()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalParent(name, 0, buildEmptySubAggregations(), pipelineAggregators(), + metaData()); + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, + final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); + final Bits childDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), childFilter.scorerSupplier(ctx)); + return new LeafBucketCollector() { + + @Override + public void collect(int docId, long bucket) throws IOException { + if (childDocs.get(docId) && globalOrdinals.advanceExact(docId)) { + long globalOrdinal = globalOrdinals.nextOrd(); + assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; + if (globalOrdinal != -1) { + if (childrenOrdToBuckets.get(globalOrdinal) == -1) { + childrenOrdToBuckets.set(globalOrdinal, bucket); + } else { + long[] bucketOrds = childrenOrdToOtherBuckets.get(globalOrdinal); + if (bucketOrds != null) { + bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1); + bucketOrds[bucketOrds.length - 1] = bucket; + childrenOrdToOtherBuckets.put(globalOrdinal, bucketOrds); + } else { + childrenOrdToOtherBuckets.put(globalOrdinal, new long[] { bucket }); + } + multipleBucketsPerChildrenOrd = true; + } + } + } + } 
+ }; + } + + @Override + protected void doPostCollection() throws IOException { + IndexReader indexReader = context().searcher().getIndexReader(); + for (LeafReaderContext ctx : indexReader.leaves()) { + Scorer parentDocsScorer = parentFilter.scorer(ctx); + if (parentDocsScorer == null) { + continue; + } + DocIdSetIterator parentDocsIter = parentDocsScorer.iterator(); + + final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx); + + final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); + // Set the scorer, since we now replay only the parent docIds + sub.setScorer(new Scorable() { + @Override + public float score() { + return 1f; + } + + @Override + public int docID() { + return parentDocsIter.docID(); + } + }); + + // TODO: this is unwanted allocation, just for initial verification + // of the implementation idea, probably needs to be done differently + // for production use so we do not cause memory churn here + //Set> seenParents = new HashSet<>(); + Map> seenParents = new HashMap<>(); + + final Bits liveDocs = ctx.reader().getLiveDocs(); + for (int docId = parentDocsIter + .nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = parentDocsIter + .nextDoc()) { + if (liveDocs != null && liveDocs.get(docId) == false) { + continue; + } + if (globalOrdinals.advanceExact(docId)) { + long globalOrdinal = globalOrdinals.nextOrd(); + assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; + long bucketOrd = childrenOrdToBuckets.get(globalOrdinal); + if (bucketOrd != -1) { + collectBucketIfUnique(sub, seenParents, docId, bucketOrd); + if (multipleBucketsPerChildrenOrd) { + long[] otherBucketOrds = childrenOrdToOtherBuckets.get(globalOrdinal); + if (otherBucketOrds != null) { + for (long otherBucketOrd : otherBucketOrds) { + // only collect each parentId once per bucket + collectBucketIfUnique(sub, seenParents, docId, otherBucketOrd); + } + } + } + } + } + } + } + } + + private void 
collectBucketIfUnique(LeafBucketCollector sub, Map> seenParents, + int docId, long bucketOrd) throws IOException { + // only collect each parentId once per bucket + Set seenBucketOrds = seenParents.computeIfAbsent(docId, integer -> new HashSet<>()); + if (seenBucketOrds.add(bucketOrd)) { + collectBucket(sub, docId, bucketOrd); + } + } + + @Override + protected void doClose() { + Releasables.close(childrenOrdToBuckets, childrenOrdToOtherBuckets); + } +} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/InternalParent.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/InternalParent.java new file mode 100644 index 0000000000000..f61589b1d9aaf --- /dev/null +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/InternalParent.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.join.aggregations; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Results of the {@link ChildrenToParentAggregator}. + */ +public class InternalParent extends InternalSingleBucketAggregation implements Parent { + public InternalParent(String name, long docCount, InternalAggregations aggregations, List pipelineAggregators, + Map metaData) { + super(name, docCount, aggregations, pipelineAggregators, metaData); + } + + /** + * Read from a stream. + */ + public InternalParent(StreamInput in) throws IOException { + super(in); + } + + @Override + public String getWriteableName() { + return ParentAggregationBuilder.NAME; + } + + @Override + protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) { + return new InternalParent(name, docCount, subAggregations, pipelineAggregators(), getMetaData()); + } +} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/JoinAggregationBuilders.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/JoinAggregationBuilders.java index 73522a68b4595..4d4708cbcbe9b 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/JoinAggregationBuilders.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/JoinAggregationBuilders.java @@ -26,4 +26,11 @@ public abstract class JoinAggregationBuilders { public static ChildrenAggregationBuilder children(String name, String childType) { return new ChildrenAggregationBuilder(name, childType); } + + /** + * Create a new {@link Parent} aggregation with the given name. 
+ */ + public static ParentAggregationBuilder parent(String name, String childType) { + return new ParentAggregationBuilder(name, childType); + } } diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/Parent.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/Parent.java new file mode 100644 index 0000000000000..1942798b51338 --- /dev/null +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/Parent.java @@ -0,0 +1,28 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.join.aggregations; + +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; + +/** + * An single bucket aggregation that translates child documents to their parent documents. 
+ */ +public interface Parent extends SingleBucketAggregation { +} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregationBuilder.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregationBuilder.java new file mode 100644 index 0000000000000..495a5c0f9ad3f --- /dev/null +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregationBuilder.java @@ -0,0 +1,176 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.join.aggregations; + +import org.apache.lucene.search.Query; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.fielddata.plain.SortedSetDVOrdinalsIndexFieldData; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.join.mapper.ParentIdFieldMapper; +import org.elasticsearch.join.mapper.ParentJoinFieldMapper; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.support.FieldContext; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.WithOrdinals; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class ParentAggregationBuilder + extends ValuesSourceAggregationBuilder { + + public static final String NAME = "parent"; + + private final String childType; + private Query parentFilter; + private Query childFilter; + + /** + * @param name + * the name of this aggregation + * @param childType + * the type of children documents + */ + public ParentAggregationBuilder(String name, String childType) { + super(name, ValuesSourceType.BYTES, ValueType.STRING); + if 
(childType == null) { + throw new IllegalArgumentException("[childType] must not be null: [" + name + "]"); + } + this.childType = childType; + } + + protected ParentAggregationBuilder(ParentAggregationBuilder clone, + Builder factoriesBuilder, Map metaData) { + super(clone, factoriesBuilder, metaData); + this.childType = clone.childType; + this.childFilter = clone.childFilter; + this.parentFilter = clone.parentFilter; + } + + @Override + protected AggregationBuilder shallowCopy(Builder factoriesBuilder, Map metaData) { + return new ParentAggregationBuilder(this, factoriesBuilder, metaData); + } + + /** + * Read from a stream. + */ + public ParentAggregationBuilder(StreamInput in) throws IOException { + super(in, ValuesSourceType.BYTES, ValueType.STRING); + childType = in.readString(); + } + + @Override + protected void innerWriteTo(StreamOutput out) throws IOException { + out.writeString(childType); + } + + @Override + protected ValuesSourceAggregatorFactory innerBuild(SearchContext context, + ValuesSourceConfig config, + AggregatorFactory parent, + Builder subFactoriesBuilder) throws IOException { + return new ParentAggregatorFactory(name, config, childFilter, parentFilter, context, parent, + subFactoriesBuilder, metaData); + } + + @Override + protected ValuesSourceConfig resolveConfig(SearchContext context) { + ValuesSourceConfig config = new ValuesSourceConfig<>(ValuesSourceType.BYTES); + joinFieldResolveConfig(context, config); + return config; + } + + private void joinFieldResolveConfig(SearchContext context, ValuesSourceConfig config) { + ParentJoinFieldMapper parentJoinFieldMapper = ParentJoinFieldMapper.getMapper(context.mapperService()); + ParentIdFieldMapper parentIdFieldMapper = parentJoinFieldMapper.getParentIdFieldMapper(childType, false); + if (parentIdFieldMapper != null) { + parentFilter = parentIdFieldMapper.getParentFilter(); + childFilter = parentIdFieldMapper.getChildFilter(childType); + MappedFieldType fieldType = 
parentIdFieldMapper.fieldType(); + final SortedSetDVOrdinalsIndexFieldData fieldData = context.getForField(fieldType); + config.fieldContext(new FieldContext(fieldType.name(), fieldData, fieldType)); + } else { + config.unmapped(true); + } + } + + @Override + protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(ChildrenToParentAggregator.TYPE_FIELD.getPreferredName(), childType); + return builder; + } + + public static ParentAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException { + String childType = null; + + XContentParser.Token token; + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("type".equals(currentFieldName)) { + childType = parser.text(); + } else { + throw new ParsingException(parser.getTokenLocation(), + "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); + } + } else { + throw new ParsingException(parser.getTokenLocation(), "Unexpected token " + token + " in [" + aggregationName + "]."); + } + } + + if (childType == null) { + throw new ParsingException(parser.getTokenLocation(), + "Missing [child_type] field for parent aggregation [" + aggregationName + "]"); + } + + return new ParentAggregationBuilder(aggregationName, childType); + } + + @Override + protected int innerHashCode() { + return Objects.hash(childType); + } + + @Override + protected boolean innerEquals(Object obj) { + ParentAggregationBuilder other = (ParentAggregationBuilder) obj; + return Objects.equals(childType, other.childType); + } + + @Override + public String getType() { + return NAME; + } +} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java 
b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java new file mode 100644 index 0000000000000..ee7ea6cd21ead --- /dev/null +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.join.aggregations; + +import org.apache.lucene.search.Query; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.NonCollectingAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.WithOrdinals; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class ParentAggregatorFactory + extends ValuesSourceAggregatorFactory { + + private final Query parentFilter; + private final Query childFilter; + + public ParentAggregatorFactory(String name, ValuesSourceConfig config, + Query childFilter, Query parentFilter, SearchContext context, AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, Map metaData) throws IOException { + super(name, config, context, parent, subFactoriesBuilder, metaData); + this.childFilter = childFilter; + this.parentFilter = parentFilter; + } + + @Override + protected Aggregator createUnmapped(Aggregator parent, List pipelineAggregators, Map metaData) + throws IOException { + return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) { + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalParent(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData()); + } + + }; + } + + @Override + protected Aggregator doCreateInternal(WithOrdinals valuesSource, Aggregator children, + boolean collectsFromSingleBucket, List 
pipelineAggregators, Map metaData) + throws IOException { + long maxOrd = valuesSource.globalMaxOrd(context.searcher()); + return new ChildrenToParentAggregator(name, factories, context, children, childFilter, + parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData); + } +} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParsedParent.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParsedParent.java new file mode 100644 index 0000000000000..40393c8323505 --- /dev/null +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParsedParent.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.join.aggregations; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.bucket.ParsedSingleBucketAggregation; + +import java.io.IOException; + +public class ParsedParent extends ParsedSingleBucketAggregation implements Parent { + + @Override + public String getType() { + return ParentAggregationBuilder.NAME; + } + + public static ParsedParent fromXContent(XContentParser parser, final String name) throws IOException { + return parseXContent(parser, new ParsedParent(), name); + } +} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/spi/ParentJoinNamedXContentProvider.java b/modules/parent-join/src/main/java/org/elasticsearch/join/spi/ParentJoinNamedXContentProvider.java index 250241014613c..03924471d5a91 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/spi/ParentJoinNamedXContentProvider.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/spi/ParentJoinNamedXContentProvider.java @@ -23,20 +23,26 @@ import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder; +import org.elasticsearch.join.aggregations.ParentAggregationBuilder; import org.elasticsearch.join.aggregations.ParsedChildren; +import org.elasticsearch.join.aggregations.ParsedParent; import org.elasticsearch.plugins.spi.NamedXContentProvider; import org.elasticsearch.search.aggregations.Aggregation; +import java.util.Arrays; import java.util.List; -import static java.util.Collections.singletonList; - public class ParentJoinNamedXContentProvider implements NamedXContentProvider { @Override public List getNamedXContentParsers() { - ParseField parseField = new ParseField(ChildrenAggregationBuilder.NAME); - ContextParser contextParser = (p, name) -> ParsedChildren.fromXContent(p, (String) name); - return singletonList(new 
NamedXContentRegistry.Entry(Aggregation.class, parseField, contextParser)); + ParseField parseFieldChildren = new ParseField(ChildrenAggregationBuilder.NAME); + ParseField parseFieldParent = new ParseField(ParentAggregationBuilder.NAME); + ContextParser contextParserChildren = (p, name) -> ParsedChildren.fromXContent(p, (String) name); + ContextParser contextParserParent = (p, name) -> ParsedParent.fromXContent(p, (String) name); + return Arrays.asList( + new NamedXContentRegistry.Entry(Aggregation.class, parseFieldChildren, contextParserChildren), + new NamedXContentRegistry.Entry(Aggregation.class, parseFieldParent, contextParserParent) + ); } } diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildIT.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildIT.java new file mode 100644 index 0000000000000..45518b8b7c04c --- /dev/null +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildIT.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.join.aggregations; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.join.query.ParentChildTestCase; +import org.junit.Before; + +/** + * Small base test-class which combines stuff used for Children and Parent aggregation tests + */ +public abstract class AbstractParentChildIT extends ParentChildTestCase { + protected final Map categoryToControl = new HashMap<>(); + protected final Map articleToControl = new HashMap<>(); + + @Before + public void setupCluster() throws Exception { + assertAcked( + prepareCreate("test") + .addMapping("doc", + addFieldMappings(buildParentJoinFieldMappingFromSimplifiedDef("join_field", true, "article", "comment"), + "commenter", "keyword", "category", "keyword")) + ); + + List requests = new ArrayList<>(); + String[] uniqueCategories = new String[randomIntBetween(1, 25)]; + for (int i = 0; i < uniqueCategories.length; i++) { + uniqueCategories[i] = Integer.toString(i); + } + int catIndex = 0; + + int numParentDocs = randomIntBetween(uniqueCategories.length, uniqueCategories.length * 5); + for (int i = 0; i < numParentDocs; i++) { + String id = "article-" + i; + + // TODO: this array is always of length 1, and testChildrenAggs fails if this is changed + String[] categories = new String[randomIntBetween(1,1)]; + for (int j = 0; j < categories.length; j++) { + String category = categories[j] = uniqueCategories[catIndex++ % uniqueCategories.length]; + Control control = categoryToControl.computeIfAbsent(category, Control::new); + control.articleIds.add(id); + articleToControl.put(id, new ParentControl(category)); + } + + IndexRequestBuilder indexRequest = createIndexRequest("test", "article", id, null, "category", categories, 
"randomized", true); + requests.add(indexRequest); + } + + String[] commenters = new String[randomIntBetween(5, 50)]; + for (int i = 0; i < commenters.length; i++) { + commenters[i] = Integer.toString(i); + } + + int id = 0; + for (Control control : categoryToControl.values()) { + for (String articleId : control.articleIds) { + int numChildDocsPerParent = randomIntBetween(0, 5); + for (int i = 0; i < numChildDocsPerParent; i++) { + String commenter = commenters[id % commenters.length]; + String idValue = "comment-" + id++; + control.commentIds.add(idValue); + Set ids = control.commenterToCommentId.computeIfAbsent(commenter, k -> new HashSet<>()); + ids.add(idValue); + + articleToControl.get(articleId).commentIds.add(idValue); + + IndexRequestBuilder indexRequest = createIndexRequest("test", "comment", idValue, + articleId, "commenter", commenter, "randomized", true); + requests.add(indexRequest); + } + } + } + + requests.add(createIndexRequest("test", "article", "a", null, "category", new String[]{"a"}, "randomized", false)); + requests.add(createIndexRequest("test", "article", "b", null, "category", new String[]{"a", "b"}, "randomized", false)); + requests.add(createIndexRequest("test", "article", "c", null, "category", new String[]{"a", "b", "c"}, "randomized", false)); + requests.add(createIndexRequest("test", "article", "d", null, "category", new String[]{"c"}, "randomized", false)); + requests.add(createIndexRequest("test", "comment", "e", "a")); + requests.add(createIndexRequest("test", "comment", "f", "c")); + + indexRandom(true, requests); + ensureSearchable("test"); + } + + + protected static final class Control { + + final String category; + final Set articleIds = new HashSet<>(); + final Set commentIds = new HashSet<>(); + final Map> commenterToCommentId = new HashMap<>(); + + private Control(String category) { + this.category = category; + } + } + + protected static final class ParentControl { + final String category; + final Set commentIds = new 
HashSet<>(); + + private ParentControl(String category) { + this.category = category; + } + } +} diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java index 61f00647f3c06..b2c2f6f9ea511 100644 --- a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java @@ -25,7 +25,6 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.join.query.ParentChildTestCase; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -33,11 +32,8 @@ import org.elasticsearch.search.aggregations.metrics.Sum; import org.elasticsearch.search.aggregations.metrics.TopHits; import org.elasticsearch.search.sort.SortOrder; -import org.junit.Before; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -58,80 +54,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; -public class ChildrenIT extends ParentChildTestCase { - - - private static final Map categoryToControl = new HashMap<>(); - - - @Before - public void setupCluster() throws Exception { - categoryToControl.clear(); - assertAcked( - prepareCreate("test") - .addMapping("doc", - addFieldMappings(buildParentJoinFieldMappingFromSimplifiedDef("join_field", true, "article", "comment"), - "commenter", "keyword", "category", "keyword")) - ); - - List requests = new ArrayList<>(); - String[] uniqueCategories = new String[randomIntBetween(1, 25)]; - for (int i = 0; i < uniqueCategories.length; i++) { - uniqueCategories[i] 
= Integer.toString(i); - } - int catIndex = 0; - - int numParentDocs = randomIntBetween(uniqueCategories.length, uniqueCategories.length * 5); - for (int i = 0; i < numParentDocs; i++) { - String id = "article-" + i; - - // TODO: this array is always of length 1, and testChildrenAggs fails if this is changed - String[] categories = new String[randomIntBetween(1,1)]; - for (int j = 0; j < categories.length; j++) { - String category = categories[j] = uniqueCategories[catIndex++ % uniqueCategories.length]; - Control control = categoryToControl.get(category); - if (control == null) { - categoryToControl.put(category, control = new Control()); - } - control.articleIds.add(id); - } - - requests.add(createIndexRequest("test", "article", id, null, "category", categories, "randomized", true)); - } - - String[] commenters = new String[randomIntBetween(5, 50)]; - for (int i = 0; i < commenters.length; i++) { - commenters[i] = Integer.toString(i); - } - - int id = 0; - for (Control control : categoryToControl.values()) { - for (String articleId : control.articleIds) { - int numChildDocsPerParent = randomIntBetween(0, 5); - for (int i = 0; i < numChildDocsPerParent; i++) { - String commenter = commenters[id % commenters.length]; - String idValue = "comment-" + id++; - control.commentIds.add(idValue); - Set ids = control.commenterToCommentId.get(commenter); - if (ids == null) { - control.commenterToCommentId.put(commenter, ids = new HashSet<>()); - } - ids.add(idValue); - requests.add(createIndexRequest("test", "comment", idValue, articleId, "commenter", commenter)); - } - } - } - - requests.add(createIndexRequest("test", "article", "a", null, "category", new String[]{"a"}, "randomized", false)); - requests.add(createIndexRequest("test", "article", "b", null, "category", new String[]{"a", "b"}, "randomized", false)); - requests.add(createIndexRequest("test", "article", "c", null, "category", new String[]{"a", "b", "c"}, "randomized", false)); - 
requests.add(createIndexRequest("test", "article", "d", null, "category", new String[]{"c"}, "randomized", false)); - requests.add(createIndexRequest("test", "comment", "e", "a")); - requests.add(createIndexRequest("test", "comment", "f", "c")); - - indexRandom(true, requests); - ensureSearchable("test"); - } +public class ChildrenIT extends AbstractParentChildIT { public void testChildrenAggs() throws Exception { SearchResponse searchResponse = client().prepareSearch("test") @@ -455,10 +378,4 @@ public void testPostCollectAllLeafReaders() throws Exception { children = parents.getBuckets().get(0).getAggregations().get("child_docs"); assertThat(children.getDocCount(), equalTo(2L)); } - - private static final class Control { - final Set articleIds = new HashSet<>(); - final Set commentIds = new HashSet<>(); - final Map> commenterToCommentId = new HashMap<>(); - } } diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java new file mode 100644 index 0000000000000..072ad1ee1ad6e --- /dev/null +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java @@ -0,0 +1,337 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.join.aggregations; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermInSetQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.mapper.ContentPath; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.Mapper; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.join.mapper.MetaJoinFieldMapper; +import org.elasticsearch.join.mapper.ParentJoinFieldMapper; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import 
org.elasticsearch.search.aggregations.metrics.InternalMin; +import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValueType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Consumer; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ChildrenToParentAggregatorTests extends AggregatorTestCase { + + private static final String CHILD_TYPE = "child_type"; + private static final String PARENT_TYPE = "parent_type"; + + public void testNoDocs() throws IOException { + Directory directory = newDirectory(); + + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + // intentionally not writing any docs + indexWriter.close(); + IndexReader indexReader = DirectoryReader.open(directory); + + testCase(new MatchAllDocsQuery(), newSearcher(indexReader, false, true), childrenToParent -> { + assertEquals(0, childrenToParent.getDocCount()); + Aggregation parentAggregation = childrenToParent.getAggregations().get("in_parent"); + assertEquals(0, childrenToParent.getDocCount()); + assertNotNull("Aggregations: " + childrenToParent.getAggregations().asMap(), parentAggregation); + assertEquals(Double.POSITIVE_INFINITY, ((InternalMin) parentAggregation).getValue(), Double.MIN_VALUE); + }); + indexReader.close(); + directory.close(); + } + + public void testParentChild() throws IOException { + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + + final Map> expectedParentChildRelations = setupIndex(indexWriter); + indexWriter.close(); + + IndexReader indexReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), + new ShardId(new Index("foo", 
"_na_"), 1)); + // TODO set "maybeWrap" to true for IndexSearcher once #23338 is resolved + IndexSearcher indexSearcher = newSearcher(indexReader, false, true); + + // verify with all documents + testCase(new MatchAllDocsQuery(), indexSearcher, parent -> { + int expectedTotalParents = 0; + int expectedMinValue = Integer.MAX_VALUE; + for (Tuple expectedValues : expectedParentChildRelations.values()) { + expectedTotalParents++; + expectedMinValue = Math.min(expectedMinValue, expectedValues.v2()); + } + assertEquals("Having " + parent.getDocCount() + " docs and aggregation results: " + + parent.getAggregations().asMap(), + expectedTotalParents, parent.getDocCount()); + assertEquals(expectedMinValue, ((InternalMin) parent.getAggregations().get("in_parent")).getValue(), Double.MIN_VALUE); + }); + + // verify for each children + for (String parent : expectedParentChildRelations.keySet()) { + testCase(new TermInSetQuery(IdFieldMapper.NAME, Uid.encodeId("child0_" + parent)), + indexSearcher, aggregation -> { + assertEquals("Expected one result for min-aggregation for parent: " + parent + + ", but had aggregation-results: " + aggregation, + 1, aggregation.getDocCount()); + assertEquals(expectedParentChildRelations.get(parent).v2(), + ((InternalMin) aggregation.getAggregations().get("in_parent")).getValue(), Double.MIN_VALUE); + }); + } + + indexReader.close(); + directory.close(); + } + + + public void testParentChildTerms() throws IOException { + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + + final Map> expectedParentChildRelations = setupIndex(indexWriter); + indexWriter.close(); + + SortedMap entries = new TreeMap<>(); + for (Tuple value : expectedParentChildRelations.values()) { + Long l = entries.computeIfAbsent(value.v2(), integer -> 0L); + entries.put(value.v2(), l+1); + } + List> sortedValues = new ArrayList<>(entries.entrySet()); + sortedValues.sort((o1, o2) -> { + // sort larger values first 
+ int ret = o2.getValue().compareTo(o1.getValue()); + if(ret != 0) { + return ret; + } + + // on equal value, sort by key + return o1.getKey().compareTo(o2.getKey()); + }); + + IndexReader indexReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), + new ShardId(new Index("foo", "_na_"), 1)); + // TODO set "maybeWrap" to true for IndexSearcher once #23338 is resolved + IndexSearcher indexSearcher = newSearcher(indexReader, false, true); + + // verify a terms-aggregation inside the parent-aggregation + testCaseTerms(new MatchAllDocsQuery(), indexSearcher, parent -> { + assertNotNull(parent); + LongTerms valueTerms = parent.getAggregations().get("value_terms"); + assertNotNull(valueTerms); + + List valueTermsBuckets = valueTerms.getBuckets(); + assertNotNull(valueTermsBuckets); + assertEquals("Had: " + parent, sortedValues.size(), valueTermsBuckets.size()); + int i = 0; + for (Map.Entry entry : sortedValues) { + LongTerms.Bucket bucket = valueTermsBuckets.get(i); + assertEquals(entry.getKey().longValue(), bucket.getKeyAsNumber()); + assertEquals(entry.getValue(), (Long)bucket.getDocCount()); + + i++; + } + }); + + indexReader.close(); + directory.close(); + } + + public void testTermsParentChildTerms() throws IOException { + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + + final Map> expectedParentChildRelations = setupIndex(indexWriter); + indexWriter.close(); + + SortedMap sortedValues = new TreeMap<>(); + for (Tuple value : expectedParentChildRelations.values()) { + Long l = sortedValues.computeIfAbsent(value.v2(), integer -> 0L); + sortedValues.put(value.v2(), l+1); + } + + IndexReader indexReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), + new ShardId(new Index("foo", "_na_"), 1)); + // TODO set "maybeWrap" to true for IndexSearcher once #23338 is resolved + IndexSearcher indexSearcher = newSearcher(indexReader, false, true); + + // verify a 
terms-aggregation inside the parent-aggregation which itself is inside a + // terms-aggregation on the child-documents + testCaseTermsParentTerms(new MatchAllDocsQuery(), indexSearcher, longTerms -> { + assertNotNull(longTerms); + + // TODO: test some more here + for (LongTerms.Bucket bucket : longTerms.getBuckets()) { + assertNotNull(bucket); + assertNotNull(bucket.getKeyAsString()); + } + }); + + /*for(int j = 0;j < 20;j++) { + long start = System.currentTimeMillis(); + for (int i = 0; i < 100; i++) { + testCaseTermsParentTerms(new MatchAllDocsQuery(), indexSearcher, Assert::assertNotNull); + } + + System.out.println("Duration: " + (System.currentTimeMillis() - start)); + }*/ + + indexReader.close(); + directory.close(); + } + + private static Map> setupIndex(RandomIndexWriter iw) throws IOException { + Map> expectedValues = new HashMap<>(); + int numParents = randomIntBetween(1, 10); + for (int i = 0; i < numParents; i++) { + String parent = "parent" + i; + int randomValue = randomIntBetween(0, 100); + List parentDocument = createParentDocument(parent, randomValue); + /*long parentDocId =*/ iw.addDocument(parentDocument); + //System.out.println("Parent: " + parent + ": " + randomValue + ", id: " + parentDocId); + int numChildren = randomIntBetween(1, 10); + int minValue = Integer.MAX_VALUE; + for (int c = 0; c < numChildren; c++) { + minValue = Math.min(minValue, randomValue); + int randomSubValue = randomIntBetween(0, 100); + List childDocument = createChildDocument("child" + c + "_" + parent, parent, randomSubValue); + /*long childDocId =*/ iw.addDocument(childDocument); + //System.out.println("Child: " + "child" + c + "_" + parent + ": " + randomSubValue + ", id: " + childDocId); + } + expectedValues.put(parent, new Tuple<>(numChildren, minValue)); + } + return expectedValues; + } + + private static List createParentDocument(String id, int value) { + return Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(id), Field.Store.NO), + new 
StringField("join_field", PARENT_TYPE, Field.Store.NO), + createJoinField(PARENT_TYPE, id), + new SortedNumericDocValuesField("number", value) + ); + } + + private static List createChildDocument(String childId, String parentId, int value) { + return Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(childId), Field.Store.NO), + new StringField("join_field", CHILD_TYPE, Field.Store.NO), + createJoinField(PARENT_TYPE, parentId), + new SortedNumericDocValuesField("subNumber", value) + ); + } + + private static SortedDocValuesField createJoinField(String parentType, String id) { + return new SortedDocValuesField("join_field#" + parentType, new BytesRef(id)); + } + + @Override + protected MapperService mapperServiceMock() { + ParentJoinFieldMapper joinFieldMapper = createJoinFieldMapper(); + MapperService mapperService = mock(MapperService.class); + MetaJoinFieldMapper.MetaJoinFieldType metaJoinFieldType = mock(MetaJoinFieldMapper.MetaJoinFieldType.class); + when(metaJoinFieldType.getMapper()).thenReturn(joinFieldMapper); + when(mapperService.fullName("_parent_join")).thenReturn(metaJoinFieldType); + return mapperService; + } + + private static ParentJoinFieldMapper createJoinFieldMapper() { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + return new ParentJoinFieldMapper.Builder("join_field") + .addParent(PARENT_TYPE, Collections.singleton(CHILD_TYPE)) + .build(new Mapper.BuilderContext(settings, new ContentPath(0))); + } + + private void testCase(Query query, IndexSearcher indexSearcher, Consumer verify) + throws IOException { + + ParentAggregationBuilder aggregationBuilder = new ParentAggregationBuilder("_name", CHILD_TYPE); + aggregationBuilder.subAggregation(new MinAggregationBuilder("in_parent").field("number")); + + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + fieldType.setName("number"); + InternalParent result = 
search(indexSearcher, query, aggregationBuilder, fieldType); + verify.accept(result); + } + + private void testCaseTerms(Query query, IndexSearcher indexSearcher, Consumer verify) + throws IOException { + + ParentAggregationBuilder aggregationBuilder = new ParentAggregationBuilder("_name", CHILD_TYPE); + aggregationBuilder.subAggregation(new TermsAggregationBuilder("value_terms", ValueType.LONG).field("number")); + + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + fieldType.setName("number"); + InternalParent result = search(indexSearcher, query, aggregationBuilder, fieldType); + verify.accept(result); + } + + // run a terms aggregation on the number in child-documents, then a parent aggregation and then terms on the parent-number + private void testCaseTermsParentTerms(Query query, IndexSearcher indexSearcher, Consumer verify) + throws IOException { + AggregationBuilder aggregationBuilder = + new TermsAggregationBuilder("subvalue_terms", ValueType.LONG).field("subNumber"). + subAggregation(new ParentAggregationBuilder("to_parent", CHILD_TYPE). 
+ subAggregation(new TermsAggregationBuilder("value_terms", ValueType.LONG).field("number"))); + + MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + fieldType.setName("number"); + MappedFieldType subFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + subFieldType.setName("subNumber"); + LongTerms result = search(indexSearcher, query, aggregationBuilder, fieldType, subFieldType); + verify.accept(result); + } +} diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/InternalParentTests.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/InternalParentTests.java new file mode 100644 index 0000000000000..be4792a867d33 --- /dev/null +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/InternalParentTests.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.join.aggregations; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.NamedXContentRegistry.Entry; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalSingleBucketAggregationTestCase; +import org.elasticsearch.search.aggregations.bucket.ParsedSingleBucketAggregation; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +public class InternalParentTests extends InternalSingleBucketAggregationTestCase { + + @Override + protected List getNamedXContents() { + List extendedNamedXContents = new ArrayList<>(super.getNamedXContents()); + extendedNamedXContents.add(new Entry(Aggregation.class, new ParseField(ParentAggregationBuilder.NAME), + (p, c) -> ParsedParent.fromXContent(p, (String) c))); + return extendedNamedXContents ; + } + + @Override + protected InternalParent createTestInstance(String name, long docCount, InternalAggregations aggregations, + List pipelineAggregators, Map metaData) { + return new InternalParent(name, docCount, aggregations, pipelineAggregators, metaData); + } + + @Override + protected void extraAssertReduced(InternalParent reduced, List inputs) { + // Nothing extra to assert + } + + @Override + protected Reader instanceReader() { + return InternalParent::new; + } + + @Override + protected Class implementationClass() { + return ParsedParent.class; + } +} diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java new file mode 100644 index 0000000000000..d587e0945c946 --- /dev/null +++ 
b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.join.aggregations; + +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.join.aggregations.JoinAggregationBuilders.parent; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; + +public class ParentIT extends AbstractParentChildIT { + + public void testSimpleParentAgg() throws Exception { 
+ final SearchRequestBuilder searchRequest = client().prepareSearch("test") + .setSize(10000) + .setQuery(matchQuery("randomized", true)) + .addAggregation( + parent("to_article", "comment") + .subAggregation( + terms("category").field("category").size(10000))); + SearchResponse searchResponse = searchRequest.get(); + assertSearchResponse(searchResponse); + + long articlesWithComment = articleToControl.values().stream().filter( + parentControl -> !parentControl.commentIds.isEmpty() + ).count(); + + Parent parentAgg = searchResponse.getAggregations().get("to_article"); + assertThat("Request: " + searchRequest + "\nResponse: " + searchResponse + "\n", + parentAgg.getDocCount(), equalTo(articlesWithComment)); + Terms categoryTerms = parentAgg.getAggregations().get("category"); + long categoriesWithComments = categoryToControl.values().stream().filter( + control -> !control.commentIds.isEmpty()).count(); + assertThat("Buckets: " + categoryTerms.getBuckets().stream().map( + (Function) MultiBucketsAggregation.Bucket::getKeyAsString).collect(Collectors.toList()) + + "\nCategories: " + categoryToControl.keySet(), + (long)categoryTerms.getBuckets().size(), equalTo(categoriesWithComments)); + for (Map.Entry entry : categoryToControl.entrySet()) { + // no children for this category -> no entry in the child to parent-aggregation + if(entry.getValue().commentIds.isEmpty()) { + assertNull(categoryTerms.getBucketByKey(entry.getKey())); + continue; + } + + final Terms.Bucket categoryBucket = categoryTerms.getBucketByKey(entry.getKey()); + assertNotNull("Failed for category " + entry.getKey(), + categoryBucket); + assertThat("Failed for category " + entry.getKey(), + categoryBucket.getKeyAsString(), equalTo(entry.getKey())); + + // count all articles in this category which have at least one comment + long articlesForCategory = articleToControl.values().stream(). + // only articles with this category + filter(parentControl -> parentControl.category.equals(entry.getKey())). 
+ // only articles which have comments + filter(parentControl -> !parentControl.commentIds.isEmpty()). + count(); + assertThat("Failed for category " + entry.getKey(), + categoryBucket.getDocCount(), equalTo(articlesForCategory)); + } + } + + public void testParentAggs() throws Exception { + final SearchRequestBuilder searchRequest = client().prepareSearch("test") + .setSize(10000) + .setQuery(matchQuery("randomized", true)) + .addAggregation( + terms("commenter").field("commenter").size(10000).subAggregation(parent("to_article", "comment") + .subAggregation( + terms("category").field("category").size(10000).subAggregation( + topHits("top_category") + )) + ) + ); + SearchResponse searchResponse = searchRequest.get(); + assertSearchResponse(searchResponse); + + final Set commenters = getCommenters(); + final Map> commenterToComments = getCommenterToComments(); + + Terms categoryTerms = searchResponse.getAggregations().get("commenter"); + assertThat("Request: " + searchRequest + "\nResponse: " + searchResponse + "\n", + categoryTerms.getBuckets().size(), equalTo(commenters.size())); + for (String commenter : commenters) { + Terms.Bucket categoryBucket = categoryTerms.getBucketByKey(commenter); + assertThat(categoryBucket.getKeyAsString(), equalTo(commenter)); + assertThat(categoryBucket.getDocCount(), equalTo((long) commenterToComments.get(commenter).size())); + + Parent childrenBucket = categoryBucket.getAggregations().get("to_article"); + assertThat(childrenBucket.getName(), equalTo("to_article")); + + // TODO: verify some more + /*assertThat(childrenBucket.getDocCount(), equalTo((long) entry1.getValue().commentIds.size())); + assertThat(((InternalAggregation)childrenBucket).getProperty("_count"), + equalTo((long) entry1.getValue().commentIds.size())); + + Terms commentersTerms = childrenBucket.getAggregations().get("category"); + assertThat(((InternalAggregation)childrenBucket).getProperty("category"), sameInstance(commentersTerms)); + 
assertThat(commentersTerms.getBuckets().size(), equalTo(entry1.getValue().commenterToCommentId.size())); + for (Map.Entry> entry2 : entry1.getValue().commenterToCommentId.entrySet()) { + Terms.Bucket commentBucket = commentersTerms.getBucketByKey(entry2.getKey()); + assertThat(commentBucket.getKeyAsString(), equalTo(entry2.getKey())); + assertThat(commentBucket.getDocCount(), equalTo((long) entry2.getValue().size())); + + TopHits topHits = commentBucket.getAggregations().get("top_commenters"); + for (SearchHit searchHit : topHits.getHits().getHits()) { + assertThat(entry2.getValue().contains(searchHit.getId()), is(true)); + } + }*/ + } + } + + private Set getCommenters() { + return categoryToControl.values().stream().flatMap( + (Function>) control -> control.commenterToCommentId.keySet().stream()). + collect(Collectors.toSet()); + } + + private Map> getCommenterToComments() { + final Map> commenterToComments = new HashMap<>(); + for (Control control : categoryToControl.values()) { + for (Map.Entry> entry : control.commenterToCommentId.entrySet()) { + final Set comments = commenterToComments.computeIfAbsent(entry.getKey(), s -> new HashSet<>()); + comments.addAll(entry.getValue()); + } + } + return commenterToComments; + } + +// public void testNonExistingParentType() throws Exception { +// SearchResponse searchResponse = client().prepareSearch("test") +// .addAggregation( +// parent("non-existing", "xyz") +// ).get(); +// assertSearchResponse(searchResponse); +// +// Parent parent = searchResponse.getAggregations().get("non-existing"); +// assertThat(parent.getName(), equalTo("non-existing")); +// assertThat(parent.getDocCount(), equalTo(0L)); +// } + + + public void testTermsParentAggTerms() throws Exception { + final SearchRequestBuilder searchRequest = client().prepareSearch("test") + .setSize(10000) + .setQuery(matchQuery("randomized", true)) + .addAggregation( + terms("to_commenter").field("commenter").size(10000).subAggregation( + parent("to_article", 
"comment") + .subAggregation( + terms("category").field("category").size(10000)))); + SearchResponse searchResponse = searchRequest.get(); + assertSearchResponse(searchResponse); + + final Set commenters = getCommenters(); + final Map> commenterToComments = getCommenterToComments(); + + Terms commentersAgg = searchResponse.getAggregations().get("to_commenter"); + assertThat("Request: " + searchRequest + "\nResponse: " + searchResponse + "\n", + commentersAgg.getBuckets().size(), equalTo(commenters.size())); + for (Terms.Bucket commenterBucket : commentersAgg.getBuckets()) { + Set comments = commenterToComments.get(commenterBucket.getKeyAsString()); + assertNotNull(comments); + assertThat("Failed for commenter " + commenterBucket.getKeyAsString(), + commenterBucket.getDocCount(), equalTo((long)comments.size())); + + Parent articleAgg = commenterBucket.getAggregations().get("to_article"); + assertThat(articleAgg.getName(), equalTo("to_article")); + // find all articles for the comments for the current commenter + Set articles = articleToControl.values().stream().flatMap( + (Function>) parentControl -> parentControl.commentIds.stream(). + filter(comments::contains) + ).collect(Collectors.toSet()); + + assertThat(articleAgg.getDocCount(), equalTo((long)articles.size())); + + // TODO: add more verification + } + } +} diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentTests.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentTests.java new file mode 100644 index 0000000000000..1df36d28b49e5 --- /dev/null +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentTests.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.join.aggregations; + +import java.util.Collection; +import java.util.Collections; + +import org.elasticsearch.join.ParentJoinPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.aggregations.BaseAggregationTestCase; + +public class ParentTests extends BaseAggregationTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singleton(ParentJoinPlugin.class); + } + + @Override + protected ParentAggregationBuilder createTestAggregatorBuilder() { + String name = randomAlphaOfLengthBetween(3, 20); + String parentType = randomAlphaOfLengthBetween(5, 40); + return new ParentAggregationBuilder(name, parentType); + } +} From a86a159024b6ac726f0511887796dff9a0f91860 Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Thu, 4 Oct 2018 18:58:23 +0200 Subject: [PATCH 2/7] Add check to not collect buckets multiple times if not needed As stated in the PR, we can avoid collecting buckets into the multiple-array as long as it is the same bucket as before --- .../join/aggregations/ChildrenToParentAggregator.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java index 
aba71946dd15c..798fc4be93bc0 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java @@ -121,9 +121,15 @@ public void collect(int docId, long bucket) throws IOException { long globalOrdinal = globalOrdinals.nextOrd(); assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; if (globalOrdinal != -1) { - if (childrenOrdToBuckets.get(globalOrdinal) == -1) { + long bucketPrev = childrenOrdToBuckets.get(globalOrdinal); + + // add bucket if none is set, + if (bucketPrev == -1) { childrenOrdToBuckets.set(globalOrdinal, bucket); - } else { + } else if (bucketPrev != bucket) { + // otherwise store it in the array, but only if the bucket is different than + // the previous one for this childDoc, as we only need one occurrence + // of child -> parent for this aggregation long[] bucketOrds = childrenOrdToOtherBuckets.get(globalOrdinal); if (bucketOrds != null) { bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1); From fbcf9b007dcf047bcad71122d532ae20b7d3ff60 Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Thu, 4 Oct 2018 18:53:19 +0200 Subject: [PATCH 3/7] Combine parent-join aggregator functionality and remove seenParents We can combine both aggregators into an abstract class, only the filter needs to be defined differently. Also the lookup via seenParents is not necessary any more after the previous change. 
--- .../AbstractParentChildAggregator.java | 172 ++++++++++++++++++ .../ChildrenToParentAggregator.java | 144 ++------------- .../ParentToChildrenAggregator.java | 124 ++----------- 3 files changed, 195 insertions(+), 245 deletions(-) create mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java new file mode 100644 index 0000000000000..c13958c7122c0 --- /dev/null +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java @@ -0,0 +1,172 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.join.aggregations; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorable; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.util.Bits; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.LongObjectPagedHashMap; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public abstract class AbstractParentChildAggregator extends BucketsAggregator implements SingleBucketAggregator { + static final ParseField TYPE_FIELD = new ParseField("type"); + + protected final Weight childFilter; + protected final Weight parentFilter; + protected final ValuesSource.Bytes.WithOrdinals valuesSource; + + // Maybe use PagedGrowableWriter? This will be less wasteful than LongArray, + // but then we don't have the reuse feature of BigArrays. 
+ // Also if we know the highest possible value that a parent agg will create + // then we store multiple values into one slot + protected final LongArray ordinalToBuckets; + + // Only pay the extra storage price if the a parentOrd has multiple buckets + // Most of the times a parent doesn't have multiple buckets, since there is + // only one document per parent ord, + // only in the case of terms agg if a parent doc has multiple terms per + // field this is needed: + protected final LongObjectPagedHashMap ordinalToOtherBuckets; + protected boolean multipleBucketsPerOrdinal = false; + + public AbstractParentChildAggregator(String name, AggregatorFactories factories, + SearchContext context, Aggregator parent, Query childFilter, + Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource, + long maxOrd, List pipelineAggregators, Map metaData) + throws IOException { + super(name, factories, context, parent, pipelineAggregators, metaData); + // these two filters are cached in the parser + this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); + this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); + this.ordinalToBuckets = context.bigArrays().newLongArray(maxOrd, false); + this.ordinalToBuckets.fill(0, maxOrd, -1); + this.ordinalToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays()); + this.valuesSource = valuesSource; + } + + protected void storeToOtherBuckets(long globalOrdinal, long bucket) { + long[] bucketOrds = ordinalToOtherBuckets.get(globalOrdinal); + if (bucketOrds != null) { + bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1); + bucketOrds[bucketOrds.length - 1] = bucket; + ordinalToOtherBuckets.put(globalOrdinal, bucketOrds); + } else { + ordinalToOtherBuckets.put(globalOrdinal, new long[] { bucket }); + } + multipleBucketsPerOrdinal = true; + } + + abstract Weight 
getCollectionFilter(); + + abstract Weight getPostCollectionFilter(); + + abstract LeafBucketCollector getLeafBucketCollector(SortedSetDocValues globalOrdinals, Bits docs); + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, + final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); + final Bits docs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), getCollectionFilter().scorerSupplier(ctx)); + return getLeafBucketCollector(globalOrdinals, docs); + } + + protected void doPostCollection() throws IOException { + IndexReader indexReader = context().searcher().getIndexReader(); + for (LeafReaderContext ctx : indexReader.leaves()) { + Scorer docsScorer = getPostCollectionFilter().scorer(ctx); + if (docsScorer == null) { + continue; + } + DocIdSetIterator docsIter = docsScorer.iterator(); + + final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx); + + final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); + // Set the scorer, since we now replay only the parent or child docIds + sub.setScorer(new Scorable() { + @Override + public float score() { + return 1f; + } + + @Override + public int docID() { + return docsIter.docID(); + } + }); + + final Bits liveDocs = ctx.reader().getLiveDocs(); + for (int docId = docsIter + .nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = docsIter + .nextDoc()) { + if (liveDocs != null && liveDocs.get(docId) == false) { + continue; + } + if (globalOrdinals.advanceExact(docId)) { + long globalOrdinal = globalOrdinals.nextOrd(); + assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; + long bucketOrd = ordinalToBuckets.get(globalOrdinal); + if (bucketOrd != -1) { + collectBucket(sub, docId, bucketOrd); + if (multipleBucketsPerOrdinal) { + long[] otherBucketOrds = 
ordinalToOtherBuckets.get(globalOrdinal); + if (otherBucketOrds != null) { + for (long otherBucketOrd : otherBucketOrds) { + collectBucket(sub, docId, otherBucketOrd); + } + } + } + } + } + } + } + } + + @Override + protected void doClose() { + Releasables.close(ordinalToBuckets, ordinalToOtherBuckets); + } +} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java index 798fc4be93bc0..5b9e1517aad52 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java @@ -18,79 +18,37 @@ */ package org.elasticsearch.join.aggregations; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorable; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; import org.apache.lucene.util.Bits; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.util.LongArray; -import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; -import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import 
org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** * A {@link BucketsAggregator} which resolves to the matching parent documents. * * It ensures that each parent only matches once per bucket. */ -public class ChildrenToParentAggregator extends BucketsAggregator implements SingleBucketAggregator { - - static final ParseField TYPE_FIELD = new ParseField("type"); - - private final Weight childFilter; - private final Weight parentFilter; - private final ValuesSource.Bytes.WithOrdinals valuesSource; - - // Maybe use PagedGrowableWriter? This will be less wasteful than LongArray, - // but then we don't have the reuse feature of BigArrays. - // Also if we know the highest possible value that a parent agg will create - // then we store multiple values into one slot - private final LongArray childrenOrdToBuckets; - - // Only pay the extra storage price if the a parentOrd has multiple buckets - // Most of the times a parent doesn't have multiple buckets, since there is - // only one document per parent ord, - // only in the case of terms agg if a parent doc has multiple terms per - // field this is needed: - private final LongObjectPagedHashMap childrenOrdToOtherBuckets; - private boolean multipleBucketsPerChildrenOrd = false; +public class ChildrenToParentAggregator extends AbstractParentChildAggregator { public ChildrenToParentAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, Query childFilter, Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource, long maxOrd, List pipelineAggregators, Map metaData) throws IOException { - super(name, factories, context, parent, pipelineAggregators, metaData); - // 
these two filters are cached in the parser - this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); - this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); - this.childrenOrdToBuckets = context.bigArrays().newLongArray(maxOrd, false); - this.childrenOrdToBuckets.fill(0, maxOrd, -1); - this.childrenOrdToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays()); - this.valuesSource = valuesSource; + super(name, factories, context, parent, childFilter, parentFilter, + valuesSource, maxOrd, pipelineAggregators, metaData); } @Override @@ -106,13 +64,7 @@ public InternalAggregation buildEmptyAggregation() { } @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - if (valuesSource == null) { - return LeafBucketCollector.NO_OP_COLLECTOR; - } - final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); - final Bits childDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), childFilter.scorerSupplier(ctx)); + LeafBucketCollector getLeafBucketCollector(SortedSetDocValues globalOrdinals, Bits childDocs) { return new LeafBucketCollector() { @Override @@ -121,24 +73,16 @@ public void collect(int docId, long bucket) throws IOException { long globalOrdinal = globalOrdinals.nextOrd(); assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; if (globalOrdinal != -1) { - long bucketPrev = childrenOrdToBuckets.get(globalOrdinal); + long bucketPrev = ordinalToBuckets.get(globalOrdinal); - // add bucket if none is set, + // add bucket if none is set if (bucketPrev == -1) { - childrenOrdToBuckets.set(globalOrdinal, bucket); + ordinalToBuckets.set(globalOrdinal, bucket); } else if (bucketPrev != bucket) { // otherwise store it in the array, but only if the bucket is different than // the previous one for this childDoc, 
as we only need one occurrence // of child -> parent for this aggregation - long[] bucketOrds = childrenOrdToOtherBuckets.get(globalOrdinal); - if (bucketOrds != null) { - bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1); - bucketOrds[bucketOrds.length - 1] = bucket; - childrenOrdToOtherBuckets.put(globalOrdinal, bucketOrds); - } else { - childrenOrdToOtherBuckets.put(globalOrdinal, new long[] { bucket }); - } - multipleBucketsPerChildrenOrd = true; + storeToOtherBuckets(globalOrdinal, bucket); } } } @@ -147,76 +91,12 @@ public void collect(int docId, long bucket) throws IOException { } @Override - protected void doPostCollection() throws IOException { - IndexReader indexReader = context().searcher().getIndexReader(); - for (LeafReaderContext ctx : indexReader.leaves()) { - Scorer parentDocsScorer = parentFilter.scorer(ctx); - if (parentDocsScorer == null) { - continue; - } - DocIdSetIterator parentDocsIter = parentDocsScorer.iterator(); - - final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx); - - final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); - // Set the scorer, since we now replay only the parent docIds - sub.setScorer(new Scorable() { - @Override - public float score() { - return 1f; - } - - @Override - public int docID() { - return parentDocsIter.docID(); - } - }); - - // TODO: this is unwanted allocation, just for initial verification - // of the implementation idea, probably needs to be done differently - // for production use so we do not cause memory churn here - //Set> seenParents = new HashSet<>(); - Map> seenParents = new HashMap<>(); - - final Bits liveDocs = ctx.reader().getLiveDocs(); - for (int docId = parentDocsIter - .nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = parentDocsIter - .nextDoc()) { - if (liveDocs != null && liveDocs.get(docId) == false) { - continue; - } - if (globalOrdinals.advanceExact(docId)) { - long globalOrdinal = globalOrdinals.nextOrd(); - 
assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; - long bucketOrd = childrenOrdToBuckets.get(globalOrdinal); - if (bucketOrd != -1) { - collectBucketIfUnique(sub, seenParents, docId, bucketOrd); - if (multipleBucketsPerChildrenOrd) { - long[] otherBucketOrds = childrenOrdToOtherBuckets.get(globalOrdinal); - if (otherBucketOrds != null) { - for (long otherBucketOrd : otherBucketOrds) { - // only collect each parentId once per bucket - collectBucketIfUnique(sub, seenParents, docId, otherBucketOrd); - } - } - } - } - } - } - } - } - - private void collectBucketIfUnique(LeafBucketCollector sub, Map> seenParents, - int docId, long bucketOrd) throws IOException { - // only collect each parentId once per bucket - Set seenBucketOrds = seenParents.computeIfAbsent(docId, integer -> new HashSet<>()); - if (seenBucketOrds.add(bucketOrd)) { - collectBucket(sub, docId, bucketOrd); - } + Weight getCollectionFilter() { + return childFilter; } @Override - protected void doClose() { - Releasables.close(childrenOrdToBuckets, childrenOrdToOtherBuckets); + Weight getPostCollectionFilter() { + return parentFilter; } } diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java index 064d1d1e5977c..18d4560ab95fc 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java @@ -18,73 +18,33 @@ */ package org.elasticsearch.join.aggregations; -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorable; -import 
org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; import org.apache.lucene.util.Bits; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.util.LongArray; -import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; -import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; // The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this // aggregation, for this reason that collector can't be used -public class ParentToChildrenAggregator extends BucketsAggregator implements SingleBucketAggregator { - - static final ParseField TYPE_FIELD = new ParseField("type"); - - private final Weight childFilter; - private final Weight parentFilter; - private final ValuesSource.Bytes.WithOrdinals valuesSource; - - // Maybe use PagedGrowableWriter? This will be less wasteful than LongArray, - // but then we don't have the reuse feature of BigArrays. 
- // Also if we know the highest possible value that a parent agg will create - // then we store multiple values into one slot - private final LongArray parentOrdToBuckets; - - // Only pay the extra storage price if the a parentOrd has multiple buckets - // Most of the times a parent doesn't have multiple buckets, since there is - // only one document per parent ord, - // only in the case of terms agg if a parent doc has multiple terms per - // field this is needed: - private final LongObjectPagedHashMap parentOrdToOtherBuckets; - private boolean multipleBucketsPerParentOrd = false; +public class ParentToChildrenAggregator extends AbstractParentChildAggregator { public ParentToChildrenAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, Query childFilter, Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource, long maxOrd, List pipelineAggregators, Map metaData) throws IOException { - super(name, factories, context, parent, pipelineAggregators, metaData); - // these two filters are cached in the parser - this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); - this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); - this.parentOrdToBuckets = context.bigArrays().newLongArray(maxOrd, false); - this.parentOrdToBuckets.fill(0, maxOrd, -1); - this.parentOrdToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays()); - this.valuesSource = valuesSource; + super(name, factories, context, parent, childFilter, parentFilter, + valuesSource, maxOrd, pipelineAggregators, metaData); } @Override @@ -100,13 +60,7 @@ public InternalAggregation buildEmptyAggregation() { } @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - if (valuesSource == null) { - return LeafBucketCollector.NO_OP_COLLECTOR; - 
} - final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); - final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), parentFilter.scorerSupplier(ctx)); + LeafBucketCollector getLeafBucketCollector(SortedSetDocValues globalOrdinals, Bits parentDocs) { return new LeafBucketCollector() { @Override @@ -115,18 +69,10 @@ public void collect(int docId, long bucket) throws IOException { long globalOrdinal = globalOrdinals.nextOrd(); assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; if (globalOrdinal != -1) { - if (parentOrdToBuckets.get(globalOrdinal) == -1) { - parentOrdToBuckets.set(globalOrdinal, bucket); + if (ordinalToBuckets.get(globalOrdinal) == -1) { + ordinalToBuckets.set(globalOrdinal, bucket); } else { - long[] bucketOrds = parentOrdToOtherBuckets.get(globalOrdinal); - if (bucketOrds != null) { - bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1); - bucketOrds[bucketOrds.length - 1] = bucket; - parentOrdToOtherBuckets.put(globalOrdinal, bucketOrds); - } else { - parentOrdToOtherBuckets.put(globalOrdinal, new long[] { bucket }); - } - multipleBucketsPerParentOrd = true; + storeToOtherBuckets(globalOrdinal, bucket); } } } @@ -135,60 +81,12 @@ public void collect(int docId, long bucket) throws IOException { } @Override - protected void doPostCollection() throws IOException { - IndexReader indexReader = context().searcher().getIndexReader(); - for (LeafReaderContext ctx : indexReader.leaves()) { - Scorer childDocsScorer = childFilter.scorer(ctx); - if (childDocsScorer == null) { - continue; - } - DocIdSetIterator childDocsIter = childDocsScorer.iterator(); - - final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx); - - final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); - // Set the scorer, since we now replay only the child docIds - sub.setScorer(new Scorable() { - @Override - public float score() { - return 1f; - } - - @Override - 
public int docID() { - return childDocsIter.docID(); - } - }); - - final Bits liveDocs = ctx.reader().getLiveDocs(); - for (int docId = childDocsIter - .nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter - .nextDoc()) { - if (liveDocs != null && liveDocs.get(docId) == false) { - continue; - } - if (globalOrdinals.advanceExact(docId)) { - long globalOrdinal = globalOrdinals.nextOrd(); - assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; - long bucketOrd = parentOrdToBuckets.get(globalOrdinal); - if (bucketOrd != -1) { - collectBucket(sub, docId, bucketOrd); - if (multipleBucketsPerParentOrd) { - long[] otherBucketOrds = parentOrdToOtherBuckets.get(globalOrdinal); - if (otherBucketOrds != null) { - for (long otherBucketOrd : otherBucketOrds) { - collectBucket(sub, docId, otherBucketOrd); - } - } - } - } - } - } - } + Weight getCollectionFilter() { + return parentFilter; } @Override - protected void doClose() { - Releasables.close(parentOrdToBuckets, parentOrdToOtherBuckets); + Weight getPostCollectionFilter() { + return childFilter; } } From b73af5a5dda98280fa14c40a7fa575699b50b8be Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Fri, 26 Oct 2018 23:01:41 +0200 Subject: [PATCH 4/7] Rework parent-aggregation after work in PR #34845 The required changes are now even less as the new base class ParentJoinAggregator implements collecting buckets in a different way. 
--- .../AbstractParentChildAggregator.java | 172 ------------------ .../ChildrenToParentAggregator.java | 58 +----- .../aggregations/ParentAggregatorFactory.java | 38 ++-- 3 files changed, 32 insertions(+), 236 deletions(-) delete mode 100644 modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java deleted file mode 100644 index c13958c7122c0..0000000000000 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/AbstractParentChildAggregator.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.join.aggregations; - -import org.apache.lucene.index.IndexReader; -import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.Scorable; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Weight; -import org.apache.lucene.util.Bits; -import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.util.LongArray; -import org.elasticsearch.common.util.LongObjectPagedHashMap; -import org.elasticsearch.search.aggregations.Aggregator; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; -import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; -import org.elasticsearch.search.aggregations.support.ValuesSource; -import org.elasticsearch.search.internal.SearchContext; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -public abstract class AbstractParentChildAggregator extends BucketsAggregator implements SingleBucketAggregator { - static final ParseField TYPE_FIELD = new ParseField("type"); - - protected final Weight childFilter; - protected final Weight parentFilter; - protected final ValuesSource.Bytes.WithOrdinals valuesSource; - - // Maybe use PagedGrowableWriter? This will be less wasteful than LongArray, - // but then we don't have the reuse feature of BigArrays. 
- // Also if we know the highest possible value that a parent agg will create - // then we store multiple values into one slot - protected final LongArray ordinalToBuckets; - - // Only pay the extra storage price if the a parentOrd has multiple buckets - // Most of the times a parent doesn't have multiple buckets, since there is - // only one document per parent ord, - // only in the case of terms agg if a parent doc has multiple terms per - // field this is needed: - protected final LongObjectPagedHashMap ordinalToOtherBuckets; - protected boolean multipleBucketsPerOrdinal = false; - - public AbstractParentChildAggregator(String name, AggregatorFactories factories, - SearchContext context, Aggregator parent, Query childFilter, - Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource, - long maxOrd, List pipelineAggregators, Map metaData) - throws IOException { - super(name, factories, context, parent, pipelineAggregators, metaData); - // these two filters are cached in the parser - this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); - this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), ScoreMode.COMPLETE_NO_SCORES, 1f); - this.ordinalToBuckets = context.bigArrays().newLongArray(maxOrd, false); - this.ordinalToBuckets.fill(0, maxOrd, -1); - this.ordinalToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays()); - this.valuesSource = valuesSource; - } - - protected void storeToOtherBuckets(long globalOrdinal, long bucket) { - long[] bucketOrds = ordinalToOtherBuckets.get(globalOrdinal); - if (bucketOrds != null) { - bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1); - bucketOrds[bucketOrds.length - 1] = bucket; - ordinalToOtherBuckets.put(globalOrdinal, bucketOrds); - } else { - ordinalToOtherBuckets.put(globalOrdinal, new long[] { bucket }); - } - multipleBucketsPerOrdinal = true; - } - - abstract Weight 
getCollectionFilter(); - - abstract Weight getPostCollectionFilter(); - - abstract LeafBucketCollector getLeafBucketCollector(SortedSetDocValues globalOrdinals, Bits docs); - - @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { - if (valuesSource == null) { - return LeafBucketCollector.NO_OP_COLLECTOR; - } - final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); - final Bits docs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), getCollectionFilter().scorerSupplier(ctx)); - return getLeafBucketCollector(globalOrdinals, docs); - } - - protected void doPostCollection() throws IOException { - IndexReader indexReader = context().searcher().getIndexReader(); - for (LeafReaderContext ctx : indexReader.leaves()) { - Scorer docsScorer = getPostCollectionFilter().scorer(ctx); - if (docsScorer == null) { - continue; - } - DocIdSetIterator docsIter = docsScorer.iterator(); - - final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx); - - final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx); - // Set the scorer, since we now replay only the parent or child docIds - sub.setScorer(new Scorable() { - @Override - public float score() { - return 1f; - } - - @Override - public int docID() { - return docsIter.docID(); - } - }); - - final Bits liveDocs = ctx.reader().getLiveDocs(); - for (int docId = docsIter - .nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = docsIter - .nextDoc()) { - if (liveDocs != null && liveDocs.get(docId) == false) { - continue; - } - if (globalOrdinals.advanceExact(docId)) { - long globalOrdinal = globalOrdinals.nextOrd(); - assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; - long bucketOrd = ordinalToBuckets.get(globalOrdinal); - if (bucketOrd != -1) { - collectBucket(sub, docId, bucketOrd); - if (multipleBucketsPerOrdinal) { - long[] otherBucketOrds = 
ordinalToOtherBuckets.get(globalOrdinal); - if (otherBucketOrds != null) { - for (long otherBucketOrd : otherBucketOrds) { - collectBucket(sub, docId, otherBucketOrd); - } - } - } - } - } - } - } - } - - @Override - protected void doClose() { - Releasables.close(ordinalToBuckets, ordinalToOtherBuckets); - } -} diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java index 5b9e1517aad52..f11fc7e423b38 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java @@ -18,15 +18,11 @@ */ package org.elasticsearch.join.aggregations; -import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.search.Query; -import org.apache.lucene.search.Weight; -import org.apache.lucene.util.Bits; +import org.elasticsearch.common.ParseField; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; -import org.elasticsearch.search.aggregations.LeafBucketCollector; -import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.internal.SearchContext; @@ -36,19 +32,18 @@ import java.util.Map; /** - * A {@link BucketsAggregator} which resolves to the matching parent documents. - * - * It ensures that each parent only matches once per bucket. + * A {@link org.elasticsearch.search.aggregations.bucket.BucketsAggregator} which resolves + * to the matching parent documents. 
*/ -public class ChildrenToParentAggregator extends AbstractParentChildAggregator { +public class ChildrenToParentAggregator extends ParentJoinAggregator { + + static final ParseField TYPE_FIELD = new ParseField("type"); public ChildrenToParentAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, Query childFilter, Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource, - long maxOrd, List pipelineAggregators, Map metaData) - throws IOException { - super(name, factories, context, parent, childFilter, parentFilter, - valuesSource, maxOrd, pipelineAggregators, metaData); + long maxOrd, List pipelineAggregators, Map metaData) throws IOException { + super(name, factories, context, parent, childFilter, parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData); } @Override @@ -62,41 +57,4 @@ public InternalAggregation buildEmptyAggregation() { return new InternalParent(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData()); } - - @Override - LeafBucketCollector getLeafBucketCollector(SortedSetDocValues globalOrdinals, Bits childDocs) { - return new LeafBucketCollector() { - - @Override - public void collect(int docId, long bucket) throws IOException { - if (childDocs.get(docId) && globalOrdinals.advanceExact(docId)) { - long globalOrdinal = globalOrdinals.nextOrd(); - assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS; - if (globalOrdinal != -1) { - long bucketPrev = ordinalToBuckets.get(globalOrdinal); - - // add bucket if none is set - if (bucketPrev == -1) { - ordinalToBuckets.set(globalOrdinal, bucket); - } else if (bucketPrev != bucket) { - // otherwise store it in the array, but only if the bucket is different than - // the previous one for this childDoc, as we only need one occurrence - // of child -> parent for this aggregation - storeToOtherBuckets(globalOrdinal, bucket); - } - } - } - } - }; - } - - @Override - Weight getCollectionFilter() { - return childFilter; - } 
- - @Override - Weight getPostCollectionFilter() { - return parentFilter; - } } diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java index ee7ea6cd21ead..2ae3da7c47af3 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentAggregatorFactory.java @@ -35,39 +35,49 @@ import java.util.List; import java.util.Map; -public class ParentAggregatorFactory - extends ValuesSourceAggregatorFactory { +public class ParentAggregatorFactory extends ValuesSourceAggregatorFactory { private final Query parentFilter; private final Query childFilter; - public ParentAggregatorFactory(String name, ValuesSourceConfig config, - Query childFilter, Query parentFilter, SearchContext context, AggregatorFactory parent, - AggregatorFactories.Builder subFactoriesBuilder, Map metaData) throws IOException { + public ParentAggregatorFactory(String name, + ValuesSourceConfig config, + Query childFilter, + Query parentFilter, + SearchContext context, + AggregatorFactory parent, + AggregatorFactories.Builder subFactoriesBuilder, + Map metaData) throws IOException { super(name, config, context, parent, subFactoriesBuilder, metaData); + this.childFilter = childFilter; this.parentFilter = parentFilter; } @Override - protected Aggregator createUnmapped(Aggregator parent, List pipelineAggregators, Map metaData) - throws IOException { + protected Aggregator createUnmapped(Aggregator parent, + List pipelineAggregators, Map metaData) throws IOException { return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) { - @Override public InternalAggregation buildEmptyAggregation() { return new InternalParent(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData()); } - }; } 
@Override - protected Aggregator doCreateInternal(WithOrdinals valuesSource, Aggregator children, - boolean collectsFromSingleBucket, List pipelineAggregators, Map metaData) - throws IOException { + protected Aggregator doCreateInternal(WithOrdinals valuesSource, + Aggregator children, + boolean collectsFromSingleBucket, + List pipelineAggregators, + Map metaData) throws IOException { + long maxOrd = valuesSource.globalMaxOrd(context.searcher()); - return new ChildrenToParentAggregator(name, factories, context, children, childFilter, - parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData); + if (collectsFromSingleBucket) { + return new ChildrenToParentAggregator(name, factories, context, children, childFilter, + parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData); + } else { + return asMultiBucketAggregator(this, context, children); + } } } From 01aa7d01f0e7fa289ad97eeea537003be86692fc Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Mon, 5 Nov 2018 22:17:37 +0100 Subject: [PATCH 5/7] Adjust tests for parent-aggregation some more based on review-comments Adjust import/javadoc, remove commented code/TODOs --- .../ChildrenToParentAggregator.java | 4 +- .../join/aggregations/ChildrenTests.java | 3 +- .../ChildrenToParentAggregatorTests.java | 10 -- .../join/aggregations/ParentIT.java | 103 +++++++++++------- 4 files changed, 68 insertions(+), 52 deletions(-) diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java index f11fc7e423b38..8c2ac5373b4b4 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java @@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.Aggregator; import 
org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.internal.SearchContext; @@ -32,8 +33,7 @@ import java.util.Map; /** - * A {@link org.elasticsearch.search.aggregations.bucket.BucketsAggregator} which resolves - * to the matching parent documents. + * A {@link BucketsAggregator} which resolves to the matching parent documents. */ public class ChildrenToParentAggregator extends ParentJoinAggregator { diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenTests.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenTests.java index 85a97c4b9b413..58d315d2d43ed 100644 --- a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenTests.java +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenTests.java @@ -37,8 +37,7 @@ protected Collection> getPlugins() { protected ChildrenAggregationBuilder createTestAggregatorBuilder() { String name = randomAlphaOfLengthBetween(3, 20); String childType = randomAlphaOfLengthBetween(5, 40); - ChildrenAggregationBuilder factory = new ChildrenAggregationBuilder(name, childType); - return factory; + return new ChildrenAggregationBuilder(name, childType); } } diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java index 072ad1ee1ad6e..685c872fa72d4 100644 --- a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java +++ 
b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregatorTests.java @@ -213,22 +213,12 @@ public void testTermsParentChildTerms() throws IOException { testCaseTermsParentTerms(new MatchAllDocsQuery(), indexSearcher, longTerms -> { assertNotNull(longTerms); - // TODO: test some more here for (LongTerms.Bucket bucket : longTerms.getBuckets()) { assertNotNull(bucket); assertNotNull(bucket.getKeyAsString()); } }); - /*for(int j = 0;j < 20;j++) { - long start = System.currentTimeMillis(); - for (int i = 0; i < 100; i++) { - testCaseTermsParentTerms(new MatchAllDocsQuery(), indexSearcher, Assert::assertNotNull); - } - - System.out.println("Duration: " + (System.currentTimeMillis() - start)); - }*/ - indexReader.close(); directory.close(); } diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java index d587e0945c946..56c48cbd416d9 100644 --- a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java @@ -21,11 +21,13 @@ import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -96,9 +98,9 @@ public void testParentAggs() throws Exception { .setSize(10000) .setQuery(matchQuery("randomized", true)) .addAggregation( - terms("commenter").field("commenter").size(10000).subAggregation(parent("to_article", "comment") - .subAggregation( - 
terms("category").field("category").size(10000).subAggregation( + terms("to_commenter").field("commenter").size(10000).subAggregation( + parent("to_article", "comment").subAggregation( + terms("to_category").field("category").size(10000).subAggregation( topHits("top_category") )) ) @@ -109,9 +111,43 @@ public void testParentAggs() throws Exception { final Set commenters = getCommenters(); final Map> commenterToComments = getCommenterToComments(); - Terms categoryTerms = searchResponse.getAggregations().get("commenter"); + Terms categoryTerms = searchResponse.getAggregations().get("to_commenter"); assertThat("Request: " + searchRequest + "\nResponse: " + searchResponse + "\n", categoryTerms.getBuckets().size(), equalTo(commenters.size())); + for (Terms.Bucket commenterBucket : categoryTerms.getBuckets()) { + Set comments = commenterToComments.get(commenterBucket.getKeyAsString()); + assertNotNull(comments); + assertThat("Failed for commenter " + commenterBucket.getKeyAsString(), + commenterBucket.getDocCount(), equalTo((long)comments.size())); + + Parent articleAgg = commenterBucket.getAggregations().get("to_article"); + assertThat(articleAgg.getName(), equalTo("to_article")); + // find all articles for the comments for the current commenter + Set articles = articleToControl.values().stream().flatMap( + (Function>) parentControl -> parentControl.commentIds.stream(). + filter(comments::contains) + ).collect(Collectors.toSet()); + + assertThat(articleAgg.getDocCount(), equalTo((long)articles.size())); + + Terms categoryAgg = articleAgg.getAggregations().get("to_category"); + assertNotNull(categoryAgg); + + List categories = categoryToControl.entrySet(). + stream(). + filter(entry -> entry.getValue().commenterToCommentId.containsKey(commenterBucket.getKeyAsString())). + map(Map.Entry::getKey). 
+ collect(Collectors.toList()); + + for (String category : categories) { + Terms.Bucket categoryBucket = categoryAgg.getBucketByKey(category); + assertNotNull(categoryBucket); + + Aggregation topCategory = categoryBucket.getAggregations().get("top_category"); + assertNotNull(topCategory); + } + } + for (String commenter : commenters) { Terms.Bucket categoryBucket = categoryTerms.getBucketByKey(commenter); assertThat(categoryBucket.getKeyAsString(), equalTo(commenter)); @@ -119,25 +155,6 @@ public void testParentAggs() throws Exception { Parent childrenBucket = categoryBucket.getAggregations().get("to_article"); assertThat(childrenBucket.getName(), equalTo("to_article")); - - // TODO: verify some more - /*assertThat(childrenBucket.getDocCount(), equalTo((long) entry1.getValue().commentIds.size())); - assertThat(((InternalAggregation)childrenBucket).getProperty("_count"), - equalTo((long) entry1.getValue().commentIds.size())); - - Terms commentersTerms = childrenBucket.getAggregations().get("category"); - assertThat(((InternalAggregation)childrenBucket).getProperty("category"), sameInstance(commentersTerms)); - assertThat(commentersTerms.getBuckets().size(), equalTo(entry1.getValue().commenterToCommentId.size())); - for (Map.Entry> entry2 : entry1.getValue().commenterToCommentId.entrySet()) { - Terms.Bucket commentBucket = commentersTerms.getBucketByKey(entry2.getKey()); - assertThat(commentBucket.getKeyAsString(), equalTo(entry2.getKey())); - assertThat(commentBucket.getDocCount(), equalTo((long) entry2.getValue().size())); - - TopHits topHits = commentBucket.getAggregations().get("top_commenters"); - for (SearchHit searchHit : topHits.getHits().getHits()) { - assertThat(entry2.getValue().contains(searchHit.getId()), is(true)); - } - }*/ } } @@ -158,18 +175,17 @@ private Map> getCommenterToComments() { return commenterToComments; } -// public void testNonExistingParentType() throws Exception { -// SearchResponse searchResponse = client().prepareSearch("test") -// 
.addAggregation( -// parent("non-existing", "xyz") -// ).get(); -// assertSearchResponse(searchResponse); -// -// Parent parent = searchResponse.getAggregations().get("non-existing"); -// assertThat(parent.getName(), equalTo("non-existing")); -// assertThat(parent.getDocCount(), equalTo(0L)); -// } + public void testNonExistingParentType() throws Exception { + SearchResponse searchResponse = client().prepareSearch("test") + .addAggregation( + parent("non-existing", "xyz") + ).get(); + assertSearchResponse(searchResponse); + Parent parent = searchResponse.getAggregations().get("non-existing"); + assertThat(parent.getName(), equalTo("non-existing")); + assertThat(parent.getDocCount(), equalTo(0L)); + } public void testTermsParentAggTerms() throws Exception { final SearchRequestBuilder searchRequest = client().prepareSearch("test") @@ -177,9 +193,8 @@ public void testTermsParentAggTerms() throws Exception { .setQuery(matchQuery("randomized", true)) .addAggregation( terms("to_commenter").field("commenter").size(10000).subAggregation( - parent("to_article", "comment") - .subAggregation( - terms("category").field("category").size(10000)))); + parent("to_article", "comment").subAggregation( + terms("to_category").field("category").size(10000)))); SearchResponse searchResponse = searchRequest.get(); assertSearchResponse(searchResponse); @@ -205,7 +220,19 @@ public void testTermsParentAggTerms() throws Exception { assertThat(articleAgg.getDocCount(), equalTo((long)articles.size())); - // TODO: add more verification + Terms categoryAgg = articleAgg.getAggregations().get("to_category"); + assertNotNull(categoryAgg); + + List categories = categoryToControl.entrySet(). + stream(). + filter(entry -> entry.getValue().commenterToCommentId.containsKey(commenterBucket.getKeyAsString())). + map(Map.Entry::getKey). 
+ collect(Collectors.toList()); + + for (String category : categories) { + Terms.Bucket categoryBucket = categoryAgg.getBucketByKey(category); + assertNotNull(categoryBucket); + } } } } From 769ad0f2a99998f65d92a90a8074d414b065957c Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Tue, 6 Nov 2018 23:29:04 +0100 Subject: [PATCH 6/7] Abstract base class for tests needs to be named ...TestCase --- ...tractParentChildIT.java => AbstractParentChildTestCase.java} | 2 +- .../java/org/elasticsearch/join/aggregations/ChildrenIT.java | 2 +- .../test/java/org/elasticsearch/join/aggregations/ParentIT.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/{AbstractParentChildIT.java => AbstractParentChildTestCase.java} (98%) diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildIT.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildTestCase.java similarity index 98% rename from modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildIT.java rename to modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildTestCase.java index 45518b8b7c04c..3dc14b2f9c47b 100644 --- a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildIT.java +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/AbstractParentChildTestCase.java @@ -35,7 +35,7 @@ /** * Small base test-class which combines stuff used for Children and Parent aggregation tests */ -public abstract class AbstractParentChildIT extends ParentChildTestCase { +public abstract class AbstractParentChildTestCase extends ParentChildTestCase { protected final Map categoryToControl = new HashMap<>(); protected final Map articleToControl = new HashMap<>(); diff --git 
a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java index b2c2f6f9ea511..46008451736f9 100644 --- a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ChildrenIT.java @@ -54,7 +54,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; -public class ChildrenIT extends AbstractParentChildIT { +public class ChildrenIT extends AbstractParentChildTestCase { public void testChildrenAggs() throws Exception { SearchResponse searchResponse = client().prepareSearch("test") diff --git a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java index 56c48cbd416d9..635195e62fe30 100644 --- a/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java +++ b/modules/parent-join/src/test/java/org/elasticsearch/join/aggregations/ParentIT.java @@ -41,7 +41,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; -public class ParentIT extends AbstractParentChildIT { +public class ParentIT extends AbstractParentChildTestCase { public void testSimpleParentAgg() throws Exception { final SearchRequestBuilder searchRequest = client().prepareSearch("test") From d37af45bdf26e426a9701a7740c4081d84641ac4 Mon Sep 17 00:00:00 2001 From: Dominik Stadler Date: Mon, 5 Nov 2018 21:49:28 +0100 Subject: [PATCH 7/7] Add documentation for parent-aggregation similar to children-aggregation --- docs/reference/aggregations/bucket.asciidoc | 2 + .../bucket/parent-aggregation.asciidoc | 213 ++++++++++++++++++ 2 files changed, 215 insertions(+) create mode 100644 
docs/reference/aggregations/bucket/parent-aggregation.asciidoc diff --git a/docs/reference/aggregations/bucket.asciidoc b/docs/reference/aggregations/bucket.asciidoc index ddb55e8d34c8e..52b27c578929f 100644 --- a/docs/reference/aggregations/bucket.asciidoc +++ b/docs/reference/aggregations/bucket.asciidoc @@ -49,6 +49,8 @@ include::bucket/missing-aggregation.asciidoc[] include::bucket/nested-aggregation.asciidoc[] +include::bucket/parent-aggregation.asciidoc[] + include::bucket/range-aggregation.asciidoc[] include::bucket/reverse-nested-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/bucket/parent-aggregation.asciidoc b/docs/reference/aggregations/bucket/parent-aggregation.asciidoc new file mode 100644 index 0000000000000..d25d882205d66 --- /dev/null +++ b/docs/reference/aggregations/bucket/parent-aggregation.asciidoc @@ -0,0 +1,213 @@ +[[search-aggregations-bucket-parent-aggregation]] +=== Parent Aggregation + +A special single bucket aggregation that selects parent documents that have the specified type, as defined in a <<parent-join,`join` field>>. + +This aggregation has a single option: + +* `type` - The child type that should be selected. + +For example, let's say we have an index of questions and answers. The answer type has the following `join` field in the mapping: + +[source,js] +-------------------------------------------------- +PUT parent_example +{ + "mappings": { + "_doc": { + "properties": { + "join": { + "type": "join", + "relations": { + "question": "answer" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE + +The `question` documents contain a `tags` field and the `answer` documents contain an `owner` field. With the `parent` +aggregation the owner buckets can be mapped to the tag buckets in a single request even though the two fields exist in +two different kinds of documents.
+
+An example of a question document:
+
+[source,js]
+--------------------------------------------------
+PUT parent_example/_doc/1
+{
+  "join": {
+    "name": "question"
+  },
+  "body": "<p>I have Windows 2003 server and i bought a new Windows 2008 server...",
+  "title": "Whats the best way to file transfer my site from server to a newer one?",
+  "tags": [
+    "windows-server-2003",
+    "windows-server-2008",
+    "file-transfer"
+  ]
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[continued]
+
+Examples of `answer` documents:
+
+[source,js]
+--------------------------------------------------
+PUT parent_example/_doc/2?routing=1
+{
+  "join": {
+    "name": "answer",
+    "parent": "1"
+  },
+  "owner": {
+    "location": "Norfolk, United Kingdom",
+    "display_name": "Sam",
+    "id": 48
+  },
+  "body": "<p>Unfortunately you're pretty much limited to FTP...",
+  "creation_date": "2009-05-04T13:45:37.030"
+}
+
+PUT parent_example/_doc/3?routing=1&refresh
+{
+  "join": {
+    "name": "answer",
+    "parent": "1"
+  },
+  "owner": {
+    "location": "Norfolk, United Kingdom",
+    "display_name": "Troll",
+    "id": 49
+  },
+  "body": "<p>Use Linux...",
+  "creation_date": "2009-05-05T13:45:37.030"
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[continued]
+
+The following request can be built that connects the two together:
+
+[source,js]
+--------------------------------------------------
+POST parent_example/_search?size=0
+{
+  "aggs": {
+    "top-names": {
+      "terms": {
+        "field": "owner.display_name.keyword",
+        "size": 10
+      },
+      "aggs": {
+        "to-questions": {
+          "parent": {
+            "type" : "answer" <1>
+          },
+          "aggs": {
+            "top-tags": {
+              "terms": {
+                "field": "tags.keyword",
+                "size": 10
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------
+// CONSOLE
+// TEST[continued]
+
+<1> The `type` points to the type / mapping with the name `answer`.
+
+The above example returns the top answer owners and per owner the top question tags.
+
+Possible response:
+
+[source,js]
+--------------------------------------------------
+{
+  "took": 9,
+  "timed_out": false,
+  "_shards": {
+    "total": 1,
+    "successful": 1,
+    "skipped": 0,
+    "failed": 0
+  },
+  "hits": {
+    "total": 3,
+    "max_score": null,
+    "hits": []
+  },
+  "aggregations": {
+    "top-names": {
+      "doc_count_error_upper_bound": 0,
+      "sum_other_doc_count": 0,
+      "buckets": [
+        {
+          "key": "Sam",
+          "doc_count": 1, <1>
+          "to-questions": {
+            "doc_count": 1, <2>
+            "top-tags": {
+              "doc_count_error_upper_bound": 0,
+              "sum_other_doc_count": 0,
+              "buckets": [
+                {
+                  "key": "file-transfer",
+                  "doc_count": 1
+                },
+                {
+                  "key": "windows-server-2003",
+                  "doc_count": 1
+                },
+                {
+                  "key": "windows-server-2008",
+                  "doc_count": 1
+                }
+              ]
+            }
+          }
+        },
+        {
+          "key": "Troll",
+          "doc_count": 1, <1>
+          "to-questions": {
+            "doc_count": 1, <2>
+            "top-tags": {
+              "doc_count_error_upper_bound": 0,
+              "sum_other_doc_count": 0,
+              "buckets": [
+                {
+                  "key": "file-transfer",
+                  "doc_count": 1
+                },
+                {
+                  "key": "windows-server-2003",
+                  "doc_count": 1
+                },
+                {
+                  "key": "windows-server-2008",
+                  "doc_count": 1
+                }
+              ]
+            }
+          }
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/"took": 9/"took": $body.took/]
+
+<1> The number of answer documents with the owner `Sam`, `Troll`, etc.
+<2> The number of question documents that are related to answer documents with the owner `Sam`, `Troll`, etc.