Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local dynamic filter support in IcebergPageSourceProvder #5719

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.StandardTypes;
Expand Down Expand Up @@ -142,7 +143,7 @@ public ConnectorPageSource createPageSource(
ConnectorSplit connectorSplit,
ConnectorTableHandle connectorTable,
List<ColumnHandle> columns,
TupleDomain<ColumnHandle> dynamicFilter)
DynamicFilter dynamicFilter)
{
IcebergSplit split = (IcebergSplit) connectorSplit;
IcebergTableHandle table = (IcebergTableHandle) connectorTable;
Expand All @@ -167,7 +168,7 @@ public ConnectorPageSource createPageSource(
split.getLength(),
split.getFileFormat(),
regularColumns,
table.getPredicate());
table.getPredicate().intersect(dynamicFilter.getCurrentPredicate().transform(IcebergColumnHandle.class::cast).simplify(100)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you passed dynamicFilter to IcebergPageSource you could easily block on DF in io.prestosql.spi.connector.ConnectorPageSource#isBlocked. However, this should be behind feature toggle

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems from the code that stripe pruning happens before IcebergPageSource is created in createDataPageSource -> createOrcPageSource -> reader.createRecordReader -> OrcRecordReader. So we would miss that even if we block in IcebergPageSource.
I think row group pruning could still be accomplished by blocking on DF in IcebergPageSource if dynamicFilter is pushed into StripeReader as well. Not sure if the changes are worth doing though.
Please correct if I'm missing something.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably create createDataPageSource in IcebergPageSource in a lazy way (when DF is ready). This way we don't allocate resources until DF is ready (I'm not sure it's big of an issue though)


return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.metadata.TableHandle;
import io.prestosql.operator.OperatorStats;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.predicate.NullableValue;
Expand All @@ -31,6 +32,7 @@
import io.prestosql.testing.MaterializedResult;
import io.prestosql.testing.MaterializedRow;
import io.prestosql.testing.QueryRunner;
import io.prestosql.testing.ResultWithQueryId;
import org.apache.iceberg.FileFormat;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;
Expand All @@ -45,9 +47,13 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.testing.Assertions.assertGreaterThan;
import static io.airlift.testing.Assertions.assertLessThan;
import static io.prestosql.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static io.prestosql.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static io.prestosql.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST;
import static io.prestosql.testing.MaterializedResult.resultBuilder;
import static io.prestosql.testing.assertions.Assert.assertEquals;
import static io.prestosql.transaction.TransactionBuilder.transaction;
Expand Down Expand Up @@ -1389,4 +1395,55 @@ private void dropTable(String table)
assertUpdate(session, "DROP TABLE " + table);
assertFalse(getQueryRunner().tableExists(session, table));
}

@Test
public void testLocalDynamicFilterWithEmptyBuildSide()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests rely on fact that probe side will not progress until build side is not ready. Because of that these tests are fragile and might break when we change execution (which we will).
In order to write these tests reliable, Iceberg should support either waiting for local DF or waiting for DF for split generation. Then (in tests) we can enforce presence of DF before table scan happens

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we tackle introduce waiting for split generation like Hive separately and let this one go through with less restrictive assertions ?
Interestingly, TestHiveDistributedJoinQueries#testJoinWithEmptyBuildSide is not flaky despite not using lazy DF to delay probe side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would prevent optimization like #3957
I'm fine having local DF without testing for now (it's one liner), but please create an issue in prestosql (for blocking on DFs locally) and mention it in code.

Alternatively, we can simply add that blocking because it's pretty straight forward (just pass DF future to IcebergPageSource and block there)

{
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
withBroadcastJoin(),
"SELECT * FROM lineitem JOIN supplier ON lineitem.suppkey = supplier.suppkey AND supplier.name = 'abc'");
assertEquals(result.getResult().getRowCount(), 0);

OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem");
assertEquals(probeStats.getInputPositions(), 0);
assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers());
}

@Test
public void testDynamicFilterWithSelectiveBuildSide()
{
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
withBroadcastJoin(),
"SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey AND orders.totalprice = 172799.49");
assertGreaterThan(result.getResult().getRowCount(), 0);

OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem");
// Probe-side is partially scanned
assertLessThan(probeStats.getInputPositions(), 60175L);
assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers());
}

@Test
public void testDynamicFilterWithNonSelectiveBuildSide()
{
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
withBroadcastJoin(),
"SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use lineitem.suppkey = supplier.suppkey instead.
In this case the dynamic filter is set to all because the build side is too big, whereas we want to test the case where there is a dynamic filter but it doesn't filter anything.

assertGreaterThan(result.getResult().getRowCount(), 0);

OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch.lineitem");
// Probe-side is fully scanned
assertEquals(probeStats.getInputPositions(), 60175L);
assertEquals(probeStats.getDynamicFilterSplitsProcessed(), 0);
}

private Session withBroadcastJoin()
{
return Session.builder(this.getQueryRunner().getDefaultSession())
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.prestosql.execution.warnings.WarningCollector;
import io.prestosql.metadata.Metadata;
import io.prestosql.operator.OperatorStats;
import io.prestosql.server.DynamicFilterService;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused

import io.prestosql.spi.QueryId;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeOperators;
Expand Down Expand Up @@ -442,7 +443,7 @@ protected OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId,
return false;
}
TableScanNode tableScanNode = (TableScanNode) filterNode.getSource();
return tableName.equals(tableScanNode.getTable().getConnectorHandle().toString());
return tableScanNode.getTable().getConnectorHandle().toString().contains(tableName);
})
.findOnlyElement()
.getId();
Expand Down