Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter out Iceberg splits that do not match predicates not expressible by tuple domain #9830

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ public class SqlQueryExecution
private final Slug slug;
private final Metadata metadata;
private final TypeOperators typeOperators;
private final SqlParser sqlParser;
private final SplitManager splitManager;
private final NodePartitioningManager nodePartitioningManager;
private final NodeScheduler nodeScheduler;
Expand All @@ -133,6 +132,7 @@ public class SqlQueryExecution
private final CostCalculator costCalculator;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;
private final TypeAnalyzer typeAnalyzer;

private SqlQueryExecution(
PreparedQuery preparedQuery,
Expand Down Expand Up @@ -161,13 +161,13 @@ private SqlQueryExecution(
CostCalculator costCalculator,
DynamicFilterService dynamicFilterService,
WarningCollector warningCollector,
TableExecuteContextManager tableExecuteContextManager)
TableExecuteContextManager tableExecuteContextManager,
TypeAnalyzer typeAnalyzer)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
this.slug = requireNonNull(slug, "slug is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the sqlParser parameter is unused now, remove it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used, on line 193; it is passed to the analyze method.

this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
Expand Down Expand Up @@ -217,6 +217,7 @@ private SqlQueryExecution(
});

this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
}
}

Expand Down Expand Up @@ -488,7 +489,7 @@ private PlanRoot doPlanQuery()
idAllocator,
metadata,
typeOperators,
new TypeAnalyzer(sqlParser, metadata),
findepi marked this conversation as resolved.
Show resolved Hide resolved
typeAnalyzer,
statsCalculator,
costCalculator,
stateMachine.getWarningCollector());
Expand All @@ -511,7 +512,7 @@ private PlanRoot doPlanQuery()
private void planDistribution(PlanRoot plan)
{
// plan the execution on the active nodes
DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService);
DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService, typeAnalyzer);
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());

// ensure split sources are closed
Expand Down Expand Up @@ -750,6 +751,7 @@ public static class SqlQueryExecutionFactory
private final CostCalculator costCalculator;
private final DynamicFilterService dynamicFilterService;
private final TableExecuteContextManager tableExecuteContextManager;
private final TypeAnalyzer typeAnalyzer;

@Inject
SqlQueryExecutionFactory(
Expand All @@ -775,7 +777,8 @@ public static class SqlQueryExecutionFactory
StatsCalculator statsCalculator,
CostCalculator costCalculator,
DynamicFilterService dynamicFilterService,
TableExecuteContextManager tableExecuteContextManager)
TableExecuteContextManager tableExecuteContextManager,
TypeAnalyzer typeAnalyzer)
{
requireNonNull(config, "config is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
Expand All @@ -801,6 +804,7 @@ public static class SqlQueryExecutionFactory
this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
}

@Override
Expand Down Expand Up @@ -841,7 +845,8 @@ public QueryExecution createQueryExecution(
costCalculator,
dynamicFilterService,
warningCollector,
tableExecuteContextManager);
tableExecuteContextManager,
typeAnalyzer);
}
}
}
10 changes: 8 additions & 2 deletions core/trino-main/src/main/java/io/trino/split/SplitManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public void removeConnectorSplitManager(CatalogName catalogName)
splitManagers.remove(catalogName);
}

public SplitSource getSplits(Session session, TableHandle table, SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter)
public SplitSource getSplits(
Session session,
TableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter,
Constraint constraint)
{
CatalogName catalogName = table.getCatalogName();
ConnectorSplitManager splitManager = getConnectorSplitManager(catalogName);
Expand All @@ -92,7 +97,8 @@ public SplitSource getSplits(Session session, TableHandle table, SplitScheduling
connectorSession,
table.getConnectorHandle(),
splitSchedulingStrategy,
dynamicFilter);
dynamicFilter,
constraint);
}

