Add local dynamic filter support in Iceberg

knwg145 committed Oct 28, 2020
1 parent 368c56f commit fb6f893
Showing 5 changed files with 119 additions and 22 deletions.
@@ -432,14 +432,4 @@ public void testRightJoinWithNonSelectiveBuildSide()
         assertEquals(domainStats.getDiscreteValuesCount(), 0);
         assertEquals(domainStats.getRangeCount(), 100);
     }
-
-    private DynamicFiltersStats getDynamicFilteringStats(QueryId queryId)
-    {
-        DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
-        return runner.getCoordinator()
-                .getQueryManager()
-                .getFullQueryInfo(queryId)
-                .getQueryStats()
-                .getDynamicFiltersStats();
-    }
 }
6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
@@ -230,6 +230,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
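The io.airlift:testing dependency added above supplies the Assertions.assertGreaterThan and assertLessThan helpers that the new dynamic-filter tests further down rely on.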
@@ -168,7 +168,8 @@ public ConnectorPageSource createPageSource(
                 split.getLength(),
                 split.getFileFormat(),
                 regularColumns,
-                table.getPredicate());
+                table.getPredicate(),
+                dynamicFilter.transform(IcebergColumnHandle.class::cast));
 
         return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
     }
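The dynamic filter reaches the connector as a TupleDomain keyed by the engine-level ColumnHandle type, which is why it is re-keyed to IcebergColumnHandle with TupleDomain.transform before being threaded through createDataPageSource. A minimal sketch of that re-keying, not part of the commit, with plain String keys standing in for the real column handles:

    import com.google.common.collect.ImmutableMap;
    import io.prestosql.spi.predicate.Domain;
    import io.prestosql.spi.predicate.TupleDomain;

    import static io.prestosql.spi.type.BigintType.BIGINT;

    public class TransformSketch
    {
        public static void main(String[] args)
        {
            // Domains keyed by a generic type, as handed to the connector (stand-in for ColumnHandle).
            TupleDomain<Object> generic = TupleDomain.withColumnDomains(
                    ImmutableMap.of((Object) "suppkey", Domain.singleValue(BIGINT, 42L)));

            // transform() re-keys every domain without touching the values;
            // the commit does the same thing with IcebergColumnHandle.class::cast.
            TupleDomain<String> specific = generic.transform(String.class::cast);

            System.out.println(specific.getDomains().orElseThrow(IllegalStateException::new));
        }
    }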
@@ -181,7 +182,8 @@ private ConnectorPageSource createDataPageSource(
             long length,
             FileFormat fileFormat,
             List<IcebergColumnHandle> dataColumns,
-            TupleDomain<IcebergColumnHandle> predicate)
+            TupleDomain<IcebergColumnHandle> predicate,
+            TupleDomain<IcebergColumnHandle> dynamicFilter)
     {
         switch (fileFormat) {
             case ORC:
@@ -212,7 +214,8 @@
                                 .withLazyReadSmallRanges(getOrcLazyReadSmallRanges(session))
                                 .withNestedLazy(isOrcNestedLazy(session))
                                 .withBloomFiltersEnabled(isOrcBloomFiltersEnabled(session)),
-                        fileFormatDataSourceStats);
+                        fileFormatDataSourceStats,
+                        dynamicFilter);
             case PARQUET:
                 return createParquetPageSource(
                         hdfsEnvironment,
@@ -226,7 +229,8 @@
                                 .withFailOnCorruptedStatistics(isFailOnCorruptedParquetStatistics(session))
                                 .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
                         predicate,
-                        fileFormatDataSourceStats);
+                        fileFormatDataSourceStats,
+                        dynamicFilter);
         }
         throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
     }
@@ -240,9 +244,10 @@ private static ConnectorPageSource createOrcPageSource(
             long length,
             long fileSize,
             List<IcebergColumnHandle> columns,
-            TupleDomain<IcebergColumnHandle> effectivePredicate,
+            TupleDomain<IcebergColumnHandle> predicate,
             OrcReaderOptions options,
-            FileFormatDataSourceStats stats)
+            FileFormatDataSourceStats stats,
+            TupleDomain<IcebergColumnHandle> dynamicFilter)
     {
         OrcDataSource orcDataSource = null;
         try {
@@ -270,7 +275,8 @@ private static ConnectorPageSource createOrcPageSource(
 
             TupleDomainOrcPredicateBuilder predicateBuilder = TupleDomainOrcPredicate.builder()
                     .setBloomFiltersEnabled(options.isBloomFiltersEnabled());
-            Map<IcebergColumnHandle, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
+            Map<IcebergColumnHandle, Domain> effectivePredicateDomains = predicate.intersect(dynamicFilter)
+                    .getDomains()
                     .orElseThrow(() -> new IllegalArgumentException("Effective predicate is none"));
             List<OrcColumn> fileReadColumns = new ArrayList<>(columns.size());
             List<Type> fileReadTypes = new ArrayList<>(columns.size());
@@ -350,8 +356,9 @@ private static ConnectorPageSource createParquetPageSource(
             long length,
             List<IcebergColumnHandle> regularColumns,
             ParquetReaderOptions options,
-            TupleDomain<IcebergColumnHandle> effectivePredicate,
-            FileFormatDataSourceStats fileFormatDataSourceStats)
+            TupleDomain<IcebergColumnHandle> predicate,
+            FileFormatDataSourceStats fileFormatDataSourceStats,
+            TupleDomain<IcebergColumnHandle> dynamicFilter)
     {
         AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();
 
@@ -384,7 +391,7 @@ private static ConnectorPageSource createParquetPageSource(
 
             MessageType requestedSchema = new MessageType(fileSchema.getName(), parquetFields.stream().filter(Objects::nonNull).collect(toImmutableList()));
             Map<List<String>, RichColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, requestedSchema);
-            TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, effectivePredicate);
+            TupleDomain<ColumnDescriptor> parquetTupleDomain = getParquetTupleDomain(descriptorsByPath, predicate, dynamicFilter);
             Predicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC);
 
             List<BlockMetaData> blocks = new ArrayList<>();
@@ -450,14 +457,16 @@ private static ConnectorPageSource createParquetPageSource(
         }
     }
 
-    private static TupleDomain<ColumnDescriptor> getParquetTupleDomain(Map<List<String>, RichColumnDescriptor> descriptorsByPath, TupleDomain<IcebergColumnHandle> effectivePredicate)
+    private static TupleDomain<ColumnDescriptor> getParquetTupleDomain(Map<List<String>, RichColumnDescriptor> descriptorsByPath,
+            TupleDomain<IcebergColumnHandle> effectivePredicate,
+            TupleDomain<IcebergColumnHandle> dynamicFilter)
     {
         if (effectivePredicate.isNone()) {
             return TupleDomain.none();
         }
 
         ImmutableMap.Builder<ColumnDescriptor, Domain> predicate = ImmutableMap.builder();
-        effectivePredicate.getDomains().get().forEach((columnHandle, domain) -> {
+        dynamicFilter.intersect(effectivePredicate).getDomains().get().forEach((columnHandle, domain) -> {
             String baseType = columnHandle.getType().getTypeSignature().getBase();
             // skip looking up predicates for complex types as Parquet only stores stats for primitives
             if (!baseType.equals(StandardTypes.MAP) && !baseType.equals(StandardTypes.ARRAY) && !baseType.equals(StandardTypes.ROW)) {
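Both readers fold the dynamic filter into the existing pushdown in the same way: the domain collected from the join's build side is intersected with the static predicate, and the combined TupleDomain is what drives ORC stripe and Parquet row-group pruning. Intersection is symmetric, so predicate.intersect(dynamicFilter) in the ORC path and dynamicFilter.intersect(effectivePredicate) in the Parquet helper are equivalent. A minimal, self-contained sketch of the semantics, not part of the commit, with String keys and an illustrative column name standing in for IcebergColumnHandle:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import io.prestosql.spi.predicate.Domain;
    import io.prestosql.spi.predicate.Range;
    import io.prestosql.spi.predicate.TupleDomain;
    import io.prestosql.spi.predicate.ValueSet;

    import static io.prestosql.spi.type.BigintType.BIGINT;

    public class IntersectSketch
    {
        public static void main(String[] args)
        {
            // Static predicate already pushed into the scan: orderkey < 1000.
            TupleDomain<String> predicate = TupleDomain.withColumnDomains(ImmutableMap.of(
                    "orderkey", Domain.create(ValueSet.ofRanges(Range.lessThan(BIGINT, 1000L)), false)));

            // Dynamic filter built from the join's build side: orderkey IN (7, 500, 2000).
            TupleDomain<String> dynamicFilter = TupleDomain.withColumnDomains(ImmutableMap.of(
                    "orderkey", Domain.multipleValues(BIGINT, ImmutableList.of(7L, 500L, 2000L))));

            // Only values satisfying both sides survive: orderkey IN (7, 500).
            System.out.println(predicate.intersect(dynamicFilter));
        }
    }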
@@ -19,18 +19,22 @@
 import io.prestosql.metadata.Metadata;
 import io.prestosql.metadata.QualifiedObjectName;
 import io.prestosql.metadata.TableHandle;
+import io.prestosql.operator.OperatorStats;
+import io.prestosql.server.DynamicFilterService;
 import io.prestosql.spi.connector.ColumnHandle;
 import io.prestosql.spi.connector.Constraint;
 import io.prestosql.spi.predicate.NullableValue;
 import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.spi.statistics.ColumnStatistics;
 import io.prestosql.spi.statistics.DoubleRange;
 import io.prestosql.spi.statistics.TableStatistics;
+import io.prestosql.sql.analyzer.FeaturesConfig;
 import io.prestosql.testing.AbstractTestIntegrationSmokeTest;
 import io.prestosql.testing.DistributedQueryRunner;
 import io.prestosql.testing.MaterializedResult;
 import io.prestosql.testing.MaterializedRow;
 import io.prestosql.testing.QueryRunner;
+import io.prestosql.testing.ResultWithQueryId;
 import org.apache.iceberg.FileFormat;
 import org.intellij.lang.annotations.Language;
 import org.testng.annotations.Test;
@@ -45,6 +49,9 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.airlift.testing.Assertions.assertGreaterThan;
+import static io.airlift.testing.Assertions.assertLessThan;
+import static io.prestosql.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
 import static io.prestosql.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
 import static io.prestosql.spi.type.DoubleType.DOUBLE;
 import static io.prestosql.spi.type.VarcharType.VARCHAR;
@@ -1389,4 +1396,78 @@ private void dropTable(String table)
         assertUpdate(session, "DROP TABLE " + table);
         assertFalse(getQueryRunner().tableExists(session, table));
     }
+
+    @Test
+    public void testLocalDynamicFilterWithEmptyBuildSide()
+    {
+        Session session = Session.builder(super.getSession())
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
+                .build();
+
+        DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
+        ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
+                session,
+                "SELECT * FROM lineitem JOIN supplier ON lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'");
+        assertEquals(result.getResult().getRowCount(), 0);
+
+        OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem");
+        assertEquals(probeStats.getInputPositions(), 0);
+        assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers());
+
+        DynamicFilterService.DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId());
+        assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L);
+    }
+
+    @Test
+    public void testDynamicFilterWithSelectiveBuildSide()
+    {
+        Session session = Session.builder(super.getSession())
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
+                .build();
+
+        DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
+        ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
+                session,
+                "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 172799.49");
+        assertGreaterThan(result.getResult().getRowCount(), 0);
+
+        OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem");
+        // Probe-side is partially scanned
+        assertLessThan(probeStats.getInputPositions(), 60175L);
+        assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers());
+
+        DynamicFilterService.DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId());
+        assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L);
+    }
+
+    @Test
+    public void testDynamicFilterWithNonSelectiveBuildSide()
+    {
+        Session session = Session.builder(super.getSession())
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, FeaturesConfig.JoinDistributionType.BROADCAST.name())
+                .build();
+
+        DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
+        ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
+                session,
+                "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey");
+        assertGreaterThan(result.getResult().getRowCount(), 0);
+
+        OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem");
+        // Probe-side is fully scanned
+        assertEquals(probeStats.getInputPositions(), 60175L);
+        assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0);
+
+        DynamicFilterService.DynamicFiltersStats dynamicFiltersStats = getDynamicFilteringStats(result.getQueryId());
+        assertEquals(dynamicFiltersStats.getTotalDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getLazyDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getReplicatedDynamicFilters(), 1L);
+        assertEquals(dynamicFiltersStats.getDynamicFiltersCompleted(), 1L);
+    }
 }
@@ -29,6 +29,7 @@
 import io.prestosql.execution.warnings.WarningCollector;
 import io.prestosql.metadata.Metadata;
 import io.prestosql.operator.OperatorStats;
+import io.prestosql.server.DynamicFilterService;
 import io.prestosql.spi.QueryId;
 import io.prestosql.spi.type.Type;
 import io.prestosql.spi.type.TypeOperators;
@@ -462,4 +463,14 @@ protected final <T extends AutoCloseable> T closeAfterClass(T resource)
     {
         return afterClassCloser.register(resource);
     }
+
+    protected DynamicFilterService.DynamicFiltersStats getDynamicFilteringStats(QueryId queryId)
+    {
+        DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
+        return runner.getCoordinator()
+                .getQueryManager()
+                .getFullQueryInfo(queryId)
+                .getQueryStats()
+                .getDynamicFiltersStats();
+    }
 }
