From 237137a07eb82d16539b64217ad7e0fe418cc7c8 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 16 Sep 2021 16:28:33 -0400 Subject: [PATCH 1/5] Verify query correctness in TestHiveDynamicPartitionPruning --- .../hive/TestHiveDynamicPartitionPruning.java | 95 +++++++++++++------ 1 file changed, 64 insertions(+), 31 deletions(-)
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java index e61b6a099b96..7eb9519c08fe 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java @@ -30,7 +30,6 @@ import java.util.List; import static com.google.common.collect.Iterables.getOnlyElement; -import static io.airlift.testing.Assertions.assertGreaterThan; import static io.airlift.units.Duration.nanosSince; import static io.trino.SystemSessionProperties.ENABLE_LARGE_DYNAMIC_FILTERS; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; @@ -43,6 +42,7 @@ import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED; import static io.trino.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.NONE; +import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; import static io.trino.tpch.TpchTable.getTables; import static io.trino.util.DynamicFiltersTestUtil.getSimplifiedDomainString; import static java.lang.String.format; @@ -100,10 +100,12 @@ protected Session getSession() @Test(timeOut = 30_000) public void testJoinWithEmptyBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'"); - assertEquals(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/1feaa0f928a02f577c8ac9ef6cc0c8ec2008a46d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -122,11 +124,13 @@ public void testJoinWithEmptyBuildSide() @Test(timeOut = 30_000) public void testJoinWithSelectiveBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey " + + "AND supplier.name = 'Supplier#000000001'"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey " + - "AND supplier.name = 'Supplier#000000001'"); - assertGreaterThan(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/1feaa0f928a02f577c8ac9ef6cc0c8ec2008a46d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -144,10 +148,12 @@ public void
testJoinWithSelectiveBuildSide() @Test(timeOut = 30_000) public void testJoinWithNonSelectiveBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem JOIN supplier ON partitioned_lineitem.suppkey = supplier.suppkey"); - assertEquals(result.getResult().getRowCount(), LINEITEM_COUNT); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/1feaa0f928a02f577c8ac9ef6cc0c8ec2008a46d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -166,10 +172,12 @@ public void testJoinWithNonSelectiveBuildSide() @Test(timeOut = 30_000) public void testJoinLargeBuildSideRangeDynamicFiltering() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem JOIN orders ON partitioned_lineitem.orderkey = orders.orderkey"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem JOIN orders ON partitioned_lineitem.orderkey = orders.orderkey"); - assertEquals(result.getResult().getRowCount(), LINEITEM_COUNT); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/1feaa0f928a02f577c8ac9ef6cc0c8ec2008a46d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -192,13 +200,15 @@ public void testJoinLargeBuildSideRangeDynamicFiltering() public void testJoinWithMultipleDynamicFiltersOnProbe() { // supplier names Supplier#000000001 and Supplier#000000002 match suppkey 1 and 2 + @Language("SQL") String selectQuery = "SELECT * FROM (" + + "SELECT supplier.suppkey FROM " + + "partitioned_lineitem JOIN tpch.tiny.supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name IN ('Supplier#000000001', 'Supplier#000000002')" + + ") t JOIN supplier ON t.suppkey = supplier.suppkey AND supplier.suppkey IN (2, 3)"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM (" + - "SELECT supplier.suppkey FROM " + - "partitioned_lineitem JOIN tpch.tiny.supplier ON partitioned_lineitem.suppkey = supplier.suppkey AND supplier.name IN ('Supplier#000000001', 'Supplier#000000002')" + - ") t JOIN supplier ON t.suppkey = supplier.suppkey AND supplier.suppkey IN (2, 3)"); - assertGreaterThan(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/1feaa0f928a02f577c8ac9ef6cc0c8ec2008a46d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -226,10 +236,12 @@ public void testJoinWithImplicitCoercion() "SELECT orderkey, CAST(suppkey as int) suppkey_int FROM tpch.tiny.lineitem", LINEITEM_COUNT); + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem_int l JOIN supplier s ON l.suppkey_int = s.suppkey AND s.name = 'Supplier#000000001'"; ResultWithQueryId<MaterializedResult> result =
getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem_int l JOIN supplier s ON l.suppkey_int = s.suppkey AND s.name = 'Supplier#000000001'"); - assertGreaterThan(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); @@ -243,10 +255,12 @@ public void testJoinWithImplicitCoercion() @Test(timeOut = 30_000) public void testSemiJoinWithEmptyBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'abc')"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'abc')"); - assertEquals(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/0fb16ab9d9c990e58fad63d4dab3dbbe482a077d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -264,10 +278,12 @@ public void testSemiJoinWithEmptyBuildSide() @Test(timeOut = 30_000) public void testSemiJoinWithSelectiveBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'Supplier#000000001')"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier WHERE name = 'Supplier#000000001')"); - assertGreaterThan(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/0fb16ab9d9c990e58fad63d4dab3dbbe482a077d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -285,10 +301,12 @@ public void testSemiJoinWithSelectiveBuildSide() @Test(timeOut = 30_000) public void testSemiJoinWithNonSelectiveBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier)"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem WHERE suppkey IN (SELECT suppkey FROM supplier)"); - assertGreaterThan(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/0fb16ab9d9c990e58fad63d4dab3dbbe482a077d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -307,10 +325,12 @@ public void testSemiJoinWithNonSelectiveBuildSide() @Test(timeOut = 30_000) public void testSemiJoinLargeBuildSideRangeDynamicFiltering() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem WHERE orderkey IN (SELECT orderkey FROM orders)";
ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem WHERE orderkey IN (SELECT orderkey FROM orders)"); - assertEquals(result.getResult().getRowCount(), LINEITEM_COUNT); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/0fb16ab9d9c990e58fad63d4dab3dbbe482a077d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -332,10 +352,12 @@ public void testSemiJoinLargeBuildSideRangeDynamicFiltering() @Test(timeOut = 30_000) public void testRightJoinWithEmptyBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'abc'"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'abc'"); - assertEquals(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/0fb16ab9d9c990e58fad63d4dab3dbbe482a077d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -353,10 +375,12 @@ public void testRightJoinWithEmptyBuildSide() @Test(timeOut = 30_000) public void testRightJoinWithSelectiveBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'Supplier#000000001'"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey WHERE name = 'Supplier#000000001'"); - assertGreaterThan(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/0fb16ab9d9c990e58fad63d4dab3dbbe482a077d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -374,10 +398,12 @@ public void testRightJoinWithSelectiveBuildSide() @Test(timeOut = 30_000) public void testRightJoinWithNonSelectiveBuildSide() { + @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId( getSession(), - "SELECT * FROM partitioned_lineitem l RIGHT JOIN supplier s ON l.suppkey = s.suppkey"); - assertGreaterThan(result.getResult().getRowCount(), 0); + selectQuery); + MaterializedResult expected = computeActual(withDynamicFilteringDisabled(), selectQuery); + assertEqualsIgnoreOrder(result.getResult(), expected); // TODO bring back OperatorStats assertions from https://github.com/trinodb/trino/commit/0fb16ab9d9c990e58fad63d4dab3dbbe482a077d // after https://github.com/trinodb/trino/issues/5120 is fixed @@ -401,4 +427,11 @@ private DynamicFiltersStats getDynamicFilteringStats(QueryId queryId) .getQueryStats() .getDynamicFiltersStats(); } + + private Session withDynamicFilteringDisabled() + { + return Session.builder(getSession()) + .setSystemProperty("enable_dynamic_filtering", "false") + .build(); + } }
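Patch 1 replaces brittle row-count assertions with a self-verifying comparison: each query runs once with dynamic filtering active and once against a withDynamicFilteringDisabled() baseline session, and the two results must be multiset-equal. A dynamic filter that incorrectly drops rows now fails the comparison instead of slipping past a count check. A minimal, dependency-free sketch of that comparison idiom (ResultComparison and toMultiset are illustrative names, not Trino APIs):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Multiset comparison: SQL results carry no inherent order, so equality is
// checked on counts per distinct row, the same semantics assertEqualsIgnoreOrder relies on.
final class ResultComparison
{
    static void assertEqualsIgnoreOrder(List<List<Object>> actual, List<List<Object>> expected)
    {
        if (!toMultiset(actual).equals(toMultiset(expected))) {
            throw new AssertionError("results differ (ignoring order)");
        }
    }

    private static Map<List<Object>, Integer> toMultiset(List<List<Object>> rows)
    {
        Map<List<Object>, Integer> counts = new HashMap<>();
        for (List<Object> row : rows) {
            counts.merge(row, 1, Integer::sum);
        }
        return counts;
    }
}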
From 1b231c74428931f596b2dbf870b3ddba5c572491 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 21 Oct 2021 14:48:52 -0400 Subject: [PATCH 2/5] Add BaseDynamicPartitionPruningTest test class --- .../BaseDynamicPartitionPruningTest.java | 64 ++++++----------- 1 file changed, 26 insertions(+), 38 deletions(-) rename plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java => testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java (90%)
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java similarity index 90% rename from plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java rename to testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java index 7eb9519c08fe..995f75f22687 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruning.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseDynamicPartitionPruningTest.java @@ -11,31 +11,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.hive; +package io.trino.testing; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import io.airlift.log.Logger; +import com.google.common.collect.ImmutableSet; import io.trino.Session; +import io.trino.server.DynamicFilterService.DynamicFilterDomainStats; +import io.trino.server.DynamicFilterService.DynamicFiltersStats; import io.trino.spi.QueryId; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.ValueSet; -import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.MaterializedResult; -import io.trino.testing.QueryRunner; -import io.trino.testing.ResultWithQueryId; +import io.trino.tpch.TpchTable; import org.intellij.lang.annotations.Language; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.util.List; +import java.util.Map; +import java.util.Set; import static com.google.common.collect.Iterables.getOnlyElement; -import static io.airlift.units.Duration.nanosSince; import static io.trino.SystemSessionProperties.ENABLE_LARGE_DYNAMIC_FILTERS; import static io.trino.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY; -import static io.trino.server.DynamicFilterService.DynamicFilterDomainStats; -import static io.trino.server.DynamicFilterService.DynamicFiltersStats; import static io.trino.spi.predicate.Domain.none; import static io.trino.spi.predicate.Domain.singleValue; import static io.trino.spi.predicate.Range.range; @@ -43,34 +42,26 @@ import static io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED; import static io.trino.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.NONE; import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; -import static io.trino.tpch.TpchTable.getTables; +import static io.trino.tpch.TpchTable.LINE_ITEM; +import static io.trino.tpch.TpchTable.ORDERS; +import static io.trino.tpch.TpchTable.SUPPLIER; import static io.trino.util.DynamicFiltersTestUtil.getSimplifiedDomainString; -import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import
static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class TestHiveDynamicPartitionPruning +public abstract class BaseDynamicPartitionPruningTest extends AbstractTestQueryFramework { - private static final Logger log = Logger.get(TestHiveDynamicPartitionPruning.class); private static final String PARTITIONED_LINEITEM = "partitioned_lineitem"; private static final long LINEITEM_COUNT = 60175; - - @Override - protected QueryRunner createQueryRunner() - throws Exception - { - return HiveQueryRunner.builder() - // Reduced partitioned join limit for large DF to enable DF min/max collection with ENABLE_LARGE_DYNAMIC_FILTERS - .addExtraProperty("dynamic-filtering.large-partitioned.max-distinct-values-per-driver", "100") - .addExtraProperty("dynamic-filtering.large-partitioned.range-row-limit-per-driver", "100000") - // disable semi join to inner join rewrite to test semi join operators explicitly - .addExtraProperty("optimizer.rewrite-filtering-semi-join-to-inner-join", "false") - .setHiveProperties(ImmutableMap.of("hive.dynamic-filtering-probe-blocking-timeout", "1h")) - .setInitialTables(getTables()) - .build(); - } + protected static final Set<TpchTable<?>> REQUIRED_TABLES = ImmutableSet.of(LINE_ITEM, ORDERS, SUPPLIER); + protected static final Map<String, String> EXTRA_PROPERTIES = ImmutableMap.of( + // Reduced partitioned join limit for large DF to enable DF min/max collection with ENABLE_LARGE_DYNAMIC_FILTERS + "dynamic-filtering.large-partitioned.max-distinct-values-per-driver", "100", + "dynamic-filtering.large-partitioned.range-row-limit-per-driver", "100000", + // disable semi join to inner join rewrite to test semi join operators explicitly + "optimizer.rewrite-filtering-semi-join-to-inner-join", "false"); @BeforeClass @Override @@ -79,13 +70,11 @@ public void init() { super.init(); // setup partitioned fact table for dynamic partition pruning - @Language("SQL") String sql = format("CREATE TABLE %s WITH (format = 'TEXTFILE', partitioned_by=array['suppkey']) AS " + "SELECT orderkey, partkey, suppkey FROM %s", PARTITIONED_LINEITEM, "tpch.tiny.lineitem"); - long start = System.nanoTime(); - long rows = (Long) getQueryRunner().execute(sql).getMaterializedRows().get(0).getField(0); - log.info("Imported %s rows for %s in %s", rows, PARTITIONED_LINEITEM, nanosSince(start).convertToMostSuccinctTimeUnit()); + createLineitemTable(PARTITIONED_LINEITEM, ImmutableList.of("orderkey", "partkey", "suppkey"), ImmutableList.of("suppkey")); } + protected abstract void createLineitemTable(String tableName, List<String> columns, List<String> partitionColumns); @Override protected Session getSession() { @@ -230,11 +219,10 @@ public void testJoinWithMultipleDynamicFiltersOnProbe() public void testJoinWithImplicitCoercion() { // setup partitioned fact table with integer suppkey - assertUpdate( - "CREATE TABLE partitioned_lineitem_int " + - "WITH (format = 'TEXTFILE', partitioned_by=array['suppkey_int']) AS " + - "SELECT orderkey, CAST(suppkey as int) suppkey_int FROM tpch.tiny.lineitem", - LINEITEM_COUNT); + createLineitemTable("partitioned_lineitem_int", ImmutableList.of("orderkey", "CAST(suppkey as int) suppkey_int"), ImmutableList.of("suppkey_int")); + assertQuery( + "SELECT count(*) FROM partitioned_lineitem_int", + "VALUES " + LINEITEM_COUNT); @Language("SQL") String selectQuery = "SELECT * FROM partitioned_lineitem_int l JOIN supplier s ON l.suppkey_int = s.suppkey AND s.name = 'Supplier#000000001'"; ResultWithQueryId<MaterializedResult> result = getDistributedQueryRunner().executeWithQueryId(
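Patch 2 turns the Hive-specific suite into a connector-agnostic template: the base class owns the queries, assertions, and dynamic-filter statistics checks, while the only connector-specific piece left is the DDL behind the abstract createLineitemTable hook. A compact sketch of that shape (illustrative names, not the actual Trino classes):

import java.util.List;

// Template-method pattern: shared test logic lives in the abstract base,
// each connector module contributes only its CREATE TABLE flavor.
abstract class BasePruningSketch
{
    protected abstract void createLineitemTable(String tableName, List<String> columns, List<String> partitionColumns);

    final void setUp()
    {
        // Identical call for every connector; the subclass decides how the
        // partitioned fact table is materialized.
        createLineitemTable("partitioned_lineitem", List.of("orderkey", "partkey", "suppkey"), List.of("suppkey"));
    }
}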
From 0fb2b313b2e488f1e01cb3345e6683a87700bdee Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 17 Sep 2021 11:21:19 -0400 Subject: [PATCH 3/5] Add back test for Hive dynamic filtering of partitions --- .../TestHiveDynamicPartitionPruningTest.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruningTest.java
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruningTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruningTest.java new file mode 100644 index 000000000000..f5a6b6c96b65 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveDynamicPartitionPruningTest.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.google.common.collect.ImmutableMap; +import io.trino.testing.BaseDynamicPartitionPruningTest; +import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; + +import java.util.List; + +import static java.lang.String.format; +import static java.util.stream.Collectors.joining; + +public class TestHiveDynamicPartitionPruningTest + extends BaseDynamicPartitionPruningTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return HiveQueryRunner.builder() + .setExtraProperties(EXTRA_PROPERTIES) + .setHiveProperties(ImmutableMap.of("hive.dynamic-filtering-probe-blocking-timeout", "1h")) + .setInitialTables(REQUIRED_TABLES) + .build(); + } + + @Override + protected void createLineitemTable(String tableName, List<String> columns, List<String> partitionColumns) + { + @Language("SQL") String sql = format( + "CREATE TABLE %s WITH (format = 'TEXTFILE', partitioned_by=array[%s]) AS SELECT %s FROM tpch.tiny.lineitem", + tableName, + partitionColumns.stream().map(column -> "'" + column + "'").collect(joining(",")), + String.join(",", columns)); + getQueryRunner().execute(sql); + } }
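For the default table created in init(), the format string above expands as shown in this small, runnable reproduction of the string assembly (plain Java, no Trino dependencies; the printed statement matches the SQL the Hive test issues):

import java.util.List;

import static java.lang.String.format;
import static java.util.stream.Collectors.joining;

public class ShowHiveDdl
{
    public static void main(String[] args)
    {
        List<String> columns = List.of("orderkey", "partkey", "suppkey");
        List<String> partitionColumns = List.of("suppkey");
        // Same assembly as createLineitemTable above: partition columns are
        // quoted individually, projected columns are joined verbatim.
        String sql = format(
                "CREATE TABLE %s WITH (format = 'TEXTFILE', partitioned_by=array[%s]) AS SELECT %s FROM tpch.tiny.lineitem",
                "partitioned_lineitem",
                partitionColumns.stream().map(column -> "'" + column + "'").collect(joining(",")),
                String.join(",", columns));
        // Prints: CREATE TABLE partitioned_lineitem WITH (format = 'TEXTFILE',
        // partitioned_by=array['suppkey']) AS SELECT orderkey,partkey,suppkey FROM tpch.tiny.lineitem
        System.out.println(sql);
    }
}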
From 0924252b8746b8209c660f0b61f605ea4b0ae3b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 16 Sep 2021 16:22:05 -0400 Subject: [PATCH 4/5] Use dynamic filter to prune Iceberg splits --- plugin/trino-iceberg/pom.xml | 6 + .../trino/plugin/iceberg/IcebergConfig.java | 17 + .../iceberg/IcebergSessionProperties.java | 13 + .../plugin/iceberg/IcebergSplitManager.java | 37 ++- .../plugin/iceberg/IcebergSplitSource.java | 254 +++++++++++++-- .../plugin/iceberg/TestIcebergConfig.java | 9 +- ...estIcebergDynamicPartitionPruningTest.java | 48 +++ .../iceberg/TestIcebergSplitSource.java | 292 ++++++++++++++++++ 8 files changed, 634 insertions(+), 42 deletions(-) create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDynamicPartitionPruningTest.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index e11f65cdc9ad..66425d887e66 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -257,6 +257,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>io.airlift</groupId> + <artifactId>testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 079cb3595487..de5728841a5d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -15,6 +15,7 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; import io.trino.plugin.hive.HiveCompressionCodec; import org.apache.iceberg.FileFormat; @@ -24,6 +25,7 @@ import static io.trino.plugin.hive.HiveCompressionCodec.GZIP; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; +import static java.util.concurrent.TimeUnit.SECONDS; public class IcebergConfig { @@ -33,6 +35,7 @@ public class IcebergConfig private int maxPartitionsPerWriter = 100; private boolean uniqueTableLocation; private CatalogType catalogType = HIVE_METASTORE; + private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS); public CatalogType getCatalogType() { @@ -119,4 +122,18 @@ public IcebergConfig setUniqueTableLocation(boolean uniqueTableLocation) this.uniqueTableLocation = uniqueTableLocation; return this; } + + @NotNull + public Duration getDynamicFilteringWaitTimeout() + { + return dynamicFilteringWaitTimeout; + } + + @Config("iceberg.dynamic-filtering.wait-timeout") + @ConfigDescription("Duration to wait for completion of dynamic filters during split generation") + public IcebergConfig setDynamicFilteringWaitTimeout(Duration dynamicFilteringWaitTimeout) + { + this.dynamicFilteringWaitTimeout = dynamicFilteringWaitTimeout; + return this; + } }
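The new iceberg.dynamic-filtering.wait-timeout property defaults to 0 seconds, so split enumeration never blocks unless a deployment opts in; the matching session property added in the next file lets a single query override that. A hedged sketch of such an override inside a test extending AbstractTestQueryFramework (assumes the catalog is named iceberg; Session.builder and setCatalogSessionProperty are existing Trino testing APIs, the helper name is hypothetical):

// Hypothetical test helper: opt a single query into waiting up to 10 seconds
// for dynamic filters before Iceberg split planning, overriding the 0s default.
private Session withDynamicFilteringWait()
{
    return Session.builder(getSession())
            .setCatalogSessionProperty("iceberg", "dynamic_filtering_wait_timeout", "10s")
            .build();
}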
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index d94a68e9b024..071305d38755 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; +import io.airlift.units.Duration; import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.hive.HiveCompressionCodec; @@ -33,6 +34,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty; +import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY; import static io.trino.spi.session.PropertyMetadata.booleanProperty; import static io.trino.spi.session.PropertyMetadata.doubleProperty; @@ -64,6 +66,7 @@ public final class IcebergSessionProperties private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size"; private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size"; private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size"; + private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout"; private final List<PropertyMetadata<?>> sessionProperties; @Inject @@ -190,6 +193,11 @@ public IcebergSessionProperties( "Parquet: Maximum number of rows passed to the writer in each batch", parquetWriterConfig.getBatchSize(), false)) + .add(durationProperty( + DYNAMIC_FILTERING_WAIT_TIMEOUT, + "Duration to wait for completion of dynamic filters during split generation", + icebergConfig.getDynamicFilteringWaitTimeout(), + false)) .build(); } @@ -310,4 +318,9 @@ public static int getParquetWriterBatchSize(ConnectorSession session) { return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class); } + + public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session) + { + return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class); + } }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java index 0a02873ba5dd..4168ed5df985 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import io.airlift.units.Duration; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; @@ -22,12 +23,19 @@ import io.trino.spi.connector.ConnectorSplitSource; import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.FixedSplitSource; +import io.trino.spi.type.TypeManager; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import javax.inject.Inject; -import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; +import java.util.Set; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout; +import static io.trino.plugin.iceberg.IcebergUtil.getColumns; +import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions; import static java.util.Objects.requireNonNull; public class IcebergSplitManager { @@ -36,11 +44,13 @@ public class IcebergSplitManager public static final int ICEBERG_DOMAIN_COMPACTION_THRESHOLD = 1000; private final IcebergTransactionManager transactionManager; + private final TypeManager typeManager; @Inject - public IcebergSplitManager(IcebergTransactionManager transactionManager) + public IcebergSplitManager(IcebergTransactionManager transactionManager, TypeManager typeManager) { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); } @Override @@ -58,17 +68,24 @@ public ConnectorSplitSource getSplits( } Table icebergTable = transactionManager.get(transaction).getIcebergTable(session, table.getSchemaTableName()); + Duration dynamicFilteringWaitTimeout = getDynamicFilteringWaitTimeout(session); + + Set<Integer> identityPartitionFieldIds = getIdentityPartitions(icebergTable.spec()).keySet().stream() + .map(PartitionField::sourceId) + .collect(toImmutableSet()); + Set<IcebergColumnHandle> identityPartitionColumns = getColumns(icebergTable.schema(), typeManager).stream() + .filter(column -> identityPartitionFieldIds.contains(column.getId())) + .collect(toImmutableSet()); TableScan tableScan = icebergTable.newScan() -
.filter(toIcebergExpression( - table.getEnforcedPredicate() - // TODO: Remove TupleDomain#simplify once Iceberg supports IN expression. Currently this - // is required for IN predicates on non-partition columns with large value list. Such - // predicates on partition columns are not supported. - // (See AbstractTestIcebergSmoke#testLargeInFailureOnPartitionedColumns) - .intersect(table.getUnenforcedPredicate().simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD)))) .useSnapshot(table.getSnapshotId().get()); - IcebergSplitSource splitSource = new IcebergSplitSource(table.getSchemaTableName(), tableScan.planTasks()); + IcebergSplitSource splitSource = new IcebergSplitSource( + table, + identityPartitionColumns, + tableScan, + dynamicFilter, + session.getTimeZoneKey(), + dynamicFilteringWaitTimeout); return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader()); }
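Only identity-transformed partition columns can be pruned directly from partition values, which is why the split manager collects them up front before constructing the split source. A sketch of an equivalent derivation against the Iceberg API (assumes Transform#isIdentity; the patch itself goes through the getIdentityPartitions helper):

import java.util.Set;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;

import static com.google.common.collect.ImmutableSet.toImmutableSet;

final class IdentityPartitions
{
    // Source field ids whose partition transform is identity: values of these
    // columns survive partitioning unchanged, so a partition key can be
    // compared directly against a dynamic filter domain. Bucketed, truncated,
    // or date-transformed partitions do not qualify.
    static Set<Integer> identitySourceIds(Table icebergTable)
    {
        return icebergTable.spec().fields().stream()
                .filter(field -> field.transform().isIdentity())
                .map(PartitionField::sourceId)
                .collect(toImmutableSet());
    }
}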
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index d6eeb2e3b8dd..2e620723ffbb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -13,83 +13,277 @@ */ package io.trino.plugin.iceberg; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.google.common.collect.Streams; +import io.airlift.units.Duration; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorPartitionHandle; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorSplitSource; -import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.Range; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.predicate.ValueSet; +import io.trino.spi.type.TimeZoneKey; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Type; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; +import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; -import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; -import static com.google.common.collect.Iterators.limit; +import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression; +import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD; +import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; +import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes; +import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.iceberg.types.Conversions.fromByteBuffer; public class IcebergSplitSource implements ConnectorSplitSource { - private final SchemaTableName schemaTableName; - private final CloseableIterable<CombinedScanTask> combinedScanIterable; - private final Iterator<FileScanTask> fileScanIterator; + private static final ConnectorSplitBatch EMPTY_BATCH = new ConnectorSplitBatch(ImmutableList.of(), false); + private static final ConnectorSplitBatch NO_MORE_SPLITS_BATCH = new ConnectorSplitBatch(ImmutableList.of(), true); - public IcebergSplitSource(SchemaTableName schemaTableName, CloseableIterable<CombinedScanTask> combinedScanIterable) - { - this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null"); - this.combinedScanIterable = requireNonNull(combinedScanIterable, "combinedScanIterable is null"); + private final IcebergTableHandle tableHandle; + private final Set<IcebergColumnHandle> identityPartitionColumns; + private final TableScan tableScan; + private final Map<Integer, Type> fieldIdToType; + private final DynamicFilter dynamicFilter; + private final TimeZoneKey sessionZone; + private final long dynamicFilteringWaitTimeoutMillis; + private final Stopwatch dynamicFilterWaitStopwatch; + + private CloseableIterable<CombinedScanTask> combinedScanIterable; + private Iterator<FileScanTask> fileScanIterator; + private TupleDomain<IcebergColumnHandle> pushedDownDynamicFilterPredicate; - this.fileScanIterator = Streams.stream(combinedScanIterable) - .map(CombinedScanTask::files) - .flatMap(Collection::stream) - .iterator(); + public IcebergSplitSource( + IcebergTableHandle tableHandle, + Set<IcebergColumnHandle> identityPartitionColumns, + TableScan tableScan, + DynamicFilter dynamicFilter, + TimeZoneKey sessionZone, + Duration dynamicFilteringWaitTimeout) + { + this.tableHandle = requireNonNull(tableHandle, "tableHandle is null"); + this.identityPartitionColumns = requireNonNull(identityPartitionColumns, "identityPartitionColumns is null"); + this.tableScan = requireNonNull(tableScan, "tableScan is null"); + this.fieldIdToType = primitiveFieldTypes(tableScan.schema()); + this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); + this.sessionZone = requireNonNull(sessionZone, "sessionZone is null"); + this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis(); + this.dynamicFilterWaitStopwatch = Stopwatch.createStarted(); } @Override public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) { - // TODO: move this to a background thread - List<ConnectorSplit> splits = new ArrayList<>(); - Iterator<FileScanTask> iterator = limit(fileScanIterator, maxSize); - while (iterator.hasNext()) { - FileScanTask task = iterator.next(); - if (!task.deletes().isEmpty()) { - throw new TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported: " + schemaTableName); + long timeLeft = dynamicFilteringWaitTimeoutMillis - dynamicFilterWaitStopwatch.elapsed(MILLISECONDS); + if (dynamicFilter.isAwaitable() && timeLeft > 0) { + return dynamicFilter.isBlocked() + .thenApply(ignored -> EMPTY_BATCH) + .completeOnTimeout(EMPTY_BATCH, timeLeft, MILLISECONDS); + } + + if (combinedScanIterable == null) { + // Used to avoid duplicating work if the Dynamic Filter was already pushed down to the Iceberg API + this.pushedDownDynamicFilterPredicate = dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast); + TupleDomain<IcebergColumnHandle> fullPredicate = tableHandle.getUnenforcedPredicate() + .intersect(pushedDownDynamicFilterPredicate); + // TODO: (https://github.com/trinodb/trino/issues/9743): Consider removing TupleDomain#simplify + TupleDomain<IcebergColumnHandle> simplifiedPredicate =
fullPredicate.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD); + if (!simplifiedPredicate.equals(fullPredicate)) { + // Pushed down predicate was simplified, always evaluate it against individual splits + this.pushedDownDynamicFilterPredicate = TupleDomain.all(); + } + + TupleDomain<IcebergColumnHandle> effectivePredicate = tableHandle.getEnforcedPredicate() + .intersect(simplifiedPredicate); + + if (effectivePredicate.isNone()) { + finish(); + return completedFuture(NO_MORE_SPLITS_BATCH); + } + + Expression filterExpression = toIcebergExpression(effectivePredicate); + this.combinedScanIterable = tableScan + .filter(filterExpression) + .includeColumnStats() + .planTasks(); + this.fileScanIterator = Streams.stream(combinedScanIterable) + .map(CombinedScanTask::files) + .flatMap(Collection::stream) + .iterator(); + } + + TupleDomain<IcebergColumnHandle> dynamicFilterPredicate = dynamicFilter.getCurrentPredicate() + .transformKeys(IcebergColumnHandle.class::cast); + if (dynamicFilterPredicate.isNone()) { + finish(); + return completedFuture(NO_MORE_SPLITS_BATCH); + } + + Iterator<FileScanTask> fileScanTasks = Iterators.limit(fileScanIterator, maxSize); + ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder(); + while (fileScanTasks.hasNext()) { + FileScanTask scanTask = fileScanTasks.next(); + if (!scanTask.deletes().isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Iceberg tables with delete files are not supported: " + tableHandle.getSchemaTableName()); + } + + IcebergSplit icebergSplit = toIcebergSplit(scanTask); + + if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) { + if (!partitionMatchesPredicate( + identityPartitionColumns, + icebergSplit.getPartitionKeys(), + dynamicFilterPredicate, + sessionZone)) { + continue; + } + if (!fileMatchesPredicate( + fieldIdToType, + dynamicFilterPredicate, + scanTask.file().lowerBounds(), + scanTask.file().upperBounds(), + scanTask.file().nullValueCounts())) { + continue; + } } - splits.add(toIcebergSplit(task)); + splits.add(icebergSplit); } - return completedFuture(new ConnectorSplitBatch(splits, isFinished())); + return completedFuture(new ConnectorSplitBatch(splits.build(), isFinished())); + } + + private void finish() + { + close(); + this.combinedScanIterable = CloseableIterable.empty(); + this.fileScanIterator = Collections.emptyIterator(); } @Override public boolean isFinished() { - return !fileScanIterator.hasNext(); + return fileScanIterator != null && !fileScanIterator.hasNext(); } @Override public void close() { - try { - combinedScanIterable.close(); + if (combinedScanIterable != null) { + try { + combinedScanIterable.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }
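The blocking strategy in getNextBatch is the core of the timeout handling: instead of sleeping, it returns an empty batch future that completes either when the dynamic filter narrows or when the remaining wait budget elapses, so the engine keeps polling without consuming a worker thread. A standalone sketch of the idiom (plain CompletableFuture; completeOnTimeout requires Java 9+):

import java.util.List;
import java.util.concurrent.CompletableFuture;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

final class WaitIdiom
{
    // Piggyback on the filter's own future, but never wait past the budget:
    // either outcome yields an empty batch, prompting the engine to call
    // getNextBatch again, at which point the filter is re-examined.
    static CompletableFuture<List<String>> nextBatch(CompletableFuture<?> filterBlocked, long timeLeftMillis)
    {
        List<String> emptyBatch = List.of();
        return filterBlocked
                .thenApply(ignored -> emptyBatch)
                .completeOnTimeout(emptyBatch, timeLeftMillis, MILLISECONDS);
    }
}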
+ + @VisibleForTesting + static boolean fileMatchesPredicate( + Map<Integer, Type> primitiveTypeForFieldId, + TupleDomain<IcebergColumnHandle> dynamicFilterPredicate, + Map<Integer, ByteBuffer> lowerBounds, + Map<Integer, ByteBuffer> upperBounds, + Map<Integer, Long> nullValueCounts) + { + if (dynamicFilterPredicate.isNone()) { + return false; + } + Map<IcebergColumnHandle, Domain> domains = dynamicFilterPredicate.getDomains().orElseThrow(); + + for (Map.Entry<IcebergColumnHandle, Domain> domainEntry : domains.entrySet()) { + IcebergColumnHandle column = domainEntry.getKey(); + Domain domain = domainEntry.getValue(); + + int fieldId = column.getId(); + Long nullValueCount = nullValueCounts.get(fieldId); + boolean mayContainNulls = nullValueCount == null || nullValueCount > 0; + Type type = primitiveTypeForFieldId.get(fieldId); + Domain statisticsDomain = domainForStatistics( + column.getType(), + fromByteBuffer(type, lowerBounds.get(fieldId)), + fromByteBuffer(type, upperBounds.get(fieldId)), + mayContainNulls); + if (!domain.overlaps(statisticsDomain)) { + return false; + } + } + return true; + } + + private static Domain domainForStatistics(io.trino.spi.type.Type type, Object lowerBound, Object upperBound, boolean containsNulls) + { + Type icebergType = toIcebergType(type); + if (lowerBound == null && upperBound == null) { + return Domain.create(ValueSet.all(type), containsNulls); + } + + Range statisticsRange; + if (lowerBound != null && upperBound != null) { + statisticsRange = Range.range( + type, + PartitionTable.convert(lowerBound, icebergType), + true, + PartitionTable.convert(upperBound, icebergType), + true); } - catch (IOException e) { - throw new UncheckedIOException(e); + else if (upperBound != null) { + statisticsRange = Range.lessThanOrEqual(type, PartitionTable.convert(upperBound, icebergType)); + } + else { + statisticsRange = Range.greaterThanOrEqual(type, PartitionTable.convert(lowerBound, icebergType)); + } + return Domain.create(ValueSet.ofRanges(statisticsRange), containsNulls); + } + + @VisibleForTesting + static boolean partitionMatchesPredicate( + Set<IcebergColumnHandle> identityPartitionColumns, + Map<Integer, String> partitionKeys, + TupleDomain<IcebergColumnHandle> dynamicFilterPredicate, + TimeZoneKey timeZoneKey) + { + if (dynamicFilterPredicate.isNone()) { + return false; + } + Map<IcebergColumnHandle, Domain> domains = dynamicFilterPredicate.getDomains().orElseThrow(); + + for (IcebergColumnHandle partitionColumn : identityPartitionColumns) { + Domain allowedDomain = domains.get(partitionColumn); + if (allowedDomain != null) { + Object partitionValue = deserializePartitionValue(partitionColumn.getType(), partitionKeys.get(partitionColumn.getId()), partitionColumn.getName(), timeZoneKey); + if (!allowedDomain.includesNullableValue(partitionValue)) { + return false; + } + } } + return true; } - private ConnectorSplit toIcebergSplit(FileScanTask task) + private static IcebergSplit toIcebergSplit(FileScanTask task) { return new IcebergSplit( task.file().path().toString(),
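fileMatchesPredicate reduces each data file's min/max statistics to a Trino Domain and keeps the file only if that domain overlaps the dynamic filter, mirroring domainForStatistics above. A self-contained illustration with the Trino SPI types (values chosen to match the unit tests added later in this patch):

import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.ValueSet;

import static io.trino.spi.type.BigintType.BIGINT;

public class StatsPruningExample
{
    public static void main(String[] args)
    {
        // A file with min=1000, max=2000 and nullValueCount=0 reduces to the
        // closed range [1000, 2000] with no null marker.
        Domain fileDomain = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 1000L, true, 2000L, true)), false);
        // Dynamic filter asking only for values in [0, 100].
        Domain filter = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 100L, true)), false);
        System.out.println(fileDomain.overlaps(filter)); // false -> the split is dropped unread
    }
}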
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 292347ba5bc6..9636ae4561d2 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; import io.trino.plugin.hive.HiveCompressionCodec; import org.testng.annotations.Test; @@ -27,6 +28,7 @@ import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET; +import static java.util.concurrent.TimeUnit.MINUTES; public class TestIcebergConfig { @@ -39,7 +41,8 @@ public void testDefaults() .setUseFileSizeFromMetadata(true) .setMaxPartitionsPerWriter(100) .setUniqueTableLocation(false) - .setCatalogType(HIVE_METASTORE)); + .setCatalogType(HIVE_METASTORE) + .setDynamicFilteringWaitTimeout(new Duration(0, MINUTES))); } @Test @@ -52,6 +55,7 @@ public void testExplicitPropertyMappings() .put("iceberg.max-partitions-per-writer", "222") .put("iceberg.unique-table-location", "true") .put("iceberg.catalog.type", "GLUE") + .put("iceberg.dynamic-filtering.wait-timeout", "1h") .build(); IcebergConfig expected = new IcebergConfig() @@ -60,7 +64,8 @@ .setUseFileSizeFromMetadata(false) .setMaxPartitionsPerWriter(222) .setUniqueTableLocation(true) - .setCatalogType(GLUE); + .setCatalogType(GLUE) + .setDynamicFilteringWaitTimeout(Duration.valueOf("1h")); assertFullMapping(properties, expected); }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDynamicPartitionPruningTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDynamicPartitionPruningTest.java new file mode 100644 index 000000000000..b02965a40ef9 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDynamicPartitionPruningTest.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import io.trino.testing.BaseDynamicPartitionPruningTest; +import io.trino.testing.QueryRunner; + +import java.util.List; + +import static java.lang.String.format; +import static java.util.stream.Collectors.joining; + +public class TestIcebergDynamicPartitionPruningTest + extends BaseDynamicPartitionPruningTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner( + EXTRA_PROPERTIES, + ImmutableMap.of("iceberg.dynamic-filtering.wait-timeout", "1h"), + REQUIRED_TABLES); + } + + @Override + protected void createLineitemTable(String tableName, List<String> columns, List<String> partitionColumns) + { + String sql = format( + "CREATE TABLE %s WITH (partitioning=array[%s]) AS SELECT %s FROM tpch.tiny.lineitem", + tableName, + partitionColumns.stream().map(column -> "'" + column + "'").collect(joining(",")), + String.join(",", columns)); + getQueryRunner().execute(sql); + } }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java new file mode 100644 index 000000000000..a559f7ffabe3 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -0,0 +1,292 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.units.Duration; +import io.trino.plugin.hive.HdfsConfig; +import io.trino.plugin.hive.HdfsConfiguration; +import io.trino.plugin.hive.HdfsConfigurationInitializer; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HiveHdfsConfiguration; +import io.trino.plugin.hive.authentication.NoHdfsAuthentication; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.NullableValue; +import io.trino.spi.predicate.Range; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.predicate.ValueSet; +import io.trino.spi.type.TimeZoneKey; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.apache.iceberg.Table; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.tpch.TpchTable.NATION; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class TestIcebergSplitSource + extends AbstractTestQueryFramework +{ + private File metastoreDir; + private HiveMetastore metastore; + private IcebergTableOperationsProvider operationsProvider; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + HdfsConfig config = new HdfsConfig(); + HdfsConfiguration configuration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(config), ImmutableSet.of()); + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(configuration, config, new NoHdfsAuthentication()); + + File tempDir = Files.createTempDirectory("test_iceberg_split_source").toFile(); + this.metastoreDir = new File(tempDir, "iceberg_data"); + this.metastore = createTestingFileHiveMetastore(metastoreDir); + this.operationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); + + return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of(), ImmutableList.of(NATION), Optional.of(metastoreDir)); + } + + @AfterClass(alwaysRun = true) + public void 
tearDown() + throws IOException + { + deleteRecursively(metastoreDir.getParentFile().toPath(), ALLOW_INSECURE); + } + + @Test(timeOut = 30_000) + public void testIncompleteDynamicFilterTimeout() + throws Exception + { + long startMillis = System.currentTimeMillis(); + SchemaTableName schemaTableName = new SchemaTableName("tpch", "nation"); + IcebergTableHandle tableHandle = new IcebergTableHandle( + schemaTableName.getSchemaName(), + schemaTableName.getTableName(), + TableType.DATA, + Optional.empty(), + TupleDomain.all(), + TupleDomain.all()); + Table nationTable = loadIcebergTable(metastore, operationsProvider, SESSION, schemaTableName); + + IcebergSplitSource splitSource = new IcebergSplitSource( + tableHandle, + ImmutableSet.of(), + nationTable.newScan(), + new DynamicFilter() + { + @Override + public Set<ColumnHandle> getColumnsCovered() + { + return ImmutableSet.of(); + } + + @Override + public CompletableFuture<?> isBlocked() + { + return CompletableFuture.runAsync(() -> { + try { + TimeUnit.HOURS.sleep(1); + } + catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + } + + @Override + public boolean isComplete() + { + return false; + } + + @Override + public boolean isAwaitable() + { + return true; + } + + @Override + public TupleDomain<ColumnHandle> getCurrentPredicate() + { + return TupleDomain.all(); + } + }, + TimeZoneKey.UTC_KEY, + new Duration(2, SECONDS)); + + ImmutableList.Builder<IcebergSplit> splits = ImmutableList.builder(); + while (!splitSource.isFinished()) { + splitSource.getNextBatch(null, 100).get() + .getSplits() + .stream() + .map(IcebergSplit.class::cast) + .forEach(splits::add); + } + assertThat(splits.build().size()).isGreaterThan(0); + assertTrue(splitSource.isFinished()); + assertThat(System.currentTimeMillis() - startMillis) + .as("IcebergSplitSource failed to wait for dynamicFilteringWaitTimeout") + .isGreaterThanOrEqualTo(2000); + } + + @Test + public void testBigintPartitionPruning() + { + IcebergColumnHandle bigintColumn = IcebergColumnHandle.primitiveIcebergColumnHandle(1, "name", BIGINT, Optional.empty()); + assertFalse(IcebergSplitSource.partitionMatchesPredicate( + ImmutableSet.of(bigintColumn), + ImmutableMap.of(1, "1000"), + TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 100L))), + TimeZoneKey.UTC_KEY)); + assertTrue(IcebergSplitSource.partitionMatchesPredicate( + ImmutableSet.of(bigintColumn), + ImmutableMap.of(1, "1000"), + TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 1000L))), + TimeZoneKey.UTC_KEY)); + assertFalse(IcebergSplitSource.partitionMatchesPredicate( + ImmutableSet.of(bigintColumn), + ImmutableMap.of(1, "1000"), + TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.asNull(BIGINT))), + TimeZoneKey.UTC_KEY)); + } + + @Test + public void testBigintStatisticsPruning() + { + IcebergColumnHandle bigintColumn = IcebergColumnHandle.primitiveIcebergColumnHandle(1, "name", BIGINT, Optional.empty()); + Map<Integer, Type> primitiveTypes = ImmutableMap.of(1, Types.LongType.get()); + Map<Integer, ByteBuffer> lowerBound = ImmutableMap.of(1, Conversions.toByteBuffer(Types.LongType.get(), 1000L)); + Map<Integer, ByteBuffer> upperBound = ImmutableMap.of(1, Conversions.toByteBuffer(Types.LongType.get(), 2000L)); + + assertFalse(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 0L))), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, +
TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 1000L))), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 1500L))), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 2000L))), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertFalse(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 3000L))), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + + Domain outsideStatisticsRangeAllowNulls = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 100L, true)), true); + assertFalse(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, outsideStatisticsRangeAllowNulls)), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, outsideStatisticsRangeAllowNulls)), + lowerBound, + upperBound, + ImmutableMap.of(1, 1L))); + + Domain outsideStatisticsRangeNoNulls = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 100L, true)), false); + assertFalse(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, outsideStatisticsRangeNoNulls)), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertFalse(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, outsideStatisticsRangeNoNulls)), + lowerBound, + upperBound, + ImmutableMap.of(1, 1L))); + + Domain insideStatisticsRange = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 1001L, true, 1002L, true)), false); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, insideStatisticsRange)), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, insideStatisticsRange)), + lowerBound, + upperBound, + ImmutableMap.of(1, 1L))); + + Domain overlappingStatisticsRange = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 990L, true, 1010L, true)), false); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, overlappingStatisticsRange)), + lowerBound, + upperBound, + ImmutableMap.of(1, 0L))); + assertTrue(IcebergSplitSource.fileMatchesPredicate( + primitiveTypes, + TupleDomain.withColumnDomains(ImmutableMap.of(bigintColumn, overlappingStatisticsRange)), + lowerBound, + upperBound, + ImmutableMap.of(1, 1L))); + } +} From 2bb6095e81ec24bff504a14b7e0f86eb72af41d8 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 25 Oct 2021 08:49:44 +0200 Subject: [PATCH 5/5] empty