From ca0addcffefad5a185ab0594d871d0097d974fd5 Mon Sep 17 00:00:00 2001
From: Konrad Dziedzic
Date: Tue, 9 Nov 2021 12:53:46 +0100
Subject: [PATCH] Filter out splits that do not match predicates not
 expressible by tuple domain

fixes #9309
fixes #7608

The idea is to push the constraint down to the split source, so that splits
that would be filtered out by the constraint's predicate are never generated.
---
 .../io/trino/execution/SqlQueryExecution.java |  2 +-
 .../java/io/trino/split/SplitManager.java     | 10 +++-
 .../planner/DistributedExecutionPlanner.java  | 24 +++++++--
 .../iterative/rule/ExtractSpatialJoins.java   |  3 +-
 .../io/trino/testing/LocalQueryRunner.java    |  4 +-
 .../io/trino/testing/TestingSplitManager.java |  4 +-
 .../spi/connector/ConnectorSplitManager.java  | 12 +++++
 .../ClassLoaderSafeConnectorSplitManager.java | 15 ++++++
 .../plugin/iceberg/IcebergSplitManager.java   |  7 ++-
 .../plugin/iceberg/IcebergSplitSource.java    | 52 ++++++++++++++++---
 .../iceberg/BaseIcebergConnectorTest.java     |  4 +-
 .../iceberg/TestIcebergSplitSource.java       | 10 ++--
 .../TestKuduIntegrationDynamicFilter.java     |  3 +-
 .../benchmark/AbstractOperatorBenchmark.java  |  3 +-
 14 files changed, 126 insertions(+), 27 deletions(-)

diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java
index 70a40668c77d..80d26e94502f 100644
--- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java
+++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java
@@ -512,7 +512,7 @@ private PlanRoot doPlanQuery()
     private void planDistribution(PlanRoot plan)
     {
         // plan the execution on the active nodes
-        DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService);
+        DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService, typeAnalyzer);
         StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());

         // ensure split sources are closed
diff --git a/core/trino-main/src/main/java/io/trino/split/SplitManager.java b/core/trino-main/src/main/java/io/trino/split/SplitManager.java
index ecf7784445ee..ab5b9bb77ea3 100644
--- a/core/trino-main/src/main/java/io/trino/split/SplitManager.java
+++ b/core/trino-main/src/main/java/io/trino/split/SplitManager.java
@@ -66,7 +66,12 @@ public void removeConnectorSplitManager(CatalogName catalogName)
         splitManagers.remove(catalogName);
     }

-    public SplitSource getSplits(Session session, TableHandle table, SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter)
+    public SplitSource getSplits(
+            Session session,
+            TableHandle table,
+            SplitSchedulingStrategy splitSchedulingStrategy,
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
     {
         CatalogName catalogName = table.getCatalogName();
         ConnectorSplitManager splitManager = getConnectorSplitManager(catalogName);
@@ -92,7 +97,8 @@ public SplitSource getSplits(Session session, TableHandle table, SplitScheduling
                     connectorSession,
                     table.getConnectorHandle(),
                     splitSchedulingStrategy,
-                    dynamicFilter);
+                    dynamicFilter,
+                    constraint);
         }

         SplitSource splitSource = new ConnectorAwareSplitSource(catalogName, source);
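A sketch of how engine code now reaches a split source through the widened entry point; `splitManager`, `session`, and `tableHandle` are assumed to be in scope, and the static imports mirror the call sites updated later in this patch:

    import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
    import static io.trino.spi.connector.Constraint.alwaysTrue;
    import static io.trino.spi.connector.DynamicFilter.EMPTY;

    SplitSource splitSource = splitManager.getSplits(
            session,
            tableHandle,
            UNGROUPED_SCHEDULING,
            EMPTY,          // no dynamic filter collected yet
            alwaysTrue());  // no functional predicate, so nothing extra is pruned

Passing alwaysTrue() preserves the old behavior exactly; only scans with a non-trivial residual filter get the new pruning.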
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java
index 2519425634ac..fb6b13d4fe4b 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/DistributedExecutionPlanner.java
@@ -23,7 +23,9 @@
 import io.trino.metadata.TableSchema;
 import io.trino.operator.StageExecutionDescriptor;
 import io.trino.server.DynamicFilterService;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
+import io.trino.spi.predicate.TupleDomain;
 import io.trino.split.SampledSplitSource;
 import io.trino.split.SplitManager;
 import io.trino.split.SplitSource;
@@ -67,6 +69,7 @@
 import io.trino.sql.planner.plan.UpdateNode;
 import io.trino.sql.planner.plan.ValuesNode;
 import io.trino.sql.planner.plan.WindowNode;
+import io.trino.sql.tree.Expression;

 import javax.inject.Inject;

@@ -78,7 +81,9 @@
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.GROUPED_SCHEDULING;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
+import static io.trino.sql.ExpressionUtils.filterConjuncts;
 import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
 import static java.util.Objects.requireNonNull;

@@ -89,13 +94,15 @@ public class DistributedExecutionPlanner
     private final SplitManager splitManager;
     private final Metadata metadata;
     private final DynamicFilterService dynamicFilterService;
+    private final TypeAnalyzer typeAnalyzer;

     @Inject
-    public DistributedExecutionPlanner(SplitManager splitManager, Metadata metadata, DynamicFilterService dynamicFilterService)
+    public DistributedExecutionPlanner(SplitManager splitManager, Metadata metadata, DynamicFilterService dynamicFilterService, TypeAnalyzer typeAnalyzer)
     {
         this.splitManager = requireNonNull(splitManager, "splitManager is null");
         this.metadata = requireNonNull(metadata, "metadata is null");
         this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
+        this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
     }

     public StageExecutionPlan plan(SubPlan root, Session session)
@@ -191,8 +198,10 @@ public Map visitTableScan(TableScanNode node, Void cont

     private Map<PlanNodeId, SplitSource> visitScanAndFilter(TableScanNode node, Optional<FilterNode> filter)
     {
-        List<DynamicFilters.Descriptor> dynamicFilters = filter
-                .map(FilterNode::getPredicate)
+        Optional<Expression> filterPredicate = filter
+                .map(FilterNode::getPredicate);
+
+        List<DynamicFilters.Descriptor> dynamicFilters = filterPredicate
                 .map(DynamicFilters::extractDynamicFilters)
                 .map(DynamicFilters.ExtractResult::getDynamicConjuncts)
                 .orElse(ImmutableList.of());
@@ -203,12 +212,19 @@ private Map visitScanAndFilter(TableScanNode node, Opti
             dynamicFilter = dynamicFilterService.createDynamicFilter(session.getQueryId(), dynamicFilters, node.getAssignments(), typeProvider);
         }

+        Constraint constraint = filterPredicate
+                .map(predicate -> filterConjuncts(metadata, predicate, expression -> !DynamicFilters.isDynamicFilter(expression)))
+                .map(predicate -> new LayoutConstraintEvaluator(metadata, typeAnalyzer, session, typeProvider, node.getAssignments(), predicate))
+                .map(evaluator -> new Constraint(TupleDomain.all(), evaluator::isCandidate, evaluator.getArguments())) // we are interested only in the functional predicate here, so we set the summary to ALL
+                .orElse(alwaysTrue());
+
         // get dataSource for table
         SplitSource splitSource = splitManager.getSplits(
                 session,
                 node.getTable(),
                 stageExecutionDescriptor.isScanGroupedExecution(node.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING,
-                dynamicFilter);
+                dynamicFilter,
+                constraint);

         splitSources.add(splitSource);
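The constraint built in visitScanAndFilter deliberately carries no tuple-domain summary: the summary is ALL, and only the functional predicate travels to the split source. A minimal sketch of such a constraint, using the same three-argument Constraint constructor as the hunk above; the `regionKey` handle, the class name, and the modulo predicate are illustrative, not part of the patch:

    import io.trino.spi.connector.ColumnHandle;
    import io.trino.spi.connector.Constraint;
    import io.trino.spi.predicate.NullableValue;
    import io.trino.spi.predicate.TupleDomain;

    import java.util.Map;
    import java.util.Set;

    final class ModuloConstraints
    {
        private ModuloConstraints() {}

        // Mimics "regionkey % 5 = 3", which no TupleDomain can express.
        static Constraint moduloConstraint(ColumnHandle regionKey)
        {
            return new Constraint(
                    TupleDomain.all(), // summary stays ALL, as in visitScanAndFilter
                    (Map<ColumnHandle, NullableValue> bindings) -> {
                        NullableValue value = bindings.get(regionKey);
                        if (value == null || value.isNull()) {
                            return false;
                        }
                        return (long) value.getValue() % 5 == 3;
                    },
                    Set.of(regionKey));
        }
    }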
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java
index e4efd3b15231..3846762c9485 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java
@@ -80,6 +80,7 @@
 import static io.trino.matching.Capture.newCapture;
 import static io.trino.spi.StandardErrorCode.INVALID_SPATIAL_PARTITIONING;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
 import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
 import static io.trino.spi.type.DoubleType.DOUBLE;
@@ -467,7 +468,7 @@ private static KdbTree loadKdbTree(String tableName, Session session, Metadata m
         ColumnHandle kdbTreeColumn = Iterables.getOnlyElement(visibleColumnHandles);

         Optional<KdbTree> kdbTree = Optional.empty();
-        try (SplitSource splitSource = splitManager.getSplits(session, tableHandle, UNGROUPED_SCHEDULING, EMPTY)) {
+        try (SplitSource splitSource = splitManager.getSplits(session, tableHandle, UNGROUPED_SCHEDULING, EMPTY, alwaysTrue())) {
             while (!Thread.currentThread().isInterrupted()) {
                 SplitBatch splitBatch = getFutureValue(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1000));
                 List<Split> splits = splitBatch.getSplits();
diff --git a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java
index c318b3e46b69..e1f7dbc3b9ba 100644
--- a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java
+++ b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java
@@ -227,6 +227,7 @@
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.GROUPED_SCHEDULING;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
 import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
 import static io.trino.sql.ParameterUtils.parameterExtractor;
@@ -861,7 +862,8 @@ private List createDrivers(Session session, Plan plan, OutputFactory out
                     session,
                     table,
                     stageExecutionDescriptor.isScanGroupedExecution(tableScan.getId()) ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING,
-                    EMPTY);
+                    EMPTY,
+                    alwaysTrue());

             ImmutableSet.Builder<ScheduledSplit> scheduledSplits = ImmutableSet.builder();
             while (!splitSource.isFinished()) {
diff --git a/core/trino-main/src/main/java/io/trino/testing/TestingSplitManager.java b/core/trino-main/src/main/java/io/trino/testing/TestingSplitManager.java
index 0324bd6adf03..29c5573e3228 100644
--- a/core/trino-main/src/main/java/io/trino/testing/TestingSplitManager.java
+++ b/core/trino-main/src/main/java/io/trino/testing/TestingSplitManager.java
@@ -20,6 +20,7 @@
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.FixedSplitSource;

@@ -43,7 +44,8 @@ public ConnectorSplitSource getSplits(
             ConnectorSession session,
             ConnectorTableHandle table,
             SplitSchedulingStrategy splitSchedulingStrategy,
-            DynamicFilter dynamicFilter)
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
     {
         return new FixedSplitSource(splits);
     }
diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java
index 79f7518eb77c..9887b7debdf0 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java
@@ -25,6 +25,7 @@ default ConnectorSplitSource getSplits(
         throw new UnsupportedOperationException();
     }

+    @Deprecated
     default ConnectorSplitSource getSplits(
             ConnectorTransactionHandle transaction,
             ConnectorSession session,
@@ -35,6 +36,17 @@ default ConnectorSplitSource getSplits(
         throw new UnsupportedOperationException();
     }

+    default ConnectorSplitSource getSplits(
+            ConnectorTransactionHandle transaction,
+            ConnectorSession session,
+            ConnectorTableHandle table,
+            SplitSchedulingStrategy splitSchedulingStrategy,
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
+    {
+        return getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter);
+    }
+
     enum SplitSchedulingStrategy
     {
         UNGROUPED_SCHEDULING,
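The new default keeps existing connectors source-compatible: a connector that only implements the deprecated five-argument overload still works, because the six-argument default delegates to it and drops the constraint. A hypothetical connector that wants the pushdown overrides the wider method instead (the class name is made up):

    import io.trino.spi.connector.ConnectorSession;
    import io.trino.spi.connector.ConnectorSplitManager;
    import io.trino.spi.connector.ConnectorSplitSource;
    import io.trino.spi.connector.ConnectorTableHandle;
    import io.trino.spi.connector.ConnectorTransactionHandle;
    import io.trino.spi.connector.Constraint;
    import io.trino.spi.connector.DynamicFilter;
    import io.trino.spi.connector.FixedSplitSource;

    import java.util.List;

    public class ExampleConstraintAwareSplitManager
            implements ConnectorSplitManager
    {
        @Override
        public ConnectorSplitSource getSplits(
                ConnectorTransactionHandle transaction,
                ConnectorSession session,
                ConnectorTableHandle table,
                SplitSchedulingStrategy splitSchedulingStrategy,
                DynamicFilter dynamicFilter,
                Constraint constraint)
        {
            // A real connector would test constraint.predicate() against each
            // candidate split's partition values while enumerating splits;
            // this stub simply returns an empty source.
            return new FixedSplitSource(List.of());
        }
    }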
diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java
index def2e09b21e5..f47281251348 100644
--- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java
+++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java
@@ -20,6 +20,7 @@
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableLayoutHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;

 import javax.inject.Inject;
@@ -54,4 +55,18 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
             return delegate.getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter);
         }
     }
+
+    @Override
+    public ConnectorSplitSource getSplits(
+            ConnectorTransactionHandle transaction,
+            ConnectorSession session,
+            ConnectorTableHandle table,
+            SplitSchedulingStrategy splitSchedulingStrategy,
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
+    {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            return delegate.getSplits(transaction, session, table, splitSchedulingStrategy, dynamicFilter, constraint);
+        }
+    }
 }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
index 14d54ba80476..4fdd9c6b0c74 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
@@ -21,6 +21,7 @@
 import io.trino.spi.connector.ConnectorSplitSource;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.FixedSplitSource;
 import io.trino.spi.type.TypeManager;
@@ -59,7 +60,8 @@ public ConnectorSplitSource getSplits(
             ConnectorSession session,
             ConnectorTableHandle handle,
             SplitSchedulingStrategy splitSchedulingStrategy,
-            DynamicFilter dynamicFilter)
+            DynamicFilter dynamicFilter,
+            Constraint constraint)
     {
         IcebergTableHandle table = (IcebergTableHandle) handle;

@@ -84,7 +86,8 @@ public ConnectorSplitSource getSplits(
                 identityPartitionColumns,
                 tableScan,
                 dynamicFilter,
-                dynamicFilteringWaitTimeout);
+                dynamicFilteringWaitTimeout,
+                constraint);

         return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
     }
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
index b78614aeaa14..a9e8b31af5f7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
@@ -20,11 +20,14 @@
 import com.google.common.collect.Streams;
 import io.airlift.units.Duration;
 import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorPartitionHandle;
 import io.trino.spi.connector.ConnectorSplit;
 import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.NullableValue;
 import io.trino.spi.predicate.Range;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.predicate.ValueSet;
@@ -42,12 +45,16 @@
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;

+import static com.google.common.base.Suppliers.memoize;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.Sets.intersection;
 import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
 import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
 import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
@@ -74,6 +81,7 @@ public class IcebergSplitSource
     private final DynamicFilter dynamicFilter;
     private final long dynamicFilteringWaitTimeoutMillis;
     private final Stopwatch dynamicFilterWaitStopwatch;
+    private final Constraint constraint;

     private CloseableIterable<CombinedScanTask> combinedScanIterable;
     private Iterator<FileScanTask> fileScanIterator;
@@ -84,7 +92,8 @@ public IcebergSplitSource(
             Set<IcebergColumnHandle> identityPartitionColumns,
             TableScan tableScan,
             DynamicFilter dynamicFilter,
-            Duration dynamicFilteringWaitTimeout)
+            Duration dynamicFilteringWaitTimeout,
+            Constraint constraint)
     {
         this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
         this.identityPartitionColumns = requireNonNull(identityPartitionColumns, "identityPartitionColumns is null");
@@ -93,6 +102,7 @@ public IcebergSplitSource(
         this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
         this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis();
         this.dynamicFilterWaitStopwatch = Stopwatch.createStarted();
+        this.constraint = requireNonNull(constraint, "constraint is null");
     }

     @Override
@@ -153,10 +163,23 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan

                 IcebergSplit icebergSplit = toIcebergSplit(scanTask);

+                Supplier<Map<ColumnHandle, NullableValue>> partitionValues = memoize(() -> {
+                    Map<ColumnHandle, NullableValue> bindings = new HashMap<>();
+                    for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
+                        Object partitionValue = deserializePartitionValue(
+                                partitionColumn.getType(),
+                                icebergSplit.getPartitionKeys().get(partitionColumn.getId()).orElse(null),
+                                partitionColumn.getName());
+                        NullableValue bindingValue = new NullableValue(partitionColumn.getType(), partitionValue);
+                        bindings.put(partitionColumn, bindingValue);
+                    }
+                    return bindings;
+                });
+
                 if (!dynamicFilterPredicate.isAll() && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) {
                     if (!partitionMatchesPredicate(
                             identityPartitionColumns,
-                            icebergSplit.getPartitionKeys(),
+                            partitionValues,
                             dynamicFilterPredicate)) {
                         continue;
                     }
@@ -169,6 +192,9 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan
                         continue;
                     }
                 }
+                if (!partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint)) {
+                    continue;
+                }
                 splits.add(icebergSplit);
             }
             return completedFuture(new ConnectorSplitBatch(splits.build(), isFinished()));
@@ -268,10 +294,25 @@ else if (upperBound != null) {
         return Domain.create(ValueSet.ofRanges(statisticsRange), mayContainNulls);
     }

+    static boolean partitionMatchesConstraint(
+            Set<IcebergColumnHandle> identityPartitionColumns,
+            Supplier<Map<ColumnHandle, NullableValue>> partitionValues,
+            Constraint constraint)
+    {
+        // We use Constraint just to pass functional predicate here from DistributedExecutionPlanner
+        verify(constraint.getSummary().isAll());
+
+        if (constraint.predicate().isEmpty() ||
+                constraint.getPredicateColumns().map(predicateColumns -> intersection(predicateColumns, identityPartitionColumns).isEmpty()).orElse(false)) {
+            return true;
+        }
+        return constraint.predicate().get().test(partitionValues.get());
+    }
+
     @VisibleForTesting
     static boolean partitionMatchesPredicate(
             Set<IcebergColumnHandle> identityPartitionColumns,
-            Map<Integer, Optional<String>> partitionKeys,
+            Supplier<Map<ColumnHandle, NullableValue>> partitionValues,
             TupleDomain<IcebergColumnHandle> dynamicFilterPredicate)
     {
         if (dynamicFilterPredicate.isNone()) {
@@ -282,8 +323,7 @@ static boolean partitionMatchesPredicate(
         for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
             Domain allowedDomain = domains.get(partitionColumn);
             if (allowedDomain != null) {
-                Object partitionValue = deserializePartitionValue(partitionColumn.getType(), partitionKeys.get(partitionColumn.getId()).orElse(null), partitionColumn.getName());
-                if (!allowedDomain.includesNullableValue(partitionValue)) {
+                if (!allowedDomain.includesNullableValue(partitionValues.get().get(partitionColumn).getValue())) {
                     return false;
                 }
             }
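The memoized partition values feed both the dynamic-filter check and the new constraint check, so a split's partition keys are deserialized at most once. A test-style sketch of the constraint check in isolation, in the spirit of TestIcebergSplitSource below; `bigintColumn` stands for an identity-partition IcebergColumnHandle, and the assertions assume that test class's usual static imports:

    // A constraint encoding "col % 5 = 3", with the summary left as ALL.
    Constraint constraint = new Constraint(
            TupleDomain.all(),
            bindings -> (long) bindings.get(bigintColumn).getValue() % 5 == 3,
            Set.<ColumnHandle>of(bigintColumn));

    // Identity partition value 1000: 1000 % 5 == 0, so the split is skipped.
    assertFalse(IcebergSplitSource.partitionMatchesConstraint(
            ImmutableSet.of(bigintColumn),
            () -> Map.of(bigintColumn, NullableValue.of(BIGINT, 1000L)),
            constraint));

    // 1003 % 5 == 3, so the split survives.
    assertTrue(IcebergSplitSource.partitionMatchesConstraint(
            ImmutableSet.of(bigintColumn),
            () -> Map.of(bigintColumn, NullableValue.of(BIGINT, 1003L)),
            constraint));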
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index 2f891b6d811c..7cf980c8556d 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -2443,9 +2443,7 @@ public void testSplitPruningForFilterOnPartitionColumn()
         verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey < 2", 2);
         verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey < 0", 0);
         verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey > 1 AND regionkey < 4", 2);
-
-        // TODO(https://github.com/trinodb/trino/issues/9309) should be 1
-        verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 5);
+        verifySplitCount("SELECT * FROM " + tableName + " WHERE regionkey % 5 = 3", 1);

         assertUpdate("DROP TABLE " + tableName);
     }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
index 1a1279458159..36b319fbbee5 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
@@ -58,6 +58,7 @@
 import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
 import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
 import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.testing.TestingConnectorSession.SESSION;
 import static io.trino.tpch.TpchTable.NATION;
@@ -155,7 +156,8 @@ public TupleDomain getCurrentPredicate()
                         return TupleDomain.all();
                     }
                 },
-                new Duration(2, SECONDS));
+                new Duration(2, SECONDS),
+                alwaysTrue());

         ImmutableList.Builder<IcebergSplit> splits = ImmutableList.builder();
         while (!splitSource.isFinished()) {
@@ -183,15 +185,15 @@ public void testBigintPartitionPruning()
                 Optional.empty());
         assertFalse(IcebergSplitSource.partitionMatchesPredicate(
                 ImmutableSet.of(bigintColumn),
-                ImmutableMap.of(1, Optional.of("1000")),
+                () -> ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 1000L)),
                 TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 100L)))));
         assertTrue(IcebergSplitSource.partitionMatchesPredicate(
                 ImmutableSet.of(bigintColumn),
-                ImmutableMap.of(1, Optional.of("1000")),
+                () -> ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 1000L)),
                 TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 1000L)))));
         assertFalse(IcebergSplitSource.partitionMatchesPredicate(
                 ImmutableSet.of(bigintColumn),
-                ImmutableMap.of(1, Optional.of("1000")),
+                () -> ImmutableMap.of(bigintColumn, NullableValue.of(BIGINT, 1000L)),
                 TupleDomain.fromFixedValues(ImmutableMap.of(bigintColumn, NullableValue.asNull(BIGINT)))));
     }

diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
index 145636e92b4c..b25f6f06151a 100644
--- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
+++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestKuduIntegrationDynamicFilter.java
@@ -53,6 +53,7 @@
 import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
 import static io.trino.plugin.kudu.KuduQueryRunnerFactory.createKuduQueryRunnerTpch;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
 import static io.trino.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST;
 import static io.trino.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.NONE;
@@ -101,7 +102,7 @@ public void testIncompleteDynamicFilterTimeout()
         Optional<TableHandle> tableHandle = runner.getMetadata().getTableHandle(session, tableName);
         assertTrue(tableHandle.isPresent());
         SplitSource splitSource = runner.getSplitManager()
-                .getSplits(session, tableHandle.get(), UNGROUPED_SCHEDULING, new IncompleteDynamicFilter());
+                .getSplits(session, tableHandle.get(), UNGROUPED_SCHEDULING, new IncompleteDynamicFilter(), alwaysTrue());
         List<Split> splits = new ArrayList<>();
         while (!splitSource.isFinished()) {
             splits.addAll(splitSource.getNextBatch(NOT_PARTITIONED, Lifespan.taskWide(), 1000).get().getSplits());
diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java
index 977d4a34cccb..e04c4bd33fb5 100644
--- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java
+++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java
@@ -80,6 +80,7 @@
 import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
 import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
 import static io.trino.spi.connector.ConnectorSplitManager.SplitSchedulingStrategy.UNGROUPED_SCHEDULING;
+import static io.trino.spi.connector.Constraint.alwaysTrue;
 import static io.trino.spi.connector.DynamicFilter.EMPTY;
 import static io.trino.spi.connector.NotPartitionedPartitionHandle.NOT_PARTITIONED;
 import static io.trino.spi.type.BigintType.BIGINT;
@@ -199,7 +200,7 @@ public OperatorFactory duplicate()

     private Split getLocalQuerySplit(Session session, TableHandle handle)
     {
-        SplitSource splitSource = localQueryRunner.getSplitManager().getSplits(session, handle, UNGROUPED_SCHEDULING, EMPTY);
+        SplitSource splitSource = localQueryRunner.getSplitManager().getSplits(session, handle, UNGROUPED_SCHEDULING, EMPTY, alwaysTrue());
         List<Split> splits = new ArrayList<>();
         while (!splitSource.isFinished()) {
             splits.addAll(getNextBatch(splitSource));
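End to end, this is what flips the expectation in BaseIcebergConnectorTest above: once the constraint is evaluated against each partition, a predicate that no tuple domain can express prunes down to a single split. A hypothetical test in the same style, reusing that class's verifySplitCount helper (the table name is made up):

    // nation has regionkey 0..4, so identity partitioning yields five partitions;
    // only the regionkey = 3 partition satisfies "regionkey % 5 = 3".
    assertUpdate("CREATE TABLE test_modulo_pruning WITH (partitioning = ARRAY['regionkey']) " +
            "AS SELECT * FROM nation", 25);
    verifySplitCount("SELECT * FROM test_modulo_pruning WHERE regionkey % 5 = 3", 1);
    assertUpdate("DROP TABLE test_modulo_pruning");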