Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parent-aggregation to parent-join module #34210

Merged
merged 9 commits into from
Nov 8, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ public void testDefaultNamedXContents() {

public void testProvidedNamedXContents() {
List<NamedXContentRegistry.Entry> namedXContents = RestHighLevelClient.getProvidedNamedXContents();
assertEquals(10, namedXContents.size());
assertEquals(11, namedXContents.size());
Map<Class<?>, Integer> categories = new HashMap<>();
List<String> names = new ArrayList<>();
for (NamedXContentRegistry.Entry namedXContent : namedXContents) {
Expand All @@ -629,7 +629,7 @@ public void testProvidedNamedXContents() {
}
}
assertEquals(3, categories.size());
assertEquals(Integer.valueOf(2), categories.get(Aggregation.class));
assertEquals(Integer.valueOf(3), categories.get(Aggregation.class));
assertTrue(names.contains(ChildrenAggregationBuilder.NAME));
assertTrue(names.contains(MatrixStatsAggregationBuilder.NAME));
assertEquals(Integer.valueOf(4), categories.get(EvaluationMetric.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.join.aggregations.ChildrenAggregationBuilder;
import org.elasticsearch.join.aggregations.InternalChildren;
import org.elasticsearch.join.aggregations.InternalParent;
import org.elasticsearch.join.aggregations.ParentAggregationBuilder;
import org.elasticsearch.join.mapper.ParentJoinFieldMapper;
import org.elasticsearch.join.query.HasChildQueryBuilder;
import org.elasticsearch.join.query.HasParentQueryBuilder;
Expand Down Expand Up @@ -51,9 +53,11 @@ public List<QuerySpec<?>> getQueries() {

@Override
public List<AggregationSpec> getAggregations() {
return Collections.singletonList(
new AggregationSpec(ChildrenAggregationBuilder.NAME, ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse)
.addResultReader(InternalChildren::new)
return Arrays.asList(
new AggregationSpec(ChildrenAggregationBuilder.NAME, ChildrenAggregationBuilder::new, ChildrenAggregationBuilder::parse)
.addResultReader(InternalChildren::new),
new AggregationSpec(ParentAggregationBuilder.NAME, ParentAggregationBuilder::new, ParentAggregationBuilder::parse)
.addResultReader(InternalParent::new)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.join.aggregations;

import org.apache.lucene.search.Query;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* A {@link org.elasticsearch.search.aggregations.bucket.BucketsAggregator} which resolves
centic9 marked this conversation as resolved.
Show resolved Hide resolved
* to the matching parent documents.
*/
public class ChildrenToParentAggregator extends ParentJoinAggregator {

static final ParseField TYPE_FIELD = new ParseField("type");

public ChildrenToParentAggregator(String name, AggregatorFactories factories,
SearchContext context, Aggregator parent, Query childFilter,
Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, context, parent, childFilter, parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
return new InternalParent(name, bucketDocCount(owningBucketOrdinal),
bucketAggregations(owningBucketOrdinal), pipelineAggregators(), metaData());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalParent(name, 0, buildEmptySubAggregations(), pipelineAggregators(),
metaData());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.join.aggregations;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Results of the {@link ChildrenToParentAggregator}.
*/
public class InternalParent extends InternalSingleBucketAggregation implements Parent {
public InternalParent(String name, long docCount, InternalAggregations aggregations, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, docCount, aggregations, pipelineAggregators, metaData);
}

/**
* Read from a stream.
*/
public InternalParent(StreamInput in) throws IOException {
super(in);
}

@Override
public String getWriteableName() {
return ParentAggregationBuilder.NAME;
}

@Override
protected InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations) {
return new InternalParent(name, docCount, subAggregations, pipelineAggregators(), getMetaData());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,11 @@ public abstract class JoinAggregationBuilders {
public static ChildrenAggregationBuilder children(String name, String childType) {
return new ChildrenAggregationBuilder(name, childType);
}

/**
* Create a new {@link Parent} aggregation with the given name.
*/
public static ParentAggregationBuilder parent(String name, String childType) {
return new ParentAggregationBuilder(name, childType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.join.aggregations;

import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;

/**
* An single bucket aggregation that translates child documents to their parent documents.
*/
public interface Parent extends SingleBucketAggregation {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.join.aggregations;

import org.apache.lucene.search.Query;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.plain.SortedSetDVOrdinalsIndexFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.join.mapper.ParentIdFieldMapper;
import org.elasticsearch.join.mapper.ParentJoinFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.WithOrdinals;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class ParentAggregationBuilder
extends ValuesSourceAggregationBuilder<WithOrdinals, ParentAggregationBuilder> {

public static final String NAME = "parent";

private final String childType;
private Query parentFilter;
private Query childFilter;

/**
* @param name
* the name of this aggregation
* @param childType
* the type of children documents
*/
public ParentAggregationBuilder(String name, String childType) {
super(name, ValuesSourceType.BYTES, ValueType.STRING);
if (childType == null) {
throw new IllegalArgumentException("[childType] must not be null: [" + name + "]");
}
this.childType = childType;
}

protected ParentAggregationBuilder(ParentAggregationBuilder clone,
Builder factoriesBuilder, Map<String, Object> metaData) {
super(clone, factoriesBuilder, metaData);
this.childType = clone.childType;
this.childFilter = clone.childFilter;
this.parentFilter = clone.parentFilter;
}

@Override
protected AggregationBuilder shallowCopy(Builder factoriesBuilder, Map<String, Object> metaData) {
return new ParentAggregationBuilder(this, factoriesBuilder, metaData);
}

/**
* Read from a stream.
*/
public ParentAggregationBuilder(StreamInput in) throws IOException {
super(in, ValuesSourceType.BYTES, ValueType.STRING);
childType = in.readString();
}

@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeString(childType);
}

@Override
protected ValuesSourceAggregatorFactory<WithOrdinals, ?> innerBuild(SearchContext context,
ValuesSourceConfig<WithOrdinals> config,
AggregatorFactory<?> parent,
Builder subFactoriesBuilder) throws IOException {
return new ParentAggregatorFactory(name, config, childFilter, parentFilter, context, parent,
subFactoriesBuilder, metaData);
}

@Override
protected ValuesSourceConfig<WithOrdinals> resolveConfig(SearchContext context) {
ValuesSourceConfig<WithOrdinals> config = new ValuesSourceConfig<>(ValuesSourceType.BYTES);
joinFieldResolveConfig(context, config);
return config;
}

private void joinFieldResolveConfig(SearchContext context, ValuesSourceConfig<WithOrdinals> config) {
ParentJoinFieldMapper parentJoinFieldMapper = ParentJoinFieldMapper.getMapper(context.mapperService());
ParentIdFieldMapper parentIdFieldMapper = parentJoinFieldMapper.getParentIdFieldMapper(childType, false);
if (parentIdFieldMapper != null) {
parentFilter = parentIdFieldMapper.getParentFilter();
childFilter = parentIdFieldMapper.getChildFilter(childType);
MappedFieldType fieldType = parentIdFieldMapper.fieldType();
final SortedSetDVOrdinalsIndexFieldData fieldData = context.getForField(fieldType);
config.fieldContext(new FieldContext(fieldType.name(), fieldData, fieldType));
} else {
config.unmapped(true);
}
}

@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(ChildrenToParentAggregator.TYPE_FIELD.getPreferredName(), childType);
return builder;
}

public static ParentAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
String childType = null;

XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if ("type".equals(currentFieldName)) {
childType = parser.text();
} else {
throw new ParsingException(parser.getTokenLocation(),
"Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
}
} else {
throw new ParsingException(parser.getTokenLocation(), "Unexpected token " + token + " in [" + aggregationName + "].");
}
}

if (childType == null) {
throw new ParsingException(parser.getTokenLocation(),
"Missing [child_type] field for parent aggregation [" + aggregationName + "]");
}

return new ParentAggregationBuilder(aggregationName, childType);
}

@Override
protected int innerHashCode() {
return Objects.hash(childType);
}

@Override
protected boolean innerEquals(Object obj) {
ParentAggregationBuilder other = (ParentAggregationBuilder) obj;
return Objects.equals(childType, other.childType);
}

@Override
public String getType() {
return NAME;
}
}
Loading