Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parent-aggregation to parent-join module #34210

Merged
merged 9 commits into from
Nov 8, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void testDefaultNamedXContents() {

public void testProvidedNamedXContents() {
List<NamedXContentRegistry.Entry> namedXContents = RestHighLevelClient.getProvidedNamedXContents();
assertEquals(10, namedXContents.size());
assertEquals(11, namedXContents.size());
Map<Class<?>, Integer> categories = new HashMap<>();
List<String> names = new ArrayList<>();
for (NamedXContentRegistry.Entry namedXContent : namedXContents) {
Expand All @@ -629,7 +629,7 @@ public void testProvidedNamedXContents() {
}
}
assertEquals(3, categories.size());
assertEquals(Integer.valueOf(2), categories.get(Aggregation.class));
assertEquals(Integer.valueOf(3), categories.get(Aggregation.class));
assertTrue(names.contains(ChildrenAggregationBuilder.NAME));
assertTrue(names.contains(MatrixStatsAggregationBuilder.NAME));
assertEquals(Integer.valueOf(4), categories.get(EvaluationMetric.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder;
import org.elasticsearch.join.aggregations.InternalChildren;
import org.elasticsearch.join.aggregations.InternalParent;
import org.elasticsearch.join.aggregations.ParentAggregationBuilder;
import org.elasticsearch.join.mapper.ParentJoinFieldMapper;
import org.elasticsearch.join.query.HasChildQueryBuilder;
import org.elasticsearch.join.query.HasParentQueryBuilder;
Expand Down Expand Up @@ -51,9 +53,11 @@ public List<QuerySpec<?>> getQueries() {

@Override
public List<AggregationSpec> getAggregations() {
return Collections.singletonList(
new AggregationSpec(ChildrenAggregationBuilder.NAME, ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse)
.addResultReader(InternalChildren::new)
return Arrays.asList(
new AggregationSpec(ChildrenAggregationBuilder.NAME, ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse)
.addResultReader(InternalChildren::new),
new AggregationSpec(ParentAggregationBuilder.NAME, ParentAggregationBuilder::new, ParentAggregationBuilder::parse)
.addResultReader(InternalParent::new)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.join.aggregations;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public abstract class AbstractParentChildAggregator extends BucketsAggregator implements SingleBucketAggregator {
static final ParseField TYPE_FIELD = new ParseField("type");

protected final Weight childFilter;
protected final Weight parentFilter;
protected final ValuesSource.Bytes.WithOrdinals valuesSource;

// Maybe use PagedGrowableWriter? This will be less wasteful than LongArray,
// but then we don't have the reuse feature of BigArrays.
// Also if we know the highest possible value that a parent agg will create
// then we store multiple values into one slot
protected final LongArray ordinalToBuckets;

// Only pay the extra storage price if the a parentOrd has multiple buckets
// Most of the times a parent doesn't have multiple buckets, since there is
// only one document per parent ord,
// only in the case of terms agg if a parent doc has multiple terms per
// field this is needed:
protected final LongObjectPagedHashMap<long[]> ordinalToOtherBuckets;
protected boolean multipleBucketsPerOrdinal = false;

public AbstractParentChildAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent, Query childFilter,
Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
// these two filters are cached in the parser
this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
this.ordinalToBuckets = context.bigArrays().newLongArray(maxOrd, false);
this.ordinalToBuckets.fill(0, maxOrd, -1);
this.ordinalToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays());
this.valuesSource = valuesSource;
}

protected void storeToOtherBuckets(long globalOrdinal, long bucket) {
centic9 marked this conversation as resolved.
Show resolved Hide resolved
long[] bucketOrds = ordinalToOtherBuckets.get(globalOrdinal);
if (bucketOrds != null) {
bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1);
bucketOrds[bucketOrds.length - 1] = bucket;
ordinalToOtherBuckets.put(globalOrdinal, bucketOrds);
} else {
ordinalToOtherBuckets.put(globalOrdinal, new long[] { bucket });
}
multipleBucketsPerOrdinal = true;
}

abstract Weight getCollectionFilter();

abstract Weight getPostCollectionFilter();

abstract LeafBucketCollector getLeafBucketCollector(SortedSetDocValues globalOrdinals, Bits docs);

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
final Bits docs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), getCollectionFilter().scorerSupplier(ctx));
return getLeafBucketCollector(globalOrdinals, docs);
}

protected void doPostCollection() throws IOException {
IndexReader indexReader = context().searcher().getIndexReader();
for (LeafReaderContext ctx : indexReader.leaves()) {
Scorer docsScorer = getPostCollectionFilter().scorer(ctx);
if (docsScorer == null) {
continue;
}
DocIdSetIterator docsIter = docsScorer.iterator();

final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);

final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
// Set the scorer, since we now replay only the parent or child docIds
sub.setScorer(new Scorable() {
@Override
public float score() {
return 1f;
}

@Override
public int docID() {
return docsIter.docID();
}
});

final Bits liveDocs = ctx.reader().getLiveDocs();
for (int docId = docsIter
.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = docsIter
.nextDoc()) {
if (liveDocs != null && liveDocs.get(docId) == false) {
continue;
}
if (globalOrdinals.advanceExact(docId)) {
long globalOrdinal = globalOrdinals.nextOrd();
assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
long bucketOrd = ordinalToBuckets.get(globalOrdinal);
if (bucketOrd != -1) {
collectBucket(sub, docId, bucketOrd);
if (multipleBucketsPerOrdinal) {
long[] otherBucketOrds = ordinalToOtherBuckets.get(globalOrdinal);
if (otherBucketOrds != null) {
for (long otherBucketOrd : otherBucketOrds) {
collectBucket(sub, docId, otherBucketOrd);
}
}
}
}
}
}
}
}

@Override
protected void doClose() {
Releasables.close(ordinalToBuckets, ordinalToOtherBuckets);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.join.aggregations;

import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* A {@link BucketsAggregator} which resolves to the matching parent documents.
*
* It ensures that each parent only matches once per bucket.
*/
public class ChildrenToParentAggregator extends AbstractParentChildAggregator {

public ChildrenToParentAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent, Query childFilter,
Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
super(name, factories, context, parent, childFilter, parentFilter,
valuesSource, maxOrd, pipelineAggregators, metaData);
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalParent(name, bucketDocCount(owningBucketOrdinal),
bucketAggregations(owningBucketOrdinal), pipelineAggregators(), metaData());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalParent(name, 0, buildEmptySubAggregations(), pipelineAggregators(),
metaData());
}

@Override
LeafBucketCollector getLeafBucketCollector(SortedSetDocValues globalOrdinals, Bits childDocs) {
return new LeafBucketCollector() {

@Override
public void collect(int docId, long bucket) throws IOException {
if (childDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
long globalOrdinal = globalOrdinals.nextOrd();
assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
if (globalOrdinal != -1) {
long bucketPrev = ordinalToBuckets.get(globalOrdinal);

// add bucket if none is set
if (bucketPrev == -1) {
ordinalToBuckets.set(globalOrdinal, bucket);
} else if (bucketPrev != bucket) {
// otherwise store it in the array, but only if the bucket is different than
// the previous one for this childDoc, as we only need one occurrence
// of child -> parent for this aggregation
centic9 marked this conversation as resolved.
Show resolved Hide resolved
storeToOtherBuckets(globalOrdinal, bucket);
}
}
}
}
};
}

@Override
Weight getCollectionFilter() {
return childFilter;
}

@Override
Weight getPostCollectionFilter() {
return parentFilter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.join.aggregations;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Results of the {@link ChildrenToParentAggregator}.
*/
/**
 * Result of the {@link ChildrenToParentAggregator}: a single bucket containing
 * the matching parent documents along with any sub-aggregation results.
 */
public class InternalParent extends InternalSingleBucketAggregation implements Parent {

    public InternalParent(String name, long docCount, InternalAggregations aggregations, List<PipelineAggregator> pipelineAggregators,
            Map<String, Object> metaData) {
        super(name, docCount, aggregations, pipelineAggregators, metaData);
    }

    /**
     * Read from a stream.
     */
    public InternalParent(StreamInput in) throws IOException {
        super(in);
    }

    @Override
    protected InternalSingleBucketAggregation newAggregation(String bucketName, long count, InternalAggregations subAggs) {
        // carry over this instance's pipeline aggregators and metadata
        return new InternalParent(bucketName, count, subAggs, pipelineAggregators(), getMetaData());
    }

    @Override
    public String getWriteableName() {
        // serialized under the same name the aggregation is registered with
        return ParentAggregationBuilder.NAME;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,11 @@ public abstract class JoinAggregationBuilders {
/**
 * Create a new {@code children} aggregation with the given name,
 * targeting the given child type of the join field.
 */
public static ChildrenAggregationBuilder children(String name, String childType) {
return new ChildrenAggregationBuilder(name, childType);
}

/**
 * Create a new {@link Parent} aggregation with the given name.
 *
 * @param name      the name of this aggregation
 * @param childType the child type of the join field whose parents are aggregated
 */
public static ParentAggregationBuilder parent(String name, String childType) {
return new ParentAggregationBuilder(name, childType);
}
}
Loading