From fb6f893121fefedcdb608c8dcc3da75275ea0605 Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Wed, 28 Oct 2020 01:39:22 -0700 Subject: [PATCH] Add local dynamic filter support in Iceberg --- .../hive/TestHiveDynamicPartitionPruning.java | 10 --- presto-iceberg/pom.xml | 6 ++ .../iceberg/IcebergPageSourceProvider.java | 33 +++++--- .../iceberg/AbstractTestIcebergSmoke.java | 81 +++++++++++++++++++ .../testing/AbstractTestQueryFramework.java | 11 +++ 5 files changed, 119 insertions(+), 22 deletions(-) diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDynamicPartitionPruning.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDynamicPartitionPruning.java index f6d6a1b798ca..7c78738d9ab3 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDynamicPartitionPruning.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDynamicPartitionPruning.java @@ -432,14 +432,4 @@ public void testRightJoinWithNonSelectiveBuildSide() assertEquals(domainStats.getDiscreteValuesCount(), 0); assertEquals(domainStats.getRangeCount(), 100); } - - private DynamicFiltersStats getDynamicFilteringStats(QueryId queryId) - { - DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); - return runner.getCoordinator() - .getQueryManager() - .getFullQueryInfo(queryId) - .getQueryStats() - .getDynamicFiltersStats(); - } } diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index eae2f8db37d5..e822232869be 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -230,6 +230,12 @@ test + + io.airlift + testing + test + + org.assertj assertj-core diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java index 9bb71f08a33a..36604d00287a 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java @@ -168,7 +168,8 @@ public ConnectorPageSource createPageSource( split.getLength(), split.getFileFormat(), regularColumns, - table.getPredicate()); + table.getPredicate(), + dynamicFilter.transform(IcebergColumnHandle.class::cast)); return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey()); } @@ -181,7 +182,8 @@ private ConnectorPageSource createDataPageSource( long length, FileFormat fileFormat, List dataColumns, - TupleDomain predicate) + TupleDomain predicate, + TupleDomain dynamicFilter) { switch (fileFormat) { case ORC: @@ -212,7 +214,8 @@ private ConnectorPageSource createDataPageSource( .withLazyReadSmallRanges(getOrcLazyReadSmallRanges(session)) .withNestedLazy(isOrcNestedLazy(session)) .withBloomFiltersEnabled(isOrcBloomFiltersEnabled(session)), - fileFormatDataSourceStats); + fileFormatDataSourceStats, + dynamicFilter); case PARQUET: return createParquetPageSource( hdfsEnvironment, @@ -226,7 +229,8 @@ private ConnectorPageSource createDataPageSource( .withFailOnCorruptedStatistics(isFailOnCorruptedParquetStatistics(session)) .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)), predicate, - fileFormatDataSourceStats); + fileFormatDataSourceStats, + dynamicFilter); } throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat); } @@ -240,9 +244,10 @@ private static ConnectorPageSource createOrcPageSource( long length, long fileSize, List columns, - TupleDomain effectivePredicate, + TupleDomain predicate, OrcReaderOptions options, - FileFormatDataSourceStats stats) + FileFormatDataSourceStats stats, + TupleDomain dynamicFilter) { OrcDataSource orcDataSource = null; try { @@ -270,7 +275,8 @@ private static ConnectorPageSource createOrcPageSource( TupleDomainOrcPredicateBuilder predicateBuilder = TupleDomainOrcPredicate.builder() .setBloomFiltersEnabled(options.isBloomFiltersEnabled()); - Map effectivePredicateDomains = effectivePredicate.getDomains() + Map effectivePredicateDomains = predicate.intersect(dynamicFilter) + .getDomains() .orElseThrow(() -> new IllegalArgumentException("Effective predicate is none")); List fileReadColumns = new ArrayList<>(columns.size()); List fileReadTypes = new ArrayList<>(columns.size()); @@ -350,8 +356,9 @@ private static ConnectorPageSource createParquetPageSource( long length, List regularColumns, ParquetReaderOptions options, - TupleDomain effectivePredicate, - FileFormatDataSourceStats fileFormatDataSourceStats) + TupleDomain predicate, + FileFormatDataSourceStats fileFormatDataSourceStats, + TupleDomain dynamicFilter) { AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext(); @@ -384,7 +391,7 @@ private static ConnectorPageSource createParquetPageSource( MessageType requestedSchema = new MessageType(fileSchema.getName(), parquetFields.stream().filter(Objects::nonNull).collect(toImmutableList())); Map, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema); - TupleDomain parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate); + TupleDomain parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, predicate, dynamicFilter); Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC); List blocks = new ArrayList<>(); @@ -450,14 +457,16 @@ private static ConnectorPageSource createParquetPageSource( } } - private static TupleDomain getParquetTupleDomain(Map, RichColumnDescriptor> descriptorsByPath, TupleDomain effectivePredicate) + private static TupleDomain getParquetTupleDomain(Map, RichColumnDescriptor> descriptorsByPath, + TupleDomain effectivePredicate, + TupleDomain dynamicFilter) { if (effectivePredicate.isNone()) { return TupleDomain.none(); } ImmutableMap.Builder predicate = ImmutableMap.builder(); - effectivePredicate.getDomains().get().forEach((columnHandle, domain) -> { + dynamicFilter.intersect(effectivePredicate).getDomains().get().forEach((columnHandle, domain) -> { String baseType = columnHandle.getType().getTypeSignature().getBase(); // skip looking up predicates for complex types as Parquet only stores stats for primitives if (!baseType.equals(StandardTypes.MAP) && !baseType.equals(StandardTypes.ARRAY) && !baseType.equals(StandardTypes.ROW)) { diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/AbstractTestIcebergSmoke.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/AbstractTestIcebergSmoke.java index 5d6ffd24645b..dd9a431f2ce4 100644 --- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/AbstractTestIcebergSmoke.java +++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/AbstractTestIcebergSmoke.java @@ -19,6 +19,8 @@ import io.prestosql.metadata.Metadata; import io.prestosql.metadata.QualifiedObjectName; import io.prestosql.metadata.TableHandle; +import io.prestosql.operator.OperatorStats; +import io.prestosql.server.DynamicFilterService; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.Constraint; import io.prestosql.spi.predicate.NullableValue; @@ -26,11 +28,13 @@ import io.prestosql.spi.statistics.ColumnStatistics; import io.prestosql.spi.statistics.DoubleRange; import io.prestosql.spi.statistics.TableStatistics; +import io.prestosql.sql.analyzer.FeaturesConfig; import io.prestosql.testing.AbstractTestIntegrationSmokeTest; import io.prestosql.testing.DistributedQueryRunner; import io.prestosql.testing.MaterializedResult; import io.prestosql.testing.MaterializedRow; import io.prestosql.testing.QueryRunner; +import io.prestosql.testing.ResultWithQueryId; import org.apache.iceberg.FileFormat; import org.intellij.lang.annotations.Language; import org.testng.annotations.Test; @@ -45,6 +49,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertLessThan; +import static io.prestosql.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static io.prestosql.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static io.prestosql.spi.type.DoubleType.DOUBLE; import static io.prestosql.spi.type.VarcharType.VARCHAR; @@ -1389,4 +1396,78 @@ private void dropTable(String table) assertUpdate(session, "DROP TABLE " + table); assertFalse(getQueryRunner().tableExists(session, table)); } + + @Test + public void testLocalDynamicFilterWithEmptyBuildSide() + { + Session session = Session.builder(super.getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name()) + .build(); + + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + session, + "SELECT * FROM lineitem JOIN supplier ON lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'"); + assertEquals(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem"); + assertEquals(probeStats.getInputPositions(), 0); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers()); + + DynamicFilterService.DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L); + } + + @Test + public void testDynamicFilterWithSelectiveBuildSide() + { + Session session = Session.builder(super.getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name()) + .build(); + + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + session, + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 172799.49"); + assertGreaterThan(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem"); + // Probe-side is partially scanned + assertLessThan(probeStats.getInputPositions(), 60175L); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers()); + + DynamicFilterService.DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L); + } + + @Test + public void testDynamicFilterWithNonSelectiveBuildSide() + { + Session session = Session.builder(super.getSession()) + .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name()) + .build(); + + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + ResultWithQueryId result = runner.executeWithQueryId( + session, + "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey"); + assertGreaterThan(result.getResult().getRowCount(), 0); + + OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem"); + // Probe-side is fully scanned + assertEquals(probeStats.getInputPositions(), 60175L); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0); + + DynamicFilterService.DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId()); + assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 1L); + assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L); + } } diff --git a/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java b/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java index b8be67f03fb0..b45a6f63081e 100644 --- a/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java +++ b/presto-testing/src/main/java/io/prestosql/testing/AbstractTestQueryFramework.java @@ -29,6 +29,7 @@ import io.prestosql.execution.warnings.WarningCollector; import io.prestosql.metadata.Metadata; import io.prestosql.operator.OperatorStats; +import io.prestosql.server.DynamicFilterService; import io.prestosql.spi.QueryId; import io.prestosql.spi.type.Type; import io.prestosql.spi.type.TypeOperators; @@ -462,4 +463,14 @@ protected final T closeAfterClass(T resource) { return afterClassCloser.register(resource); } + + protected DynamicFilterService.DynamicFiltersStats getDynamicFilteringStats(QueryId queryId) + { + DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner(); + return runner.getCoordinator() + .getQueryManager() + .getFullQueryInfo(queryId) + .getQueryStats() + .getDynamicFiltersStats(); + } }