Use dynamic filter to prune Iceberg splits
alexjo2144 authored and findepi committed Oct 25, 2021
1 parent d02f358 commit 8f767f2
Showing 8 changed files with 634 additions and 42 deletions.
6 changes: 6 additions & 0 deletions plugin/trino-iceberg/pom.xml
@@ -257,6 +257,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
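
The io.airlift:testing artifact provides general-purpose test assertions (io.airlift.testing.Assertions); presumably it backs the commit's new tests, which are not loaded on this page. A hypothetical usage, with both variable names invented for illustration:

    import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;

    // hypothetical: check that split planning waited at least the configured timeout
    assertGreaterThanOrEqual(planningTime, dynamicFilteringWaitTimeout);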
17 changes: 17 additions & 0 deletions plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -15,6 +15,7 @@
 
 import io.airlift.configuration.Config;
 import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.Duration;
 import io.trino.plugin.hive.HiveCompressionCodec;
 import org.apache.iceberg.FileFormat;
 
@@ -24,6 +25,7 @@
 import static io.trino.plugin.hive.HiveCompressionCodec.GZIP;
 import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
 import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 public class IcebergConfig
 {
@@ -33,6 +35,7 @@ public class IcebergConfig
     private int maxPartitionsPerWriter = 100;
     private boolean uniqueTableLocation;
     private CatalogType catalogType = HIVE_METASTORE;
+    private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS);
 
     public CatalogType getCatalogType()
     {
@@ -119,4 +122,18 @@ public IcebergConfig setUniqueTableLocation(boolean uniqueTableLocation)
         this.uniqueTableLocation = uniqueTableLocation;
         return this;
     }
+
+    @NotNull
+    public Duration getDynamicFilteringWaitTimeout()
+    {
+        return dynamicFilteringWaitTimeout;
+    }
+
+    @Config("iceberg.dynamic-filtering.wait-timeout")
+    @ConfigDescription("Duration to wait for completion of dynamic filters during split generation")
+    public IcebergConfig setDynamicFilteringWaitTimeout(Duration dynamicFilteringWaitTimeout)
+    {
+        this.dynamicFilteringWaitTimeout = dynamicFilteringWaitTimeout;
+        return this;
+    }
 }
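
The timeout defaults to 0 seconds, so split generation never blocks on dynamic filters unless a deployment opts in. A sketch of a catalog configuration entry; the property name comes from the @Config annotation above, while the file path is illustrative:

    # etc/catalog/iceberg.properties (illustrative path)
    connector.name=iceberg
    iceberg.dynamic-filtering.wait-timeout=10s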
13 changes: 13 additions & 0 deletions plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
@@ -15,6 +15,7 @@
 
 import com.google.common.collect.ImmutableList;
 import io.airlift.units.DataSize;
+import io.airlift.units.Duration;
 import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode;
 import io.trino.plugin.base.session.SessionPropertiesProvider;
 import io.trino.plugin.hive.HiveCompressionCodec;
@@ -33,6 +34,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
+import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
 import static io.trino.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
 import static io.trino.spi.session.PropertyMetadata.booleanProperty;
 import static io.trino.spi.session.PropertyMetadata.doubleProperty;
@@ -64,6 +66,7 @@ public final class IcebergSessionProperties
     private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
     private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
     private static final String PARQUET_WRITER_BATCH_SIZE = "parquet_writer_batch_size";
+    private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
     private final List<PropertyMetadata<?>> sessionProperties;
 
     @Inject
@@ -190,6 +193,11 @@ public IcebergSessionProperties(
                         "Parquet: Maximum number of rows passed to the writer in each batch",
                         parquetWriterConfig.getBatchSize(),
                         false))
+                .add(durationProperty(
+                        DYNAMIC_FILTERING_WAIT_TIMEOUT,
+                        "Duration to wait for completion of dynamic filters during split generation",
+                        icebergConfig.getDynamicFilteringWaitTimeout(),
+                        false))
                 .build();
     }
 
@@ -310,4 +318,9 @@ public static int getParquetWriterBatchSize(ConnectorSession session)
     {
         return session.getProperty(PARQUET_WRITER_BATCH_SIZE, Integer.class);
     }
+
+    public static Duration getDynamicFilteringWaitTimeout(ConnectorSession session)
+    {
+        return session.getProperty(DYNAMIC_FILTERING_WAIT_TIMEOUT, Duration.class);
+    }
 }
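
Because durationProperty registers the config value as the session property's default, the timeout can also be overridden per session. A sketch, assuming the catalog is mounted under the name iceberg (the catalog name is an assumption):

    SET SESSION iceberg.dynamic_filtering_wait_timeout = '10s';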
37 changes: 27 additions & 10 deletions plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.iceberg;
 
 import com.google.common.collect.ImmutableList;
+import io.airlift.units.Duration;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
@@ -22,12 +23,19 @@
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.FixedSplitSource;
+import io.trino.spi.type.TypeManager;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 
 import javax.inject.Inject;
 
-import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.getDynamicFilteringWaitTimeout;
+import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
+import static io.trino.plugin.iceberg.IcebergUtil.getIdentityPartitions;
 import static java.util.Objects.requireNonNull;
 
 public class IcebergSplitManager
@@ -36,11 +44,13 @@
     public static final int ICEBERG_DOMAIN_COMPACTION_THRESHOLD = 1000;
 
     private final IcebergTransactionManager transactionManager;
+    private final TypeManager typeManager;
 
     @Inject
-    public IcebergSplitManager(IcebergTransactionManager transactionManager)
+    public IcebergSplitManager(IcebergTransactionManager transactionManager, TypeManager typeManager)
     {
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
     }
 
     @Override
@@ -58,17 +68,24 @@ public ConnectorSplitSource getSplits(
         }
 
         Table icebergTable = transactionManager.get(transaction).getIcebergTable(session, table.getSchemaTableName());
+        Duration dynamicFilteringWaitTimeout = getDynamicFilteringWaitTimeout(session);
 
+        Set<Integer> identityPartitionFieldIds = getIdentityPartitions(icebergTable.spec()).keySet().stream()
+                .map(PartitionField::sourceId)
+                .collect(toImmutableSet());
+        Set<IcebergColumnHandle> identityPartitionColumns = getColumns(icebergTable.schema(), typeManager).stream()
+                .filter(column -> identityPartitionFieldIds.contains(column.getId()))
+                .collect(toImmutableSet());
+
         TableScan tableScan = icebergTable.newScan()
-                .filter(toIcebergExpression(
-                        table.getEnforcedPredicate()
-                                // TODO: Remove TupleDomain#simplify once Iceberg supports IN expression. Currently this
-                                // is required for IN predicates on non-partition columns with large value list. Such
-                                // predicates on partition columns are not supported.
-                                // (See AbstractTestIcebergSmoke#testLargeInFailureOnPartitionedColumns)
-                                .intersect(table.getUnenforcedPredicate().simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD))))
                 .useSnapshot(table.getSnapshotId().get());
-        IcebergSplitSource splitSource = new IcebergSplitSource(table.getSchemaTableName(), tableScan.planTasks());
+        IcebergSplitSource splitSource = new IcebergSplitSource(
+                table,
+                identityPartitionColumns,
+                tableScan,
+                dynamicFilter,
+                session.getTimeZoneKey(),
+                dynamicFilteringWaitTimeout);
 
         return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
     }
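
IcebergSplitSource itself is among the files not loaded on this page, so the following is only a sketch of the idea its new constructor arguments enable: wait up to dynamicFilteringWaitTimeout for the filter to narrow, then test each file's identity-partition values against dynamicFilter.getCurrentPredicate() and drop files that cannot overlap it. A self-contained illustration using trino-spi predicate types; the class and column names are illustrative, not the connector's actual code:

    import io.trino.spi.predicate.Domain;
    import io.trino.spi.predicate.TupleDomain;

    import java.util.List;
    import java.util.Map;

    import static io.trino.spi.type.BigintType.BIGINT;

    public class DynamicPartitionPruningSketch
    {
        public static void main(String[] args)
        {
            // Dynamic filter collected from the join build side: part_key IN (1, 2)
            TupleDomain<String> dynamicPredicate = TupleDomain.withColumnDomains(Map.of(
                    "part_key", Domain.multipleValues(BIGINT, List.of(1L, 2L))));

            // Identity-partition value recorded in one file's Iceberg metadata: part_key = 5
            TupleDomain<String> filePartition = TupleDomain.withColumnDomains(Map.of(
                    "part_key", Domain.singleValue(BIGINT, 5L)));

            // No overlap means no row in the file can pass the filter, so the split is pruned
            System.out.println(dynamicPredicate.overlaps(filePartition)); // prints: false
        }
    }

Pruning on identity partitions only is a safe choice: for identity-partitioned columns, the partition value equals the column value, so the overlap test can never discard a matching row.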
[4 more changed files not loaded]
