Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional tests for outer join push downs #14841

Merged
merged 3 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.sql.planner.assertions.PlanAssert;
import io.trino.sql.planner.assertions.PlanMatchPattern;
import io.trino.sql.planner.optimizations.PlanNodeSearcher;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.testing.LocalQueryRunner;
Expand Down Expand Up @@ -526,6 +527,21 @@ public QueryAssert isNotFullyPushedDown(PlanMatchPattern retainedSubplan)
});
}

/**
* Verifies that a join query is not fully pushed down by checking that the plan still contains a JOIN node.
*/
public QueryAssert joinIsNotFullyPushedDown()
hashhar marked this conversation as resolved.
Show resolved Hide resolved
hashhar marked this conversation as resolved.
Show resolved Hide resolved
{
return verifyPlan(plan -> {
if (PlanNodeSearcher.searchFrom(plan.getRoot())
.whereIsInstanceOfAny(JoinNode.class)
.findFirst()
.isEmpty()) {
throw new IllegalStateException("Join node should be present in explain plan, when pushdown is not applied");
}
});
}

/**
* Verifies query has the expected plan and that results are the same as when pushdown is fully disabled.
*/
Expand Down Expand Up @@ -557,6 +573,21 @@ private QueryAssert hasPlan(PlanMatchPattern expectedPlan, Consumer<Plan> additi
return this;
}

/**
 * Plans the query inside a transaction and passes the resulting {@link Plan} to the given verification.
 * Unless {@code skipResultsCorrectnessCheckForPushdown} is set, the results correctness check
 * regardless of pushdown is run afterwards as well.
 *
 * @param planVerification callback that inspects the plan and throws if it is not as expected
 * @return this assertion object, for chaining
 */
private QueryAssert verifyPlan(Consumer<Plan> planVerification)
{
    transaction(runner.getTransactionManager(), runner.getAccessControl())
            .execute(session, transactionSession -> {
                planVerification.accept(runner.createPlan(transactionSession, query, WarningCollector.NOOP));
            });

    if (skipResultsCorrectnessCheckForPushdown) {
        return this;
    }

    // Compare the results with pushdown disabled, so that explicit matches() call is not needed
    hasCorrectResultsRegardlessOfPushdown();
    return this;
}

@CanIgnoreReturnValue
public QueryAssert hasCorrectResultsRegardlessOfPushdown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,16 @@
import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.DYNAMIC_FILTERING_WAIT_TIMEOUT;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.JOIN_PUSHDOWN_ENABLED;
import static io.trino.plugin.jdbc.JoinOperator.FULL_JOIN;
import static io.trino.plugin.jdbc.JoinOperator.JOIN;
import static io.trino.plugin.jdbc.RemoteDatabaseEvent.Status.CANCELLED;
import static io.trino.plugin.jdbc.RemoteDatabaseEvent.Status.RUNNING;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.PARTITIONED;
import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree;
import static io.trino.sql.planner.assertions.PlanMatchPattern.exchange;
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN_CORRELATION;
Expand Down Expand Up @@ -1126,9 +1128,7 @@ public void testJoinPushdownDisabled()
.build();

assertThat(query(noJoinPushdown, "SELECT r.name, n.name FROM nation n JOIN region r ON n.regionkey = r.regionkey"))
.isNotFullyPushedDown(node(JoinNode.class,
anyTree(node(TableScanNode.class)),
anyTree(node(TableScanNode.class))));
.joinIsNotFullyPushedDown();
}

/**
Expand All @@ -1143,32 +1143,37 @@ public void verifySupportsJoinPushdownDeclaration()
}

assertThat(query(joinPushdownEnabled(getSession()), "SELECT r.name, n.name FROM nation n JOIN region r ON n.regionkey = r.regionkey"))
.isNotFullyPushedDown(
node(JoinNode.class,
anyTree(node(TableScanNode.class)),
anyTree(node(TableScanNode.class))));
.joinIsNotFullyPushedDown();
}

/**
* Verify !SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN declaration is true.
*/
@Test
public void testJoinPushdown()
public void verifySupportsJoinPushdownWithFullJoinDeclaration()
{
PlanMatchPattern joinOverTableScans =
node(JoinNode.class,
anyTree(node(TableScanNode.class)),
anyTree(node(TableScanNode.class)));
if (hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN)) {
// Covered by testJoinPushdown
return;
}

