Skip to content

Commit

Permalink
ESQL: push down LIMIT past LOOKUP JOIN (#118495)
Browse files Browse the repository at this point in the history
Fix #117698 by enabling
push down of `LIMIT` past `LEFT JOIN`s.

There is a subtle point here: our `LOOKUP JOIN` currently _exactly
preserves the number of rows from the left hand side_. This is different
from SQL, where `LEFT JOIN` will return _at least one row for each row
from the left_, but may return multiple rows in case of multiple
matches. We collect multiple matches into multi-values instead.
(Cf. [tests that I'm about to
add](https://github.com/elastic/elasticsearch/pull/118471/files#diff-334f3328c5f066a093ed8a5ea4a62cd6bcdb304b660b15763bb4f64d0e87ed7cR365-R369)
that demonstrate this.)

If we were to change our semantics to match SQL's, we'd have to adjust
the pushdown, too.
  • Loading branch information
alex-spies authored Dec 13, 2024
1 parent 23008be commit 4ff5acc
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;

import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V4;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V5;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.ASYNC;

public class MixedClusterEsqlSpecIT extends EsqlSpecTestCase {
Expand Down Expand Up @@ -96,7 +96,7 @@ protected boolean supportsInferenceTestService() {

@Override
protected boolean supportsIndexModeLookup() throws IOException {
return hasCapabilities(List.of(JOIN_LOOKUP_V4.capabilityName()));
return hasCapabilities(List.of(JOIN_LOOKUP_V5.capabilityName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V4;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V5;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.Mode.SYNC;
Expand Down Expand Up @@ -124,7 +124,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V4.capabilityName()));
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V5.capabilityName()));
}

private TestFeatureService remoteFeaturesService() throws IOException {
Expand Down Expand Up @@ -283,8 +283,8 @@ protected boolean supportsInferenceTestService() {

@Override
protected boolean supportsIndexModeLookup() throws IOException {
// CCS does not yet support JOIN_LOOKUP_V4 and clusters falsely report they have this capability
// return hasCapabilities(List.of(JOIN_LOOKUP_V4.capabilityName()));
// CCS does not yet support JOIN_LOOKUP_V5 and clusters falsely report they have this capability
// return hasCapabilities(List.of(JOIN_LOOKUP_V5.capabilityName()));
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

//TODO: this sometimes returns null instead of the looked up value (likely related to the execution order)
basicOnTheDataNode
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
Expand All @@ -22,7 +22,7 @@ emp_no:integer | language_code:integer | language_name:keyword
;

basicRow
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW language_code = 1
| LOOKUP JOIN languages_lookup ON language_code
Expand All @@ -33,7 +33,7 @@ language_code:integer | language_name:keyword
;

basicOnTheCoordinator
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| SORT emp_no
Expand All @@ -50,7 +50,7 @@ emp_no:integer | language_code:integer | language_name:keyword
;

subsequentEvalOnTheDataNode
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| EVAL language_code = languages
Expand All @@ -68,7 +68,7 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x
;

subsequentEvalOnTheCoordinator
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM employees
| SORT emp_no
Expand All @@ -85,8 +85,25 @@ emp_no:integer | language_code:integer | language_name:keyword | language_code_x
10003 | 4 | german | 8
;

sortEvalBeforeLookup
required_capability: join_lookup_v5

FROM employees
| SORT emp_no
| EVAL language_code = (emp_no % 10) + 1
| LOOKUP JOIN languages_lookup ON language_code
| KEEP emp_no, language_code, language_name
| LIMIT 3
;

emp_no:integer | language_code:integer | language_name:keyword
10001 | 2 | French
10002 | 3 | Spanish
10003 | 4 | German
;

lookupIPFromRow
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", right = "right"
| LOOKUP JOIN clientips_lookup ON client_ip
Expand All @@ -97,7 +114,7 @@ left | 172.21.0.5 | right | Development
;

lookupIPFromRowWithShadowing
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| LOOKUP JOIN clientips_lookup ON client_ip
Expand All @@ -108,7 +125,7 @@ left | 172.21.0.5 | right | Development
;

lookupIPFromRowWithShadowingKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| EVAL client_ip = client_ip::keyword
Expand All @@ -121,7 +138,7 @@ left | 172.21.0.5 | right | Development
;

lookupIPFromRowWithShadowingKeepReordered
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
| EVAL client_ip = client_ip::keyword
Expand All @@ -134,7 +151,7 @@ right | Development | 172.21.0.5
;

lookupIPFromIndex
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -153,7 +170,7 @@ ignoreOrder:true
;

lookupIPFromIndexKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -173,7 +190,7 @@ ignoreOrder:true
;

lookupIPFromIndexStats
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -189,7 +206,7 @@ count:long | env:keyword
;

lookupIPFromIndexStatsKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| EVAL client_ip = client_ip::keyword
Expand All @@ -206,7 +223,7 @@ count:long | env:keyword
;

lookupMessageFromRow
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", message = "Connected to 10.1.0.1", right = "right"
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -217,7 +234,7 @@ left | Connected to 10.1.0.1 | right | Success
;

lookupMessageFromRowWithShadowing
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right"
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -228,7 +245,7 @@ left | Connected to 10.1.0.1 | right | Success
;

lookupMessageFromRowWithShadowingKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

ROW left = "left", message = "Connected to 10.1.0.1", type = "unknown", right = "right"
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -240,7 +257,7 @@ left | Connected to 10.1.0.1 | right | Success
;

lookupMessageFromIndex
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -258,7 +275,7 @@ ignoreOrder:true
;

lookupMessageFromIndexKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -277,7 +294,7 @@ ignoreOrder:true
;

lookupMessageFromIndexKeepReordered
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -296,7 +313,7 @@ Success | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
;

lookupMessageFromIndexStats
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand All @@ -311,7 +328,7 @@ count:long | type:keyword
;

lookupMessageFromIndexStatsKeep
required_capability: join_lookup_v4
required_capability: join_lookup_v5

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ public enum Cap {
/**
* LOOKUP JOIN
*/
JOIN_LOOKUP_V4(Build.current().isSnapshot()),
JOIN_LOOKUP_V5(Build.current().isSnapshot()),

/**
* Fix for https://github.com/elastic/elasticsearch/issues/117054
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;

public final class PushDownAndCombineLimits extends OptimizerRules.OptimizerRule<Limit> {

Expand Down Expand Up @@ -63,8 +62,10 @@ public LogicalPlan rule(Limit limit) {
}
}
} else if (limit.child() instanceof Join join) {
if (join.config().type() == JoinTypes.LEFT && join.right() instanceof LocalRelation) {
// This is a hash join from something like a lookup.
if (join.config().type() == JoinTypes.LEFT) {
// NOTE! This is only correct because our LEFT JOINs preserve the number of rows from the left hand side.
// This deviates from SQL semantics. In SQL, multiple matches on the right hand side lead to multiple rows in the output.
// For us, multiple matches on the right hand side are collected into multi-values.
return join.replaceChildren(limit.replaceChild(join.left()), join.right());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public final void test() throws Throwable {
);
assumeFalse(
"lookup join disabled for csv tests",
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V4.capabilityName())
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.JOIN_LOOKUP_V5.capabilityName())
);
assumeFalse(
"can't use TERM function in csv tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,7 @@ public void testLookupMatchTypeWrong() {
}

public void testLookupJoinUnknownIndex() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled());

String errorMessage = "Unknown index [foobar]";
IndexResolution missingLookupIndex = IndexResolution.invalid(errorMessage);
Expand Down Expand Up @@ -2168,7 +2168,7 @@ public void testLookupJoinUnknownIndex() {
}

public void testLookupJoinUnknownField() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled());

String query = "FROM test | LOOKUP JOIN languages_lookup ON last_name";
String errorMessage = "1:45: Unknown column [last_name] in right side of join";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1964,7 +1964,7 @@ public void testSortByAggregate() {
}

public void testLookupJoinDataTypeMismatch() {
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V4.isEnabled());
assumeTrue("requires LOOKUP JOIN capability", EsqlCapabilities.Cap.JOIN_LOOKUP_V5.isEnabled());

query("FROM test | EVAL language_code = languages | LOOKUP JOIN languages_lookup ON language_code");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.xpack.esql.core.expression.predicate.logical.Or;
import org.elasticsearch.xpack.esql.core.expression.predicate.nulls.IsNotNull;
import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.util.Holder;
Expand Down Expand Up @@ -112,7 +113,9 @@
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
Expand All @@ -138,6 +141,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TWO;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptySource;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.fieldAttribute;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getFieldAttribute;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.localSource;
Expand Down Expand Up @@ -1291,6 +1295,26 @@ public void testCombineLimits() {
);
}

/**
 * Checks that {@link PushDownAndCombineLimits} moves a {@link Limit} sitting on top of a {@code LEFT} join
 * down onto the join's left child, leaving the right child untouched. The concrete join node class
 * ({@link Join}, {@link LookupJoin} or {@link InlineJoin}) is picked at random.
 */
public void testPushdownLimitsPastLeftJoin() {
    var left = emptySource();
    var right = new LocalRelation(Source.EMPTY, List.of(fieldAttribute()), LocalSupplier.EMPTY);
    // The two sides must differ, otherwise the final comparison could not tell which child received the limit.
    assertNotEquals(left, right);

    var leftJoinConfig = new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), List.of());
    // Randomly choose which join node class wraps the two children.
    var variant = randomIntBetween(0, 2);
    var join = switch (variant) {
        case 0 -> new Join(EMPTY, left, right, leftJoinConfig);
        case 1 -> new LookupJoin(EMPTY, left, right, leftJoinConfig);
        case 2 -> new InlineJoin(EMPTY, left, right, leftJoinConfig);
        default -> throw new IllegalArgumentException();
    };

    var limit = new Limit(EMPTY, L(10), join);

    var rule = new PushDownAndCombineLimits();
    var actualPlan = rule.rule(limit);

    // Expected shape: the same join, but with the limit re-parented onto the left child only.
    var limitedLeft = limit.replaceChild(join.left());
    var expectedPlan = join.replaceChildren(limitedLeft, join.right());

    assertEquals(expectedPlan, actualPlan);
}

public void testMultipleCombineLimits() {
var numberOfLimits = randomIntBetween(3, 10);
var minimum = randomIntBetween(10, 99);
Expand Down

0 comments on commit 4ff5acc

Please sign in to comment.