Filter out splits that do not match predicates not expressible by tuple domain

fixes #9309
fixes #7608
The idea is to push the constraint down to the split source so that splits that
would be filtered out by the constraint's predicate are never generated.
homar authored and findepi committed Nov 22, 2021
1 parent 16be328 commit c973251
Showing 14 changed files with 126 additions and 27 deletions.
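
For context on the change: a TupleDomain can describe only independent per-column value sets, so a predicate such as regionkey % 5 = 3 has no tuple-domain representation and previously could not prune splits at all. This commit carries such predicates as the functional part of a Constraint instead. A minimal sketch of the distinction using the SPI types touched below (TestingColumnHandle and the main method are illustrative, not part of the commit):

import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;

import java.util.Map;
import java.util.Set;

import static io.trino.spi.type.BigintType.BIGINT;

public class ConstraintVsTupleDomainSketch
{
    // Hypothetical implementation of the ColumnHandle marker interface;
    // real connectors define their own handle types.
    record TestingColumnHandle(String name) implements ColumnHandle {}

    public static void main(String[] args)
    {
        ColumnHandle regionkey = new TestingColumnHandle("regionkey");

        // Expressible as a summary: per-column ranges, e.g. "regionkey < 2"
        TupleDomain<ColumnHandle> summary = TupleDomain.withColumnDomains(
                Map.of(regionkey, Domain.create(ValueSet.ofRanges(Range.lessThan(BIGINT, 2L)), false)));

        // Not expressible as a summary: "regionkey % 5 = 3". It travels as a
        // functional predicate instead, with the summary left as ALL.
        Constraint constraint = new Constraint(
                TupleDomain.all(),
                bindings -> {
                    NullableValue value = bindings.get(regionkey);
                    return value != null && !value.isNull() && (long) value.getValue() % 5 == 3;
                },
                Set.of(regionkey));

        // A split source probes the predicate with candidate partition values.
        boolean keep = constraint.predicate().orElseThrow()
                .test(Map.of(regionkey, new NullableValue(BIGINT, 3L)));
        System.out.println(keep); // true -> the split is kept
    }
}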

@@ -512,7 +512,7 @@ private PlanRoot doPlanQuery()
     private void planDistribution(PlanRoot plan)
     {
         // plan the execution on the active nodes
-        DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService);
+        DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService, typeAnalyzer);
         StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());

         // ensure split sources are closed
core/trino-main/src/main/java/io/trino/split/SplitManager.java (10 changes: 8 additions & 2 deletions)

@@ -66,7 +66,12 @@ public void removeConnectorSplitManager(CatalogName catalogName)
         splitManagers.remove(catalogName);
     }

-    public SplitSource getSplits(Session session, TableHandle table, SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter)
+    public SplitSource getSplits(
+            Session session,
+            TableHandle table,
+            SplitSchedulingStrategy splitSchedulingStrategy,
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
     {
         CatalogName catalogName = table.getCatalogName();
         ConnectorSplitManager splitManager = getConnectorSplitManager(catalogName);
@@ -92,7 +97,8 @@ public SplitSource getSplits(Session session, TableHandle table, SplitScheduling
                     connectorSession,
                     table.getConnectorHandle(),
                     splitSchedulingStrategy,
-                    dynamicFilter);
+                    dynamicFilter,
+                    constraint);
         }

         SplitSource splitSource = new ConnectorAwareSplitSource(catalogName, source);

@@ -23,7 +23,9 @@
 import io.trino.metadata.TableSchema;
 import io.trino.operator.StageExecutionDescriptor;
 import io.trino.server.DynamicFilterService;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.predicate.TupleDomain;
 import io.trino.split.SampledSplitSource;
 import io.trino.split.SplitManager;
 import io.trino.split.SplitSource;
@@ -67,6 +69,7 @@
 import io.trino.sql.planner.plan.UpdateNode;
 import io.trino.sql.planner.plan.ValuesNode;
 import io.trino.sql.planner.plan.WindowNode;
+import io.trino.sql.tree.Expression;

 import javax.inject.Inject;

@@ -78,7 +81,9 @@
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.GROUPED_SCHEDULING;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
+import static io.trino.sql.ExpressionUtils.filterConjuncts;
 import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
 import static java.util.Objects.requireNonNull;

@@ -89,13 +94,15 @@ public class DistributedExecutionPlanner
     private final SplitManager splitManager;
     private final Metadata metadata;
     private final DynamicFilterService dynamicFilterService;
+    private final TypeAnalyzer typeAnalyzer;

     @Inject
-    public DistributedExecutionPlanner(SplitManager splitManager, Metadata metadata, DynamicFilterService dynamicFilterService)
+    public DistributedExecutionPlanner(SplitManager splitManager, Metadata metadata, DynamicFilterService dynamicFilterService, TypeAnalyzer typeAnalyzer)
     {
         this.splitManager = requireNonNull(splitManager, "splitManager is null");
         this.metadata = requireNonNull(metadata, "metadata is null");
         this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
+        this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
     }

     public StageExecutionPlan plan(SubPlan root, Session session)
@@ -191,8 +198,10 @@ public Map<PlanNodeId, SplitSource> visitTableScan(TableScanNode node, Void cont

     private Map<PlanNodeId, SplitSource> visitScanAndFilter(TableScanNode node, Optional<FilterNode> filter)
     {
-        List<DynamicFilters.Descriptor> dynamicFilters = filter
-                .map(FilterNode::getPredicate)
+        Optional<Expression> filterPredicate = filter
+                .map(FilterNode::getPredicate);
+
+        List<DynamicFilters.Descriptor> dynamicFilters = filterPredicate
                 .map(DynamicFilters::extractDynamicFilters)
                 .map(DynamicFilters.ExtractResult::getDynamicConjuncts)
                 .orElse(ImmutableList.of());
@@ -203,12 +212,19 @@ private Map<PlanNodeId, SplitSource> visitScanAndFilter(TableScanNode node, Opti
             dynamicFilter = dynamicFilterService.createDynamicFilter(session.getQueryId(), dynamicFilters, node.getAssignments(), typeProvider);
         }

+        Constraint constraint = filterPredicate
+                .map(predicate -> filterConjuncts(metadata, predicate, expression -> !DynamicFilters.isDynamicFilter(expression)))
+                .map(predicate -> new LayoutConstraintEvaluator(metadata, typeAnalyzer, session, typeProvider, node.getAssignments(), predicate))
+                .map(evaluator -> new Constraint(TupleDomain.all(), evaluator::isCandidate, evaluator.getArguments())) // we are interested only in functional predicate here so we set the summary to ALL.
+                .orElse(alwaysTrue());
+
         // get dataSource for table
         SplitSource splitSource = splitManager.getSplits(
                 session,
                 node.getTable(),
                 stageExecutionDescriptor.isScanGroupedExecution(node.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING,
-                dynamicFilter);
+                dynamicFilter,
+                constraint);

         splitSources.add(splitSource);
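
The mapping above routes the scan filter's conjuncts to two consumers: conjuncts recognized as dynamic filters feed the DynamicFilterService, while the remaining conjuncts are wrapped by LayoutConstraintEvaluator (not shown in this diff) into the Constraint's functional predicate. A toy model of that routing step, with Conjunct and Routed as hypothetical stand-ins for Trino's Expression utilities:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ConjunctRoutingSketch
{
    // Stand-in for io.trino.sql.tree.Expression conjuncts.
    record Conjunct(String sql, boolean dynamicFilter) {}

    record Routed(List<Conjunct> toDynamicFilterService, List<Conjunct> toConstraint) {}

    static Routed route(List<Conjunct> conjuncts)
    {
        // Partition the conjunction: dynamic filters one way, static predicates the other
        Map<Boolean, List<Conjunct>> byKind = conjuncts.stream()
                .collect(Collectors.partitioningBy(Conjunct::dynamicFilter));
        return new Routed(byKind.get(true), byKind.get(false));
    }

    public static void main(String[] args)
    {
        Routed routed = route(List.of(
                new Conjunct("regionkey % 5 = 3", false),
                new Conjunct("DF(nationkey)", true)));
        System.out.println(routed.toConstraint()); // only the static conjunct reaches the Constraint
    }
}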

@@ -80,6 +80,7 @@
 import static io.trino.matching.Capture.newCapture;
 import static io.trino.spi.StandardErrorCode.INVALID_SPATIAL_PARTITIONING;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
 import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
 import static io.trino.spi.type.DoubleType.DOUBLE;
@@ -467,7 +468,7 @@ private static KdbTree loadKdbTree(String tableName, Session session, Metadata m
         ColumnHandle kdbTreeColumn = Iterables.getOnlyElement(visibleColumnHandles);

         Optional<KdbTree> kdbTree = Optional.empty();
-        try (SplitSource splitSource = splitManager.getSplits(session, tableHandle, UNGROUPED_SCHEDULING, EMPTY)) {
+        try (SplitSource splitSource = splitManager.getSplits(session, tableHandle, UNGROUPED_SCHEDULING, EMPTY, alwaysTrue())) {
             while (!Thread.currentThread().isInterrupted()) {
                 SplitBatch splitBatch = getFutureValue(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1000));
                 List<Split> splits = splitBatch.getSplits();

@@ -227,6 +227,7 @@
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.GROUPED_SCHEDULING;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
 import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
 import static io.trino.sql.ParameterUtils.parameterExtractor;
@@ -861,7 +862,8 @@ private List<Driver> createDrivers(Session session, Plan plan, OutputFactory out
                 session,
                 table,
                 stageExecutionDescriptor.isScanGroupedExecution(tableScan.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING,
-                EMPTY);
+                EMPTY,
+                alwaysTrue());

         ImmutableSet.Builder<ScheduledSplit> scheduledSplits = ImmutableSet.builder();
         while (!splitSource.isFinished()) {

@@ -20,6 +20,7 @@
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.FixedSplitSource;

@@ -43,7 +44,8 @@ public ConnectorSplitSource getSplits(
             ConnectorSession session,
             ConnectorTableHandle table,
             SplitSchedulingStrategy splitSchedulingStrategy,
-            DynamicFilter dynamicFilter)
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
     {
         return new FixedSplitSource(splits);
     }

@@ -25,6 +25,7 @@ default ConnectorSplitSource getSplits(
         throw new UnsupportedOperationException();
     }

+    @Deprecated
     default ConnectorSplitSource getSplits(
             ConnectorTransactionHandle transaction,
             ConnectorSession session,
@@ -35,6 +36,17 @@ default ConnectorSplitSource getSplits(
         throw new UnsupportedOperationException();
     }

+    default ConnectorSplitSource getSplits(
+            ConnectorTransactionHandle transaction,
+            ConnectorSession session,
+            ConnectorTableHandle table,
+            SplitSchedulingStrategy splitSchedulingStrategy,
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
+    {
+        return getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter);
+    }
+
     enum SplitSchedulingStrategy
     {
         UNGROUPED_SCHEDULING,
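
The new default simply falls back to the deprecated overload, so connectors compiled against the older SPI keep working; a connector opts into constraint-based pruning by overriding the new signature. A sketch of a minimal override, where ExampleSplitManager and planSplits are placeholders rather than a real connector:

import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;

import java.util.List;

public class ExampleSplitManager
        implements ConnectorSplitManager
{
    @Override
    public ConnectorSplitSource getSplits(
            ConnectorTransactionHandle transaction,
            ConnectorSession session,
            ConnectorTableHandle table,
            SplitSchedulingStrategy splitSchedulingStrategy,
            DynamicFilter dynamicFilter,
            Constraint constraint)
    {
        // Pruning on the constraint is optional: the engine still applies the
        // full filter to the rows, so ignoring it only costs performance.
        List<ConnectorSplit> splits = planSplits(table, constraint);
        return new FixedSplitSource(splits);
    }

    private List<ConnectorSplit> planSplits(ConnectorTableHandle table, Constraint constraint)
    {
        return List.of(); // placeholder for real split enumeration
    }
}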

@@ -20,6 +20,7 @@
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableLayoutHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;

 import javax.inject.Inject;
@@ -54,4 +55,18 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
             return delegate.getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter);
         }
     }
+
+    @Override
+    public ConnectorSplitSource getSplits(
+            ConnectorTransactionHandle transaction,
+            ConnectorSession session,
+            ConnectorTableHandle table,
+            SplitSchedulingStrategy splitSchedulingStrategy,
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
+    {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            return delegate.getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter, constraint);
+        }
+    }
 }
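
For readers unfamiliar with the wrapper: ThreadContextClassLoader from trino-spi swaps the calling thread's context classloader in its constructor and restores the previous one in close(), so the delegated call resolves classes against the plugin's classloader even on an engine thread. A sketch of the pattern in isolation, assuming those semantics:

import io.trino.spi.classloader.ThreadContextClassLoader;

import java.util.function.Supplier;

public final class ClassLoaderSafetySketch
{
    // try-with-resources guarantees the previous context classloader is
    // restored even if the delegate call throws.
    public static <T> T withClassLoader(ClassLoader classLoader, Supplier<T> delegateCall)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
            return delegateCall.get();
        }
    }
}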

@@ -21,6 +21,7 @@
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.FixedSplitSource;
 import io.trino.spi.type.TypeManager;
@@ -59,7 +60,8 @@ public ConnectorSplitSource getSplits(
             ConnectorSession session,
             ConnectorTableHandle handle,
             SplitSchedulingStrategy splitSchedulingStrategy,
-            DynamicFilter dynamicFilter)
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
     {
         IcebergTableHandle table = (IcebergTableHandle) handle;

@@ -84,7 +86,8 @@
                 identityPartitionColumns,
                 tableScan,
                 dynamicFilter,
-                dynamicFilteringWaitTimeout);
+                dynamicFilteringWaitTimeout,
+                constraint);

         return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
     }

@@ -20,11 +20,14 @@
 import com.google.common.collect.Streams;
 import io.airlift.units.Duration;
 import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPartitionHandle;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.NullableValue;
 import io.trino.spi.predicate.Range;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.predicate.ValueSet;
@@ -42,12 +45,16 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;

+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.Sets.intersection;
 import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
 import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
 import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
@@ -74,6 +81,7 @@ public class IcebergSplitSource
     private final DynamicFilter dynamicFilter;
     private final long dynamicFilteringWaitTimeoutMillis;
     private final Stopwatch dynamicFilterWaitStopwatch;
+    private final Constraint constraint;

     private CloseableIterable<CombinedScanTask> combinedScanIterable;
     private Iterator<FileScanTask> fileScanIterator;
@@ -84,7 +92,8 @@ public IcebergSplitSource(
             Set<IcebergColumnHandle> identityPartitionColumns,
             TableScan tableScan,
             DynamicFilter dynamicFilter,
-            Duration dynamicFilteringWaitTimeout)
+            Duration dynamicFilteringWaitTimeout,
+            Constraint constraint)
     {
         this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
         this.identityPartitionColumns = requireNonNull(identityPartitionColumns, "identityPartitionColumns is null");
@@ -93,6 +102,7 @@
         this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
         this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis();
         this.dynamicFilterWaitStopwatch = Stopwatch.createStarted();
+        this.constraint = requireNonNull(constraint, "constraint is null");
     }

     @Override
@@ -153,10 +163,23 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan

             IcebergSplit icebergSplit = toIcebergSplit(scanTask);

+            Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> {
+                Map<ColumnHandle, NullableValue> bindings = new HashMap<>();
+                for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
+                    Object partitionValue = deserializePartitionValue(
+                            partitionColumn.getType(),
+                            icebergSplit.getPartitionKeys().get(partitionColumn.getId()).orElse(null),
+                            partitionColumn.getName());
+                    NullableValue bindingValue = new NullableValue(partitionColumn.getType(), partitionValue);
+                    bindings.put(partitionColumn, bindingValue);
+                }
+                return bindings;
+            });
+
             if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) {
                 if (!partitionMatchesPredicate(
                         identityPartitionColumns,
-                        icebergSplit.getPartitionKeys(),
+                        partitionValues,
                         dynamicFilterPredicate)) {
                     continue;
                 }
@@ -169,6 +192,9 @@
                     continue;
                 }
             }
+            if (!partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint)) {
+                continue;
+            }
             splits.add(icebergSplit);
         }
         return completedFuture(new ConnectorSplitBatch(splits.build(), isFinished()));
@@ -268,10 +294,25 @@ else if (upperBound != null) {
         return Domain.create(ValueSet.ofRanges(statisticsRange), mayContainNulls);
     }

+    static boolean partitionMatchesConstraint(
+            Set<IcebergColumnHandle> identityPartitionColumns,
+            Supplier<Map<ColumnHandle, NullableValue>> partitionValues,
+            Constraint constraint)
+    {
+        // We use Constraint just to pass functional predicate here from DistributedExecutionPlanner
+        verify(constraint.getSummary().isAll());
+
+        if (constraint.predicate().isEmpty() ||
+                constraint.getPredicateColumns().map(predicateColumns -> intersection(predicateColumns, identityPartitionColumns).isEmpty()).orElse(false)) {
+            return true;
+        }
+        return constraint.predicate().get().test(partitionValues.get());
+    }
+
     @VisibleForTesting
     static boolean partitionMatchesPredicate(
             Set<IcebergColumnHandle> identityPartitionColumns,
-            Map<Integer, Optional<String>> partitionKeys,
+            Supplier<Map<ColumnHandle, NullableValue>> partitionValues,
             TupleDomain<IcebergColumnHandle> dynamicFilterPredicate)
     {
         if (dynamicFilterPredicate.isNone()) {
@@ -282,8 +323,7 @@ static boolean partitionMatchesPredicate(
         for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
             Domain allowedDomain = domains.get(partitionColumn);
             if (allowedDomain != null) {
-                Object partitionValue = deserializePartitionValue(partitionColumn.getType(), partitionKeys.get(partitionColumn.getId()).orElse(null), partitionColumn.getName());
-                if (!allowedDomain.includesNullableValue(partitionValue)) {
+                if (!allowedDomain.includesNullableValue(partitionValues.get().get(partitionColumn).getValue())) {
                     return false;
                 }
             }
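
A runnable sketch of the new pruning decision, combining the memoized partition values with the functional constraint (TestingColumnHandle is hypothetical, and the real check also consults getPredicateColumns() to skip constraints that touch no identity-partition column):

import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;

import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.Suppliers.memoize;
import static io.trino.spi.type.BigintType.BIGINT;

public class PartitionPruningSketch
{
    record TestingColumnHandle(String name) implements ColumnHandle {}

    public static void main(String[] args)
    {
        ColumnHandle regionkey = new TestingColumnHandle("regionkey");
        Constraint constraint = new Constraint(
                TupleDomain.all(),
                bindings -> (long) bindings.get(regionkey).getValue() % 5 == 3,
                Set.of(regionkey));

        // Memoized as in getNextBatch: partition values are deserialized at
        // most once per split even when both the dynamic-filter check and the
        // constraint check need them.
        Supplier<Map<ColumnHandle, NullableValue>> partitionValues =
                memoize(() -> Map.of(regionkey, new NullableValue(BIGINT, 4L)));

        boolean matches = constraint.predicate().orElseThrow().test(partitionValues.get());
        System.out.println(matches); // false -> this split is pruned (4 % 5 != 3)
    }
}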

@@ -2443,9 +2443,7 @@ public void testSplitPruningForFilterOnPartitionColumn()
         verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey < 2", 2);
         verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey < 0", 0);
         verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey > 1 AND regionkey < 4", 2);
-
-        // TODO(https://github.com/trinodb/trino/issues/9309) should be 1
-        verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 5);
+        verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 1);

         assertUpdate("DROP TABLE " + tableName);
     }