PlanMatchPattern broadcastJoinOverTableScans =
node(JoinNode.class,
node(TableScanNode.class),
exchange(ExchangeNode.Scope.LOCAL,
exchange(ExchangeNode.Scope.REMOTE, ExchangeNode.Type.REPLICATE,
node(TableScanNode.class))));
assertThat(query(joinPushdownEnabled(getSession()), "SELECT r.name, n.name FROM nation n FULL JOIN region r ON n.regionkey = r.regionkey"))
.joinIsNotFullyPushedDown();
}

@Test(dataProvider = "joinOperators")
public void testJoinPushdown(JoinOperator joinOperator)
{
Session session = joinPushdownEnabled(getSession());

if (!hasBehavior(SUPPORTS_JOIN_PUSHDOWN)) {
assertThat(query(session, "SELECT r.name, n.name FROM nation n JOIN region r ON n.regionkey = r.regionkey"))
.isNotFullyPushedDown(joinOverTableScans);
.joinIsNotFullyPushedDown();
return;
}

if (joinOperator == FULL_JOIN && !hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN)) {
// Covered by verifySupportsJoinPushdownWithFullJoinDeclaration
return;
}

Expand All @@ -1191,129 +1196,113 @@ public void testJoinPushdown()
"nation_lowercase",
"AS SELECT nationkey, lower(name) name, regionkey FROM nation")) {
// basic case
assertThat(query(session, "SELECT r.name, n.name FROM nation n JOIN region r ON n.regionkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, format("SELECT r.name, n.name FROM nation n %s region r ON n.regionkey = r.regionkey", joinOperator))).isFullyPushedDown();

// join over different columns
assertThat(query(session, "SELECT r.name, n.name FROM nation n JOIN region r ON n.nationkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, format("SELECT r.name, n.name FROM nation n %s region r ON n.nationkey = r.regionkey", joinOperator))).isFullyPushedDown();

// pushdown when using USING
assertThat(query(session, "SELECT r.name, n.name FROM nation n JOIN region r USING(regionkey)")).isFullyPushedDown();
assertThat(query(session, format("SELECT r.name, n.name FROM nation n %s region r USING(regionkey)", joinOperator))).isFullyPushedDown();

// varchar equality predicate
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
"SELECT n.name, n2.regionkey FROM nation n JOIN nation n2 ON n.name = n2.name",
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY),
joinOverTableScans);
assertConditionallyPushedDown(
format("SELECT n.name, n2.regionkey FROM nation n %s nation n2 ON n.name = n2.name", joinOperator),
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY));
assertJoinConditionallyPushedDown(
session,
format("SELECT n.name, nl.regionkey FROM nation n JOIN %s nl ON n.name = nl.name", nationLowercaseTable.getName()),
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY),
joinOverTableScans);
format("SELECT n.name, nl.regionkey FROM nation n %s %s nl ON n.name = nl.name", joinOperator, nationLowercaseTable.getName()),
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY));