SplitSource splitSource = new ConnectorAwareSplitSource(catalogName, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import io.trino.metadata.TableSchema;
import io.trino.operator.StageExecutionDescriptor;
import io.trino.server.DynamicFilterService;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;
import io.trino.split.SampledSplitSource;
import io.trino.split.SplitManager;
import io.trino.split.SplitSource;
Expand Down Expand Up @@ -67,6 +69,7 @@
import io.trino.sql.planner.plan.UpdateNode;
import io.trino.sql.planner.plan.ValuesNode;
import io.trino.sql.planner.plan.WindowNode;
import io.trino.sql.tree.Expression;

import javax.inject.Inject;

Expand All @@ -78,7 +81,9 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.GROUPED_SCHEDULING;
import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.connector.DynamicFilter.EMPTY;
import static io.trino.sql.ExpressionUtils.filterConjuncts;
import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static java.util.Objects.requireNonNull;

Expand All @@ -89,13 +94,15 @@ public class DistributedExecutionPlanner
private final SplitManager splitManager;
private final Metadata metadata;
private final DynamicFilterService dynamicFilterService;
private final TypeAnalyzer typeAnalyzer;

@Inject
public DistributedExecutionPlanner(SplitManager splitManager, Metadata metadata, DynamicFilterService dynamicFilterService)
public DistributedExecutionPlanner(SplitManager splitManager, Metadata metadata, DynamicFilterService dynamicFilterService, TypeAnalyzer typeAnalyzer)
{
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
}

public StageExecutionPlan plan(SubPlan root, Session session)
Expand Down Expand Up @@ -191,8 +198,10 @@ public Map<PlanNodeId, SplitSource> visitTableScan(TableScanNode node, Void cont

private Map<PlanNodeId, SplitSource> visitScanAndFilter(TableScanNode node, Optional<FilterNode> filter)
{
List<DynamicFilters.Descriptor> dynamicFilters = filter
.map(FilterNode::getPredicate)
Optional<Expression> filterPredicate = filter
.map(FilterNode::getPredicate);

List<DynamicFilters.Descriptor> dynamicFilters = filterPredicate
.map(DynamicFilters::extractDynamicFilters)
.map(DynamicFilters.ExtractResult::getDynamicConjuncts)
.orElse(ImmutableList.of());
Expand All @@ -203,12 +212,19 @@ private Map<PlanNodeId, SplitSource> visitScanAndFilter(TableScanNode node, Opti
dynamicFilter = dynamicFilterService.createDynamicFilter(session.getQueryId(), dynamicFilters, node.getAssignments(), typeProvider);
}

Constraint constraint = filterPredicate
.map(predicate -> filterConjuncts(metadata, predicate, expression -> !DynamicFilters.isDynamicFilter(expression)))
.map(predicate -> new LayoutConstraintEvaluator(metadata, typeAnalyzer, session, typeProvider, node.getAssignments(), predicate))
.map(evaluator -> new Constraint(TupleDomain.all(), evaluator::isCandidate, evaluator.getArguments())) // we are interested only in functional predicate here so we set the summary to ALL.
.orElse(alwaysTrue());

// get dataSource for table
SplitSource splitSource = splitManager.getSplits(
session,
node.getTable(),
stageExecutionDescriptor.isScanGroupedExecution(node.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING,
dynamicFilter);
dynamicFilter,
constraint);

splitSources.add(splitSource);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner;

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.metadata.Metadata;
import io.trino.operator.scalar.TryFunction;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.predicate.NullableValue;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.NullLiteral;

import java.util.Map;
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Sets.intersection;
import static java.util.Objects.requireNonNull;

/**
 * Evaluates a filter predicate that is not expressible as a {@code TupleDomain}
 * against partial column bindings (e.g. per-split partition values), so that
 * splits which can never satisfy the predicate are pruned before scheduling.
 */
public class LayoutConstraintEvaluator
{
    private final Map<Symbol, ColumnHandle> assignments;
    private final ExpressionInterpreter evaluator;
    // Column handles referenced by the predicate; used to short-circuit when the
    // provided bindings cannot affect the predicate's value
    private final Set<ColumnHandle> arguments;

    public LayoutConstraintEvaluator(Metadata metadata, TypeAnalyzer typeAnalyzer, Session session, TypeProvider types, Map<Symbol, ColumnHandle> assignments, Expression expression)
    {
        requireNonNull(expression, "expression is null");
        this.assignments = ImmutableMap.copyOf(requireNonNull(assignments, "assignments is null"));
        evaluator = new ExpressionInterpreter(expression, metadata, session, typeAnalyzer.getTypes(session, types, expression));
        arguments = SymbolsExtractor.extractUnique(expression).stream()
                .map(assignments::get)
                .collect(toImmutableSet());
    }

    /**
     * Returns the column handles the predicate depends on. Callers use this to
     * decide which columns must be supplied in the bindings passed to {@link #isCandidate}.
     */
    public Set<ColumnHandle> getArguments()
    {
        return arguments;
    }

    /**
     * Returns {@code false} only when the predicate provably evaluates to FALSE or NULL
     * under the given bindings, meaning the corresponding split/partition can be pruned.
     * Returns {@code true} when the bindings are unrelated to the predicate or when
     * evaluation is inconclusive.
     */
    public boolean isCandidate(Map<ColumnHandle, NullableValue> bindings)
    {
        if (intersection(bindings.keySet(), arguments).isEmpty()) {
            return true;
        }
        LookupSymbolResolver inputs = new LookupSymbolResolver(assignments, bindings);

        // Skip pruning if evaluation fails in a recoverable way. Failing here can cause
        // spurious query failures for partitions that would otherwise be filtered out.
        Object optimized = TryFunction.evaluate(() -> evaluator.optimize(inputs), true);

        // If any conjuncts evaluate to FALSE or null, then the whole predicate will never be true and so the partition should be pruned
        return !(Boolean.FALSE.equals(optimized) || optimized == null || optimized instanceof NullLiteral);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static io.trino.matching.Capture.newCapture;
import static io.trino.spi.StandardErrorCode.INVALID_SPATIAL_PARTITIONING;
import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.connector.DynamicFilter.EMPTY;
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static io.trino.spi.type.DoubleType.DOUBLE;
Expand Down Expand Up @@ -467,7 +468,7 @@ private static KdbTree loadKdbTree(String tableName, Session session, Metadata m
ColumnHandle kdbTreeColumn = Iterables.getOnlyElement(visibleColumnHandles);

Optional<KdbTree> kdbTree = Optional.empty();
try (SplitSource splitSource = splitManager.getSplits(session, tableHandle, UNGROUPED_SCHEDULING, EMPTY)) {
try (SplitSource splitSource = splitManager.getSplits(session, tableHandle, UNGROUPED_SCHEDULING, EMPTY, alwaysTrue())) {
while (!Thread.currentThread().isInterrupted()) {
SplitBatch splitBatch = getFutureValue(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1000));
List<Split> splits = splitBatch.getSplits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,29 @@
import io.trino.metadata.TableLayoutResult;
import io.trino.metadata.TableProperties;
import io.trino.metadata.TableProperties.TablePartitioning;
import io.trino.operator.scalar.TryFunction;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeOperators;
import io.trino.sql.planner.DomainTranslator;
import io.trino.sql.planner.ExpressionInterpreter;
import io.trino.sql.planner.LookupSymbolResolver;
import io.trino.sql.planner.LayoutConstraintEvaluator;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.SymbolAllocator;
import io.trino.sql.planner.SymbolsExtractor;
import io.trino.sql.planner.TypeAnalyzer;
import io.trino.sql.planner.TypeProvider;
import io.trino.sql.planner.iterative.Rule;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.PlanNode;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.planner.plan.ValuesNode;
import io.trino.sql.tree.Expression;
import io.trino.sql.tree.NullLiteral;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Sets.intersection;
import static io.trino.SystemSessionProperties.isAllowPushdownIntoConnectors;
import static io.trino.matching.Capture.newCapture;
import static io.trino.metadata.TableLayoutResult.computeEnforced;
Expand Down Expand Up @@ -346,41 +338,4 @@ static Expression createResultingPredicate(

return expression;
}

private static class LayoutConstraintEvaluator
{
private final Map<Symbol, ColumnHandle> assignments;
private final ExpressionInterpreter evaluator;
private final Set<ColumnHandle> arguments;

public LayoutConstraintEvaluator(Metadata metadata, TypeAnalyzer typeAnalyzer, Session session, TypeProvider types, Map<Symbol, ColumnHandle> assignments, Expression expression)
{
this.assignments = assignments;

evaluator = new ExpressionInterpreter(expression, metadata, session, typeAnalyzer.getTypes(session, types, expression));
arguments = SymbolsExtractor.extractUnique(expression).stream()
.map(assignments::get)
.collect(toImmutableSet());
}

public Set<ColumnHandle> getArguments()
{
return arguments;
}

private boolean isCandidate(Map<ColumnHandle, NullableValue> bindings)
{
if (intersection(bindings.keySet(), arguments).isEmpty()) {
return true;
}
LookupSymbolResolver inputs = new LookupSymbolResolver(assignments, bindings);

// Skip pruning if evaluation fails in a recoverable way. Failing here can cause
// spurious query failures for partitions that would otherwise be filtered out.
Object optimized = TryFunction.evaluate(() -> evaluator.optimize(inputs), true);

// If any conjuncts evaluate to FALSE or null, then the whole predicate will never be true and so the partition should be pruned
return !(Boolean.FALSE.equals(optimized) || optimized == null || optimized instanceof NullLiteral);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.GROUPED_SCHEDULING;
import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
import static io.trino.spi.connector.Constraint.alwaysTrue;
import static io.trino.spi.connector.DynamicFilter.EMPTY;
import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
import static io.trino.sql.ParameterUtils.parameterExtractor;
Expand Down Expand Up @@ -859,7 +860,8 @@ private List<Driver> createDrivers(Session session, Plan plan, OutputFactory out
session,
table,
stageExecutionDescriptor.isScanGroupedExecution(tableScan.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING,
EMPTY);
EMPTY,
alwaysTrue());

ImmutableSet.Builder<ScheduledSplit> scheduledSplits = ImmutableSet.builder();
while (!splitSource.isFinished()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;

Expand All @@ -43,7 +44,8 @@ public ConnectorSplitSource getSplits(
ConnectorSession session,
ConnectorTableHandle table,
SplitSchedulingStrategy splitSchedulingStrategy,
DynamicFilter dynamicFilter)
DynamicFilter dynamicFilter,
Constraint constraint)
{
return new FixedSplitSource(splits);
}
Expand Down
Loading