Skip to content

Commit

Permalink
ESQL: Alias duplicated aggregations in a stats (elastic#100642)
Browse files Browse the repository at this point in the history
Replace duplicated aggregations in a stats command with an alias (through a
synthetic eval). Additionally, introduce aggregation normalization, which replaces
field aliases inside an aggregation with their source and the literals in Count with "*".

This improves the query:

eval x = salary | stats c = count(), m = min(x), m1 = min(salary), c1 =
count(1)

to stats c = count(*), m = min(x) | eval m1 = m, c1 = c | keep c, m, m1, c1

Fix elastic#100544
  • Loading branch information
costin authored Oct 18, 2023
1 parent 03ea4bb commit baf251e
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/100642.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 100642
summary: "ESQL: Alias duplicated aggregations in a stats"
area: ES|QL
type: enhancement
issues:
- 100544
Original file line number Diff line number Diff line change
Expand Up @@ -681,3 +681,19 @@ c:l | job_positions:s
4 |Reporting Analyst
4 |Tech Lead
;

duplicateAggregationsWithoutGrouping
from employees | eval x = salary | stats c = count(), m = min(x), m1 = min(salary), c1 = count(1);

c:l | m:i | m1:i | c1:l
100 | 25324 | 25324 | 100
;

duplicateAggregationsWithGrouping
from employees | eval x = salary | stats c = count(), m = min(x), m1 = min(salary), c1 = count(1) by gender | sort gender;

c:l| m:i | m1:i | c1:l| gender:s
33 | 25976 | 25976 | 33 | F
57 | 25945 | 25945 | 57 | M
10 | 25324 | 25324 | 10 | null
;
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.optimizer;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
Expand Down Expand Up @@ -57,8 +58,10 @@
import org.elasticsearch.xpack.ql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.CollectionUtils;
import org.elasticsearch.xpack.ql.util.Holder;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.time.ZoneId;
import java.util.ArrayList;
Expand Down Expand Up @@ -95,13 +98,14 @@ protected static List<Batch<LogicalPlan>> rules() {
new SubstituteSurrogates(),
new ReplaceRegexMatch(),
new ReplaceAliasingEvalWithProject()
// new ReplaceTextFieldAttributesWithTheKeywordSubfield()
// new NormalizeAggregate(), - waits on https://github.com/elastic/elasticsearch/issues/100634
);

var operators = new Batch<>(
"Operator Optimization",
new CombineProjections(),
new CombineEvals(),
new ReplaceDuplicateAggWithEval(),
new PruneEmptyPlans(),
new PropagateEmptyRelation(),
new ConvertStringToByteRef(),
Expand Down Expand Up @@ -947,4 +951,127 @@ private LogicalPlan rule(Eval eval) {
}

}

/**
 * Brings aggregate functions to a canonical form so that equivalent declarations look the same:
 * <ol>
 *   <li>a reference to an alias used as the aggregated field is swapped for the expression recorded for that alias</li>
 *   <li>for Count, the various spellings (Count(1), Count(0), Count(), Count(*)) are aligned to Count(*)</li>
 * </ol>
 */
// TODO waiting on https://github.com/elastic/elasticsearch/issues/100634
static class NormalizeAggregate extends Rule<LogicalPlan, LogicalPlan> {

    /**
     * Walks the plan bottom-up. Every {@link Aggregate} is normalized against the aliases gathered
     * so far; afterwards the aliases declared by the (possibly rewritten) node are recorded.
     */
    @Override
    public LogicalPlan apply(LogicalPlan plan) {
        AttributeMap<Expression> knownAliases = new AttributeMap<>();

        return plan.transformUp(node -> {
            LogicalPlan current = node instanceof Aggregate aggregate ? normalize(aggregate, knownAliases) : node;
            current.forEachExpression(Alias.class, alias -> recordAlias(alias, knownAliases));
            return current;
        });
    }

    /**
     * Remembers what the alias stands for, but only when its child is foldable or a named expression.
     * An attribute that is already mapped keeps its existing entry.
     */
    private static void recordAlias(Alias alias, AttributeMap<Expression> aliases) {
        var target = alias.child();
        if (target.foldable() == false && (target instanceof NamedExpression) == false) {
            return;
        }
        aliases.putIfAbsent(alias.toAttribute(), target);
    }

    /**
     * Rewrites the aliased aggregate functions of the given aggregate.
     *
     * @return a new {@link Aggregate} when at least one function was rewritten, otherwise the same instance
     */
    private static LogicalPlan normalize(Aggregate aggregate, AttributeMap<Expression> aliases) {
        var declared = aggregate.aggregates();
        List<NamedExpression> normalized = new ArrayList<>(declared.size());
        boolean modified = false;

        for (NamedExpression candidate : declared) {
            NamedExpression result = candidate;

            if (candidate instanceof Alias alias && alias.child() instanceof AggregateFunction function) {
                AggregateFunction current = function;

                // step 1: point the aggregated field at whatever the alias map resolves it to
                if (current.field() instanceof NamedExpression named) {
                    Attribute attribute = named.toAttribute();
                    var target = aliases.resolve(attribute, attribute);
                    if (target != attribute) {
                        modified = true;
                        // keep working on the rewritten function so the Count step below sees the new field
                        current = (AggregateFunction) current.replaceChildren(
                            CollectionUtils.combine(Collections.singletonList(target), current.parameters())
                        );
                        result = alias.replaceChild(current);
                    }
                }

                // step 2: a Count over a foldable, non-null value that is not already "*" becomes Count(*)
                if (current instanceof Count count) {
                    var countField = count.field();
                    Object folded = countField.foldable() ? countField.fold() : null;
                    if (folded != null && StringUtils.WILDCARD.equals(folded) == false) {
                        modified = true;
                        var source = count.source();
                        result = alias.replaceChild(new Count(source, new Literal(source, StringUtils.WILDCARD, DataTypes.KEYWORD)));
                    }
                }
            }

            normalized.add(result);
        }

        if (modified == false) {
            return aggregate;
        }
        return new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), normalized);
    }
}