// multiple bigint predicates
assertThat(query(session, "SELECT n.name, c.name FROM nation n JOIN customer c ON n.nationkey = c.nationkey and n.regionkey = c.custkey"))
assertThat(query(session, format("SELECT n.name, c.name FROM nation n %s customer c ON n.nationkey = c.nationkey and n.regionkey = c.custkey", joinOperator)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next part (// inequality) is a tricky one: for reasons I don't yet understand, queries with most of the inequality operators become fully pushed down with OUTER joins (I'm working on understanding why).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is also added now, the problem was that we treat INNER join in this case as CROSS JOIN and disable push down for such cases:

public class PushJoinIntoTableScan
        implements Rule<JoinNode>
    @Override
    public Result apply(JoinNode joinNode, Captures captures, Context context)
    {
        if (joinNode.isCrossJoin()) {
            return Result.empty();
        }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem was that we treat INNER join in this case as CROSS JOIN

Join with no equi conditions gets planned as

FilterNode
- CrossJoin
  - Source A
  - Source B

and disable push down for such cases:

a safety measure

But we need to match the plan patterns like above and run Join pushdown for these as well.
I guess @wendigo may be working on this. I remember explaining this to him.

.isFullyPushedDown();

// inequality
for (String operator : nonEqualities) {
// bigint inequality predicate
assertThat(query(withoutDynamicFiltering, format("SELECT r.name, n.name FROM nation n JOIN region r ON n.regionkey %s r.regionkey", operator)))
// Currently no pushdown as inequality predicate is removed from Join to maintain Cross Join and Filter as separate nodes
.isNotFullyPushedDown(broadcastJoinOverTableScans);
assertJoinConditionallyPushedDown(
withoutDynamicFiltering,
format("SELECT r.name, n.name FROM nation n %s region r ON n.regionkey %s r.regionkey", joinOperator, operator),
expectJoinPushdown(operator) && expectJoinPushdowOnInequalityOperator(joinOperator));

// varchar inequality predicate
assertThat(query(withoutDynamicFiltering, format("SELECT n.name, nl.name FROM nation n JOIN %s nl ON n.name %s nl.name", nationLowercaseTable.getName(), operator)))
// Currently no pushdown as inequality predicate is removed from Join to maintain Cross Join and Filter as separate nodes
.isNotFullyPushedDown(broadcastJoinOverTableScans);
assertJoinConditionallyPushedDown(
withoutDynamicFiltering,
format("SELECT n.name, nl.name FROM nation n %s %s nl ON n.name %s nl.name", joinOperator, nationLowercaseTable.getName(), operator),
expectVarcharJoinPushdown(operator) && expectJoinPushdowOnInequalityOperator(joinOperator));
}

// inequality along with an equality, which constitutes an equi-condition and allows filter to remain as part of the Join
for (String operator : nonEqualities) {
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
format("SELECT n.name, c.name FROM nation n JOIN customer c ON n.nationkey = c.nationkey AND n.regionkey %s c.custkey", operator),
expectJoinPushdown(operator),
joinOverTableScans);
format("SELECT n.name, c.name FROM nation n %s customer c ON n.nationkey = c.nationkey AND n.regionkey %s c.custkey", joinOperator, operator),
expectJoinPushdown(operator));
}

// varchar inequality along with an equality, which constitutes an equi-condition and allows filter to remain as part of the Join
for (String operator : nonEqualities) {
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
format("SELECT n.name, nl.name FROM nation n JOIN %s nl ON n.regionkey = nl.regionkey AND n.name %s nl.name", nationLowercaseTable.getName(), operator),
expectVarcharJoinPushdown(operator),
joinOverTableScans);
format("SELECT n.name, nl.name FROM nation n %s %s nl ON n.regionkey = nl.regionkey AND n.name %s nl.name", joinOperator, nationLowercaseTable.getName(), operator),
expectVarcharJoinPushdown(operator));
}

