Skip to content

Commit

Permalink
Add additional tests for outer join to BaseJdbcConnectorTest
Browse files Browse the repository at this point in the history
This uncovers a bug in Postgres connector that pushing down FULL OUTER join
queries with inequality join conditions fails with an error like "FULL JOIN is
only supported with merge-joinable or hash-joinable join conditions".

So FULL OUTER join pushdown is disabled for Postgres connector at the moment.
  • Loading branch information
vlad-lyutenko authored and hashhar committed Nov 16, 2022
1 parent 4774881 commit 603c83d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.DYNAMIC_FILTERING_WAIT_TIMEOUT;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.DOMAIN_COMPACTION_THRESHOLD;
import static io.trino.plugin.jdbc.JdbcMetadataSessionProperties.JOIN_PUSHDOWN_ENABLED;
import static io.trino.plugin.jdbc.JoinOperator.FULL_JOIN;
import static io.trino.plugin.jdbc.JoinOperator.JOIN;
import static io.trino.plugin.jdbc.RemoteDatabaseEvent.Status.CANCELLED;
import static io.trino.plugin.jdbc.RemoteDatabaseEvent.Status.RUNNING;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType;
Expand All @@ -71,6 +73,7 @@
import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree;
import static io.trino.sql.planner.assertions.PlanMatchPattern.exchange;
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
import static io.trino.testing.DataProviders.toDataProvider;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN_CORRELATION;
Expand Down Expand Up @@ -1149,21 +1152,14 @@ public void verifySupportsJoinPushdownDeclaration()
anyTree(node(TableScanNode.class))));
}

@Test
public void testJoinPushdown()
@Test(dataProvider = "joinOperators")
public void testJoinPushdown(JoinOperator joinOperator)
{
PlanMatchPattern joinOverTableScans =
node(JoinNode.class,
anyTree(node(TableScanNode.class)),
anyTree(node(TableScanNode.class)));

PlanMatchPattern broadcastJoinOverTableScans =
node(JoinNode.class,
node(TableScanNode.class),
exchange(ExchangeNode.Scope.LOCAL,
exchange(ExchangeNode.Scope.REMOTE, ExchangeNode.Type.REPLICATE,
node(TableScanNode.class))));

Session session = joinPushdownEnabled(getSession());

if (!hasBehavior(SUPPORTS_JOIN_PUSHDOWN)) {
Expand All @@ -1172,6 +1168,12 @@ public void testJoinPushdown()
return;
}

if (joinOperator == FULL_JOIN && !hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN)) {
assertThat(query(session, "SELECT r.name, n.name FROM nation n FULL JOIN region r ON n.regionkey = r.regionkey"))
.isNotFullyPushedDown(joinOverTableScans);
return;
}

