Skip to content

Commit

Permalink
ESQL: Dependency check for binary plans (elastic#118326) (elastic#118654
Browse files Browse the repository at this point in the history
)

Make the dependency checker for query plans take into account binary plans and make sure that fields required from the left hand side are actually obtained from there (and analogously for the right).
  • Loading branch information
alex-spies authored Dec 13, 2024
1 parent 3a2dc1e commit 041def0
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class EsIndex implements Writeable {
private final Map<String, EsField> mapping;
private final Map<String, IndexMode> indexNameWithModes;

/**
* Intended for tests. Returns an index with an empty index mode map.
*/
public EsIndex(String name, Map<String, EsField> mapping) {
this(name, mapping, Map.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

public final class LogicalVerifier {

private static final PlanConsistencyChecker<LogicalPlan> DEPENDENCY_CHECK = new PlanConsistencyChecker<>();
public static final LogicalVerifier INSTANCE = new LogicalVerifier();

private LogicalVerifier() {}
Expand All @@ -25,7 +24,7 @@ public Failures verify(LogicalPlan plan) {
Failures dependencyFailures = new Failures();

plan.forEachUp(p -> {
DEPENDENCY_CHECK.checkPlan(p, dependencyFailures);
PlanConsistencyChecker.checkPlan(p, dependencyFailures);

if (failures.hasFailures() == false) {
p.forEachExpression(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
Expand All @@ -28,7 +27,6 @@
public final class PhysicalVerifier {

public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
private static final PlanConsistencyChecker<PhysicalPlan> DEPENDENCY_CHECK = new PlanConsistencyChecker<>();

private PhysicalVerifier() {}

Expand All @@ -44,11 +42,6 @@ public Collection<Failure> verify(PhysicalPlan plan) {
}

plan.forEachDown(p -> {
if (p instanceof AggregateExec agg) {
var exclude = Expressions.references(agg.ordinalAttributes());
DEPENDENCY_CHECK.checkPlan(p, exclude, depFailures);
return;
}
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
Expand All @@ -62,7 +55,7 @@ public Collection<Failure> verify(PhysicalPlan plan) {
);
}
}
DEPENDENCY_CHECK.checkPlan(p, depFailures);
PlanConsistencyChecker.checkPlan(p, depFailures);
});

if (depFailures.hasFailures()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,42 @@
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;

import java.util.HashSet;
import java.util.Set;

import static org.elasticsearch.xpack.esql.common.Failure.fail;

public class PlanConsistencyChecker<P extends QueryPlan<P>> {
public class PlanConsistencyChecker {

/**
* Check whether a single {@link QueryPlan} produces no duplicate attributes and its children provide all of its required
* {@link QueryPlan#references() references}. Otherwise, add
* {@link org.elasticsearch.xpack.esql.common.Failure Failure}s to the {@link Failures} object.
*/
public void checkPlan(P p, Failures failures) {
checkPlan(p, AttributeSet.EMPTY, failures);
}

public void checkPlan(P p, AttributeSet exclude, Failures failures) {
AttributeSet refs = p.references();
AttributeSet input = p.inputSet();
AttributeSet missing = refs.subtract(input).subtract(exclude);
// TODO: for Joins, we should probably check if the required fields from the left child are actually in the left child, not
// just any child (and analogously for the right child).
if (missing.isEmpty() == false) {
failures.add(fail(p, "Plan [{}] optimized incorrectly due to missing references {}", p.nodeString(), missing));
public static void checkPlan(QueryPlan<?> p, Failures failures) {
if (p instanceof BinaryPlan binaryPlan) {
checkMissingBinary(
p,
binaryPlan.leftReferences(),
binaryPlan.left().outputSet(),
binaryPlan.rightReferences(),
binaryPlan.right().outputSet(),
failures
);
} else if (p instanceof BinaryExec binaryExec) {
checkMissingBinary(
p,
binaryExec.leftReferences(),
binaryExec.left().outputSet(),
binaryExec.rightReferences(),
binaryExec.right().outputSet(),
failures
);
} else {
checkMissing(p, p.references(), p.inputSet(), "missing references", failures);
}

Set<String> outputAttributeNames = new HashSet<>();
Expand All @@ -49,4 +60,29 @@ public void checkPlan(P p, AttributeSet exclude, Failures failures) {
}
}
}

private static void checkMissingBinary(
QueryPlan<?> plan,
AttributeSet leftReferences,
AttributeSet leftInput,
AttributeSet rightReferences,
AttributeSet rightInput,
Failures failures
) {
checkMissing(plan, leftReferences, leftInput, "missing references from left hand side", failures);
checkMissing(plan, rightReferences, rightInput, "missing references from right hand side", failures);
}

private static void checkMissing(
QueryPlan<?> plan,
AttributeSet references,
AttributeSet input,
String detailErrorMessage,
Failures failures
) {
AttributeSet missing = references.subtract(input);
if (missing.isEmpty() == false) {
failures.add(fail(plan, "Plan [{}] optimized incorrectly due to {} {}", plan.nodeString(), detailErrorMessage, missing));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.util.Arrays;
Expand All @@ -30,6 +31,10 @@ public LogicalPlan right() {
return right;
}

public abstract AttributeSet leftReferences();

public abstract AttributeSet rightReferences();

@Override
public final BinaryPlan replaceChildren(List<LogicalPlan> newChildren) {
return replaceChildren(newChildren.get(0), newChildren.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
Expand Down Expand Up @@ -97,6 +98,16 @@ public List<Attribute> output() {
return lazyOutput;
}

@Override
public AttributeSet leftReferences() {
return Expressions.references(config().leftFields());
}

@Override
public AttributeSet rightReferences() {
return Expressions.references(config().rightFields());
}

public List<Attribute> rightOutputFields() {
AttributeSet leftInputs = left().outputSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ public List<Attribute> output() {

@Override
protected AttributeSet computeReferences() {
return mode.isInputPartial() ? new AttributeSet(intermediateAttributes) : Aggregate.computeReferences(aggregates, groupings);
return mode.isInputPartial()
? new AttributeSet(intermediateAttributes)
: Aggregate.computeReferences(aggregates, groupings).subtract(new AttributeSet(ordinalAttributes()));
}

/** Returns the attributes that can be loaded from ordinals -- no explicit extraction is needed */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.plan.physical;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.io.IOException;
Expand Down Expand Up @@ -40,6 +41,10 @@ public PhysicalPlan right() {
return right;
}

public abstract AttributeSet leftReferences();

public abstract AttributeSet rightReferences();

@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ protected AttributeSet computeReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet leftReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet rightReferences() {
return Expressions.references(rightFields);
}

@Override
public HashJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
return new HashJoinExec(source(), left, right, matchFields, leftFields, rightFields, output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,21 @@ protected AttributeSet computeReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet leftReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet rightReferences() {
// TODO: currently it's hard coded that we add all fields from the lookup index. But the output we "officially" get from the right
// hand side is inconsistent:
// - After logical optimization, there's a FragmentExec with an EsRelation on the right hand side with all the fields.
// - After local physical optimization, there's just an EsQueryExec here, with no fields other than _doc mentioned and we don't
// insert field extractions in the plan, either.
return AttributeSet.EMPTY;
}

@Override
public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.analysis;

import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
Expand Down Expand Up @@ -104,6 +105,11 @@ public static LogicalPlan analyze(String query, String mapping, QueryParams para
return analyzer.analyze(plan);
}

public static IndexResolution loadMapping(String resource, String indexName, IndexMode indexMode) {
EsIndex test = new EsIndex(indexName, EsqlTestUtils.loadMapping(resource), Map.of(indexName, indexMode));
return IndexResolution.valid(test);
}

public static IndexResolution loadMapping(String resource, String indexName) {
EsIndex test = new EsIndex(indexName, EsqlTestUtils.loadMapping(resource));
return IndexResolution.valid(test);
Expand All @@ -118,7 +124,7 @@ public static IndexResolution expandedDefaultIndexResolution() {
}

public static IndexResolution defaultLookupResolution() {
return loadMapping("mapping-languages.json", "languages_lookup");
return loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP);
}

public static EnrichResolution defaultEnrichResolution() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.core.expression.Literal.NULL;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
Expand Down Expand Up @@ -221,7 +222,13 @@ public static void init() {
EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD));
IndexResolution getIndexResult = IndexResolution.valid(test);
analyzer = new Analyzer(
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, enrichResolution),
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
getIndexResult,
defaultLookupResolution(),
enrichResolution
),
TEST_VERIFIER
);

Expand Down Expand Up @@ -4896,6 +4903,26 @@ public void testPlanSanityCheck() throws Exception {
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [salary"));
}

public void testPlanSanityCheckWithBinaryPlans() throws Exception {
var plan = optimizedPlan("""
FROM test
| RENAME languages AS language_code
| LOOKUP JOIN languages_lookup ON language_code
""");

var project = as(plan, Project.class);
var limit = as(project.child(), Limit.class);
var join = as(limit.child(), Join.class);

var joinWithInvalidLeftPlan = join.replaceChildren(join.right(), join.right());
IllegalStateException e = expectThrows(IllegalStateException.class, () -> logicalOptimizer.optimize(joinWithInvalidLeftPlan));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from left hand side [language_code"));

var joinWithInvalidRightPlan = join.replaceChildren(join.left(), join.left());
e = expectThrows(IllegalStateException.class, () -> logicalOptimizer.optimize(joinWithInvalidRightPlan));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from right hand side [language_code"));
}

// https://github.com/elastic/elasticsearch/issues/104995
public void testNoWrongIsNotNullPruning() {
var plan = optimizedPlan("""
Expand Down
Loading

0 comments on commit 041def0

Please sign in to comment.