// LEFT JOIN
assertThat(query(session, "SELECT r.name, n.name FROM nation n LEFT JOIN region r ON n.nationkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, "SELECT r.name, n.name FROM region r LEFT JOIN nation n ON n.nationkey = r.regionkey")).isFullyPushedDown();

// RIGHT JOIN
assertThat(query(session, "SELECT r.name, n.name FROM nation n RIGHT JOIN region r ON n.nationkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, "SELECT r.name, n.name FROM region r RIGHT JOIN nation n ON n.nationkey = r.regionkey")).isFullyPushedDown();

hashhar marked this conversation as resolved.
Show resolved Hide resolved
// FULL JOIN
assertConditionallyPushedDown(
session,
"SELECT r.name, n.name FROM nation n FULL JOIN region r ON n.nationkey = r.regionkey",
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN),
joinOverTableScans);

// Join over a (double) predicate
assertThat(query(session, "" +
assertThat(query(session, format("" +
"SELECT c.name, n.name " +
"FROM (SELECT * FROM customer WHERE acctbal > 8000) c " +
"JOIN nation n ON c.custkey = n.nationkey"))
"%s nation n ON c.custkey = n.nationkey", joinOperator)))
.isFullyPushedDown();

// Join over a varchar equality predicate
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
"SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address = 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"JOIN nation n ON c.custkey = n.nationkey",
hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY),
joinOverTableScans);
format("SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address = 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"%s nation n ON c.custkey = n.nationkey", joinOperator),
hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY));

// Join over a varchar inequality predicate
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
"SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address < 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"JOIN nation n ON c.custkey = n.nationkey",
hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY),
joinOverTableScans);
format("SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address < 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"%s nation n ON c.custkey = n.nationkey", joinOperator),
hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY));

// join over aggregation
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
"SELECT * FROM (SELECT regionkey rk, count(nationkey) c FROM nation GROUP BY regionkey) n " +
"JOIN region r ON n.rk = r.regionkey",
hasBehavior(SUPPORTS_AGGREGATION_PUSHDOWN),
joinOverTableScans);
format("SELECT * FROM (SELECT regionkey rk, count(nationkey) c FROM nation GROUP BY regionkey) n " +
"%s region r ON n.rk = r.regionkey", joinOperator),
hasBehavior(SUPPORTS_AGGREGATION_PUSHDOWN));

// join over LIMIT
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
"SELECT * FROM (SELECT nationkey FROM nation LIMIT 30) n " +
"JOIN region r ON n.nationkey = r.regionkey",
hasBehavior(SUPPORTS_LIMIT_PUSHDOWN),
joinOverTableScans);
format("SELECT * FROM (SELECT nationkey FROM nation LIMIT 30) n " +
"%s region r ON n.nationkey = r.regionkey", joinOperator),
hasBehavior(SUPPORTS_LIMIT_PUSHDOWN));

// join over TopN
assertConditionallyPushedDown(
assertJoinConditionallyPushedDown(
session,
"SELECT * FROM (SELECT nationkey FROM nation ORDER BY regionkey LIMIT 5) n " +
"JOIN region r ON n.nationkey = r.regionkey",
hasBehavior(SUPPORTS_TOPN_PUSHDOWN),
joinOverTableScans);
format("SELECT * FROM (SELECT nationkey FROM nation ORDER BY regionkey LIMIT 5) n " +
"%s region r ON n.nationkey = r.regionkey", joinOperator),
hasBehavior(SUPPORTS_TOPN_PUSHDOWN));

// join over join
assertThat(query(session, "SELECT * FROM nation n, region r, customer c WHERE n.regionkey = r.regionkey AND r.regionkey = c.custkey"))
.isFullyPushedDown();
}
}

@DataProvider
public Object[][] joinOperators()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Operator" is the "execution" term while "type" is the SQL term, maybe? Would joinTypes be better (especially since there's no separate join operator for each of these "operators")?

(No change requested, just seeking opinion from others).

{
return Stream.of(JoinOperator.values()).collect(toDataProvider());
}