/**
 * Computes each distinct aggregate function of an Aggregate only once; repeated declarations are
 * turned into aliases of the first occurrence, exposed through a projection placed on top.
 * stats a = min(x), b = min(x), c = count(*), d = count() by g
 * becomes
 * stats a = min(x), c = count(*) by g
 * eval b = a, d = c
 * keep a, b, c, d, g
 */
static class ReplaceDuplicateAggWithEval extends OptimizerRules.OptimizerRule<Aggregate> {

    ReplaceDuplicateAggWithEval() {
        super(TransformDirection.UP);
    }

    /**
     * @return the aggregate untouched when all its aggregate functions are distinct; otherwise a
     *         {@link Project} over a slimmed-down {@link Aggregate}, listing the outputs in their original order
     */
    @Override
    protected LogicalPlan rule(Aggregate aggregate) {
        var declared = aggregate.aggregates();
        // aggregate function -> attribute of the first alias that declared it
        Map<AggregateFunction, Attribute> firstOccurrence = Maps.newMapWithExpectedSize(declared.size());
        List<NamedExpression> output = new ArrayList<>();
        List<NamedExpression> uniqueAggs = new ArrayList<>(declared.size());
        boolean duplicatesFound = false;

        for (NamedExpression expression : declared) {
            var attribute = expression.toAttribute();

            if (expression instanceof Alias alias && alias.child() instanceof AggregateFunction function) {
                var earlier = firstOccurrence.putIfAbsent(function, attribute);
                if (earlier != null) {
                    // same function seen before: drop it from the aggregate and alias the earlier result instead
                    duplicatesFound = true;
                    output.add(alias.replaceChild(earlier));
                    continue;
                }
            }

            // first occurrence of a function, or anything that is not an aliased aggregate function: keep as is
            uniqueAggs.add(expression);
            output.add(attribute);
        }

        if (duplicatesFound == false) {
            return aggregate;
        }

        // the projection keeps the output of the original aggregate in place
        var source = aggregate.source();
        var slimAggregate = new Aggregate(source, aggregate.child(), aggregate.groupings(), uniqueAggs);
        return new Project(source, slimAggregate, output);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.planner.FilterTests;
import org.elasticsearch.xpack.esql.planner.Mapper;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -41,6 +42,7 @@
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Expressions;
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.ql.index.EsIndex;
Expand Down Expand Up @@ -299,21 +301,36 @@ public void testAnotherCountAllWithFilter() {
assertThat(expected.toString(), is(esStatsQuery.query().toString()));
}

/**
 * Checks that duplicated counts in a single stats are computed once: {@code call} is expected to be
 * an alias of {@code c} in the top-level projection, leaving two aggregates and two stats underneath.
 *
 * Expected
 * ProjectExec[[c{r}#3, c{r}#3 AS call, c_literal{r}#7]]
 * \_LimitExec[500[INTEGER]]
 * \_AggregateExec[[],[COUNT([2a][KEYWORD]) AS c, COUNT(1[INTEGER]) AS c_literal],FINAL,null]
 * \_ExchangeExec[[count{r}#18, seen{r}#19, count{r}#20, seen{r}#21],true]
 * \_EsStatsQueryExec[test], stats[Stat[name=*, type=COUNT, query=null], Stat[name=*, type=COUNT, query=null]]],
 * query[{"esql_single_value":{"field":"emp_no","next":{"range":{"emp_no":{"gt":10010,"boost":1.0}}}}}]
 * [count{r}#23, seen{r}#24, count{r}#25, seen{r}#26], limit[],
 */
public void testMultiCountAllWithFilter() {
    var plan = plan("""
        from test
        | where emp_no > 10010
        | stats c = count(), call = count(*), c_literal = count(1)
        """, IS_SV_STATS);

    // the root of the plan is the projection that re-exposes the duplicate as an alias
    var project = as(plan, ProjectExec.class);
    var projections = project.projections();
    assertThat(Expressions.names(projections), contains("c", "call", "c_literal"));
    // "call" is no longer aggregated on its own, it points at "c"
    var alias = as(projections.get(1), Alias.class);
    assertThat(Expressions.name(alias.child()), is("c"));

    var limit = as(project.child(), LimitExec.class);
    var agg = as(limit.child(), AggregateExec.class);
    assertThat(agg.getMode(), is(FINAL));
    // only the aggregates listed in the expected plan above remain
    assertThat(Expressions.names(agg.aggregates()), contains("c", "c_literal"));

    var exchange = as(agg.child(), ExchangeExec.class);
    var esStatsQuery = as(exchange.child(), EsStatsQueryExec.class);
    assertThat(esStatsQuery.limit(), is(nullValue()));
    // one (count, seen) pair per remaining aggregate
    assertThat(Expressions.names(esStatsQuery.output()), contains("count", "seen", "count", "seen"));

    var expected = wrapWithSingleQuery(QueryBuilders.rangeQuery("emp_no").gt(10010), "emp_no");
    assertThat(expected.toString(), is(esStatsQuery.query().toString()));
}
Expand Down
Loading

0 comments on commit baf251e

Please sign in to comment.