// Disable DF here for the sake of negative test cases' expected plan. With DF enabled, some operators return in DF's FilterNode and some do not.
Session withoutDynamicFiltering = Session.builder(session)
.setSystemProperty("enable_dynamic_filtering", "false")
Expand All @@ -1191,48 +1193,50 @@ public void testJoinPushdown()
"nation_lowercase",
"AS SELECT nationkey, lower(name) name, regionkey FROM nation")) {
// basic case
assertThat(query(session, "SELECT r.name, n.name FROM nation n JOIN region r ON n.regionkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, format("SELECT r.name, n.name FROM nation n %s region r ON n.regionkey = r.regionkey", joinOperator))).isFullyPushedDown();

// join over different columns
assertThat(query(session, "SELECT r.name, n.name FROM nation n JOIN region r ON n.nationkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, format("SELECT r.name, n.name FROM nation n %s region r ON n.nationkey = r.regionkey", joinOperator))).isFullyPushedDown();

// pushdown when using USING
assertThat(query(session, "SELECT r.name, n.name FROM nation n JOIN region r USING(regionkey)")).isFullyPushedDown();
assertThat(query(session, format("SELECT r.name, n.name FROM nation n %s region r USING(regionkey)", joinOperator))).isFullyPushedDown();

// varchar equality predicate
assertConditionallyPushedDown(
session,
"SELECT n.name, n2.regionkey FROM nation n JOIN nation n2 ON n.name = n2.name",
format("SELECT n.name, n2.regionkey FROM nation n %s nation n2 ON n.name = n2.name", joinOperator),
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY),
joinOverTableScans);
assertConditionallyPushedDown(
session,
format("SELECT n.name, nl.regionkey FROM nation n JOIN %s nl ON n.name = nl.name", nationLowercaseTable.getName()),
format("SELECT n.name, nl.regionkey FROM nation n %s %s nl ON n.name = nl.name", joinOperator, nationLowercaseTable.getName()),
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY),
joinOverTableScans);

// multiple bigint predicates
assertThat(query(session, "SELECT n.name, c.name FROM nation n JOIN customer c ON n.nationkey = c.nationkey and n.regionkey = c.custkey"))
assertThat(query(session, format("SELECT n.name, c.name FROM nation n %s customer c ON n.nationkey = c.nationkey and n.regionkey = c.custkey", joinOperator)))
.isFullyPushedDown();

// inequality
for (String operator : nonEqualities) {
// bigint inequality predicate
assertThat(query(withoutDynamicFiltering, format("SELECT r.name, n.name FROM nation n JOIN region r ON n.regionkey %s r.regionkey", operator)))
// Currently no pushdown as inequality predicate is removed from Join to maintain Cross Join and Filter as separate nodes
.isNotFullyPushedDown(broadcastJoinOverTableScans);
assertJoinConditionallyPushedDown(
withoutDynamicFiltering,
format("SELECT r.name, n.name FROM nation n %s region r ON n.regionkey %s r.regionkey", joinOperator, operator),
expectJoinPushdown(operator) && expectJoinPushdowOnInequalityOperator(joinOperator));

// varchar inequality predicate
assertThat(query(withoutDynamicFiltering, format("SELECT n.name, nl.name FROM nation n JOIN %s nl ON n.name %s nl.name", nationLowercaseTable.getName(), operator)))
// Currently no pushdown as inequality predicate is removed from Join to maintain Cross Join and Filter as separate nodes
.isNotFullyPushedDown(broadcastJoinOverTableScans);
assertJoinConditionallyPushedDown(
withoutDynamicFiltering,
format("SELECT n.name, nl.name FROM nation n %s %s nl ON n.name %s nl.name", joinOperator, nationLowercaseTable.getName(), operator),
expectVarcharJoinPushdown(operator) && expectJoinPushdowOnInequalityOperator(joinOperator));
}

// inequality along with an equality, which constitutes an equi-condition and allows filter to remain as part of the Join
for (String operator : nonEqualities) {
assertConditionallyPushedDown(
session,
format("SELECT n.name, c.name FROM nation n JOIN customer c ON n.nationkey = c.nationkey AND n.regionkey %s c.custkey", operator),
format("SELECT n.name, c.name FROM nation n %s customer c ON n.nationkey = c.nationkey AND n.regionkey %s c.custkey", joinOperator, operator),
expectJoinPushdown(operator),
joinOverTableScans);
}
Expand All @@ -1241,70 +1245,55 @@ public void testJoinPushdown()
for (String operator : nonEqualities) {
assertConditionallyPushedDown(
session,
format("SELECT n.name, nl.name FROM nation n JOIN %s nl ON n.regionkey = nl.regionkey AND n.name %s nl.name", nationLowercaseTable.getName(), operator),
format("SELECT n.name, nl.name FROM nation n %s %s nl ON n.regionkey = nl.regionkey AND n.name %s nl.name", joinOperator, nationLowercaseTable.getName(), operator),
expectVarcharJoinPushdown(operator),
joinOverTableScans);
}

// LEFT JOIN
assertThat(query(session, "SELECT r.name, n.name FROM nation n LEFT JOIN region r ON n.nationkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, "SELECT r.name, n.name FROM region r LEFT JOIN nation n ON n.nationkey = r.regionkey")).isFullyPushedDown();

// RIGHT JOIN
assertThat(query(session, "SELECT r.name, n.name FROM nation n RIGHT JOIN region r ON n.nationkey = r.regionkey")).isFullyPushedDown();
assertThat(query(session, "SELECT r.name, n.name FROM region r RIGHT JOIN nation n ON n.nationkey = r.regionkey")).isFullyPushedDown();

// FULL JOIN
assertConditionallyPushedDown(
session,
"SELECT r.name, n.name FROM nation n FULL JOIN region r ON n.nationkey = r.regionkey",
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN),
joinOverTableScans);

// Join over a (double) predicate
assertThat(query(session, "" +
assertThat(query(session, format("" +
"SELECT c.name, n.name " +
"FROM (SELECT * FROM customer WHERE acctbal > 8000) c " +
"JOIN nation n ON c.custkey = n.nationkey"))
"%s nation n ON c.custkey = n.nationkey", joinOperator)))
.isFullyPushedDown();

// Join over a varchar equality predicate
assertConditionallyPushedDown(
session,
"SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address = 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"JOIN nation n ON c.custkey = n.nationkey",
format("SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address = 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"%s nation n ON c.custkey = n.nationkey", joinOperator),
hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY),
joinOverTableScans);

// Join over a varchar inequality predicate
assertConditionallyPushedDown(
session,
"SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address < 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"JOIN nation n ON c.custkey = n.nationkey",
format("SELECT c.name, n.name FROM (SELECT * FROM customer WHERE address < 'TcGe5gaZNgVePxU5kRrvXBfkasDTea') c " +
"%s nation n ON c.custkey = n.nationkey", joinOperator),
hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY),
joinOverTableScans);

// join over aggregation
assertConditionallyPushedDown(
session,
"SELECT * FROM (SELECT regionkey rk, count(nationkey) c FROM nation GROUP BY regionkey) n " +
"JOIN region r ON n.rk = r.regionkey",
format("SELECT * FROM (SELECT regionkey rk, count(nationkey) c FROM nation GROUP BY regionkey) n " +
"%s region r ON n.rk = r.regionkey", joinOperator),
hasBehavior(SUPPORTS_AGGREGATION_PUSHDOWN),
joinOverTableScans);

// join over LIMIT
assertConditionallyPushedDown(
session,
"SELECT * FROM (SELECT nationkey FROM nation LIMIT 30) n " +
"JOIN region r ON n.nationkey = r.regionkey",
format("SELECT * FROM (SELECT nationkey FROM nation LIMIT 30) n " +
"%s region r ON n.nationkey = r.regionkey", joinOperator),
hasBehavior(SUPPORTS_LIMIT_PUSHDOWN),
joinOverTableScans);

// join over TopN
assertConditionallyPushedDown(
session,
"SELECT * FROM (SELECT nationkey FROM nation ORDER BY regionkey LIMIT 5) n " +
"JOIN region r ON n.nationkey = r.regionkey",
format("SELECT * FROM (SELECT nationkey FROM nation ORDER BY regionkey LIMIT 5) n " +
"%s region r ON n.nationkey = r.regionkey", joinOperator),
hasBehavior(SUPPORTS_TOPN_PUSHDOWN),
joinOverTableScans);

Expand All @@ -1314,6 +1303,12 @@ public void testJoinPushdown()
}
}

@DataProvider
public Object[][] joinOperators()
{
return Stream.of(JoinOperator.values()).collect(toDataProvider());
}

@Test
public void testExplainAnalyzePhysicalReadWallTime()
{
Expand All @@ -1335,6 +1330,18 @@ protected QueryAssert assertConditionallyPushedDown(
return queryAssert.isNotFullyPushedDown(otherwiseExpected);
}

protected QueryAssert assertJoinConditionallyPushedDown(
Session session,
@Language("SQL") String query,
boolean condition)
{
QueryAssert queryAssert = assertThat(query(session, query));
if (condition) {
return queryAssert.isFullyPushedDown();
}
return queryAssert.joinIsNotFullyPushedDown();
}

protected void assertConditionallyOrderedPushedDown(
Session session,
@Language("SQL") String query,
Expand Down Expand Up @@ -1370,6 +1377,12 @@ protected boolean expectJoinPushdown(String operator)
throw new AssertionError(); // unreachable
}

protected boolean expectJoinPushdowOnInequalityOperator(JoinOperator joinOperator)
{
// Currently no pushdown as inequality predicate is removed from Join to maintain Cross Join and Filter as separate nodes
return joinOperator != JOIN;
}

private boolean expectVarcharJoinPushdown(String operator)
{
if ("IS NOT DISTINCT FROM".equals(operator)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

public enum JoinOperator
{
JOIN("JOIN"),
LEFT_JOIN("LEFT JOIN"),
RIGHT_JOIN("RIGHT JOIN"),
FULL_JOIN("FULL JOIN"),
/**/;

private final String value;

JoinOperator(String value)
{
this.value = value;
}

public String getValue()
{
return value;
}

@Override
public String toString()
{
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,10 @@ public Optional<PreparedQuery> implementJoin(
Map<JdbcColumnHandle, String> leftAssignments,
JoinStatistics statistics)
{
if (joinType == JoinType.FULL_OUTER) {
// FULL JOIN is only supported with merge-joinable or hash-joinable join conditions
return Optional.empty();
}
return implementJoinCostAware(
session,
joinType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
import static io.trino.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_AGGREGATION_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_LIMIT_PUSHDOWN;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN;
Expand Down Expand Up @@ -117,7 +118,10 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return true;

case SUPPORTS_JOIN_PUSHDOWN:
case SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY:
return true;
case SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN:
return false;

case SUPPORTS_CREATE_TABLE_WITH_TABLE_COMMENT:
case SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT:
Expand Down Expand Up @@ -593,7 +597,7 @@ public void testStringJoinPushdownWithCollate()
assertConditionallyPushedDown(
session,
"SELECT r.name, n.name FROM nation n FULL JOIN region r ON n.nationkey = r.regionkey",
true,
hasBehavior(SUPPORTS_JOIN_PUSHDOWN_WITH_FULL_JOIN),
joinOverTableScans);

// Join over a (double) predicate
Expand Down

0 comments on commit 603c83d

Please sign in to comment.