@Test
public void testExplainAnalyzePhysicalReadWallTime()
{
Expand All @@ -1335,6 +1324,18 @@ protected QueryAssert assertConditionallyPushedDown(
return queryAssert.isNotFullyPushedDown(otherwiseExpected);
}

/**
 * Asserts that the query is fully pushed down when {@code condition} holds, and otherwise
 * that the join is not fully pushed down.
 *
 * @param session session to run the query with
 * @param query SQL text of the join query under test
 * @param condition whether full pushdown is expected
 * @return the assertion object produced by the selected check
 */
protected QueryAssert assertJoinConditionallyPushedDown(
        Session session,
        @Language("SQL") String query,
        boolean condition)
{
    QueryAssert assertion = assertThat(query(session, query));
    return condition
            ? assertion.isFullyPushedDown()
            : assertion.joinIsNotFullyPushedDown();
}

protected void assertConditionallyOrderedPushedDown(
Session session,
@Language("SQL") String query,
Expand Down Expand Up @@ -1370,6 +1371,12 @@ protected boolean expectJoinPushdown(String operator)
throw new AssertionError(); // unreachable
}

/**
 * Tells whether a join using only an inequality predicate is expected to be pushed down
 * for the given join operator.
 *
 * @param joinOperator the join keyword used in the query
 * @return {@code false} for {@code JOIN}, {@code true} for every other operator
 */
protected boolean expectJoinPushdowOnInequalityOperator(JoinOperator joinOperator)
{
    // Currently no pushdown as inequality predicate is removed from Join to maintain Cross Join and Filter as separate nodes
    boolean isPlainJoin = joinOperator == JOIN;
    return !isPlainJoin;
}

private boolean expectVarcharJoinPushdown(String operator)
{
if ("IS NOT DISTINCT FROM".equals(operator)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

public enum JoinOperator
{
JOIN("JOIN"),
LEFT_JOIN("LEFT JOIN"),
RIGHT_JOIN("RIGHT JOIN"),
FULL_JOIN("FULL JOIN"),
Comment on lines +16 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it 1-to-1 mapping with io.trino.spi.connector.JoinType ?
have you considered to reuse JoinType?
have you considered to verify that JoinOperator contains all values of JoinType?
Do you anticipate other values can be present here in the future?

Copy link
Contributor Author

@vlad-lyutenko vlad-lyutenko Nov 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this operator it's more about how JOIN condition is present in actual String query,
for example in future or for some connectors (in case of some bugs) we could add/write LEFT OUTER JOIN instead/additionally to LEFT JOIN. (however this is the same type of join)
So I'd prefer to keep these things separately

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how to tackle already defined enums and their string representation elegantly.
And this does not seem to change frequently or any time soon.
so may be at least some verify (JoinOperator.values.size() vs JoinType.values().size())?

or for some connectors (in case of some bugs) we could add/write LEFT OUTER JOIN instead/additionally to LEFT JOIN.

don't see how to do that easily with current implementation, but I think it does not matter here/now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't see how to do that easily with current implementation, but I think it does not matter here/now.

you can just add to enum :
...
LEFT_OUTER_JOIN("LEFT OUTER JOIN"),
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so may be at least some verify (JoinOperator.values.size() vs JoinType.values().size())?

For me it's not mapping to actual joins, maybe another naming will help - like JoinOperatorStringRepresentation, don't know

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's important to decouple the following three things:

  1. Join types on the plan level (represented by JoinNode.Type)
  2. Join types on SPI level (represented by JoinType)
  3. Mapping SPI join types to SQL strings (represented by JoinOperators here).

There's no reason for 1:1 mapping between 1 and 2.
There also no reason for 1:1 mapping between 2 and 3. e.g. LEFT OUTER can appear in SQL text as LEFT or LEFT OUTER - both are same thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to verify JoinType values are subset of JoinOperator but it sounds premature - it's only a problem when someone implements a new Join node, adds plan optimizer to push down to table scan, implements in some connector - all of this without adding tests.

/**/;

// SQL text of this join operator (e.g. "LEFT JOIN"), as it is written into a query string
private final String value;

/**
 * Creates an operator backed by the given SQL text.
 *
 * @param value the join keyword(s) as they appear in a SQL query
 */
JoinOperator(String value)
{
this.value = value;
}

/**
 * Returns the SQL text of this join operator.
 *
 * @return the join keyword(s) passed at construction
 */
public String getValue()
{
return value;
}

@Override
public String toString()
{
return value;
Comment on lines +37 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice trick

}
}
Loading