diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md
index 869be0bbafd9..700e3086b73f 100644
--- a/docs/src/main/sphinx/connector/iceberg.md
+++ b/docs/src/main/sphinx/connector/iceberg.md
@@ -171,6 +171,16 @@ implementation is used:
     `query_partition_filter_required` catalog session property for temporary,
     catalog specific use.
   - `false`
+* - `iceberg.query-partition-filter-required-common-fields`
+  - Comma-separated list of field names that must be used in the filter of queries,
+    if those fields are partition fields of the queried table.
+  - `null`
+* - `iceberg.query-partition-pruning-required`
+  - Set to `true` to force a query to use partition pruning in the query plan.
+    This can be enabled only with `iceberg.query-partition-filter-required=true`.
+    You can use the `query_partition_pruning_required` catalog session property
+    for temporary, catalog specific use.
+  - `false`
 :::
 
 ## Type mapping
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index 67e4b9675c2a..0aadc7905a35 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -21,6 +21,7 @@
 import io.airlift.units.Duration;
 import io.trino.plugin.hive.HiveCompressionCodec;
 import jakarta.validation.constraints.AssertFalse;
+import jakarta.validation.constraints.AssertTrue;
 import jakarta.validation.constraints.DecimalMax;
 import jakarta.validation.constraints.DecimalMin;
 import jakarta.validation.constraints.Max;
@@ -78,6 +79,8 @@ public class IcebergConfig
     private Optional<String> materializedViewsStorageSchema = Optional.empty();
     private boolean sortedWritingEnabled = true;
     private boolean queryPartitionFilterRequired;
+    private boolean queryPartitionPruningRequired;
+    private Optional<String> queryPartitionFilterRequiredCommonFields = Optional.empty();
 
     public CatalogType getCatalogType()
     {
@@ -413,6 +416,38 @@ public boolean isQueryPartitionFilterRequired()
         return queryPartitionFilterRequired;
     }
 
+    public Optional<String> getQueryPartitionFilterRequiredCommonFields()
+    {
+        return queryPartitionFilterRequiredCommonFields;
+    }
+
+    @Config("iceberg.query-partition-filter-required-common-fields")
+    @ConfigDescription("Require filter predicates on all the partition fields declared in this configuration")
+    public IcebergConfig setQueryPartitionFilterRequiredCommonFields(String queryPartitionFilterRequiredCommonFields)
+    {
+        this.queryPartitionFilterRequiredCommonFields = Optional.ofNullable(queryPartitionFilterRequiredCommonFields);
+        return this;
+    }
+
+    @Config("iceberg.query-partition-pruning-required")
+    @ConfigDescription("Require a partition pruning on at least one partition column in the query plan. This can be enabled only with iceberg.query-partition-filter-required=true")
+    public IcebergConfig setQueryPartitionPruningRequired(boolean queryPartitionPruningRequired)
+    {
+        this.queryPartitionPruningRequired = queryPartitionPruningRequired;
+        return this;
+    }
+
+    public boolean isQueryPartitionPruningRequired()
+    {
+        return queryPartitionPruningRequired;
+    }
+
+    @AssertTrue(message = "iceberg.query-partition-pruning-required may only be enabled when iceberg.query-partition-filter-required is set to true")
+    public boolean isQueryPartitionPruningEnabledWhenPartitionFilterIsEnabled()
+    {
+        return !queryPartitionPruningRequired || queryPartitionFilterRequired;
+    }
+
     @AssertFalse(message = "iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false")
     public boolean isStorageSchemaSetWhenHidingIsEnabled()
     {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 9595bb202cde..40e5f64cc4cd 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -186,6 +186,7 @@
 import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -219,12 +220,14 @@
 import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.getQueryPartitionFilterRequiredCommonFields;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionFilterRequired;
+import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionPruningRequired;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
 import static io.trino.plugin.iceberg.IcebergTableName.isDataTable;
 import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
@@ -312,6 +315,7 @@ public class IcebergMetadata
 
     public static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
     public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";
+    public static final String QUERY_PARTITION_FILTER_REQUIRED_FIELDS_CONFIG_KEY = "trino.query-partition-filter-required.fields";
 
     public static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
     private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);
@@ -722,35 +726,78 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
     public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
     {
         IcebergTableHandle table = (IcebergTableHandle) handle;
-        if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) {
-            Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
-            Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
-                    .map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
-            if (partitionSpec.isEmpty() || partitionSpec.get().isUnpartitioned()) {
-                return;
-            }
-            Set<Integer> columnsWithPredicates = new HashSet<>();
+        if (!isQueryPartitionFilterRequired(session) || table.getForAnalyze().orElse(false)) {
+            return;
+        }
+
+        Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
+        Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
+                .map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
+        if (partitionSpec.isEmpty() || partitionSpec.get().isUnpartitioned()) {
+            return;
+        }
+
+        Supplier<Stream<Integer>> tableAllPartitionColumnIdStreamSupplier = () -> partitionSpec.get().fields().stream()
+                .filter(field -> !field.transform().isVoid())
+                .map(PartitionField::sourceId);
+
+        Set<Integer> partitionColumns = tableAllPartitionColumnIdStreamSupplier.get()
+                .collect(toImmutableSet());
+
+        // Mandatory column names are the union of the table storage property and the session-level common fields
+        Set<String> mandatoryFilterPartitionColumnNames = new HashSet<>();
+        if (table.getStorageProperties().containsKey(QUERY_PARTITION_FILTER_REQUIRED_FIELDS_CONFIG_KEY)) {
+            Splitter.on(",").splitToStream(table.getStorageProperties().get(QUERY_PARTITION_FILTER_REQUIRED_FIELDS_CONFIG_KEY))
+                    .map(String::trim)
+                    .forEach(mandatoryFilterPartitionColumnNames::add);
+        }
+        if (getQueryPartitionFilterRequiredCommonFields(session).isPresent()) {
+            Splitter.on(",").splitToStream(getQueryPartitionFilterRequiredCommonFields(session).get())
+                    .map(String::trim)
+                    .forEach(mandatoryFilterPartitionColumnNames::add);
+        }
+
+        Set<Integer> mandatoryPartitionColumns = tableAllPartitionColumnIdStreamSupplier.get()
+                .filter(id -> mandatoryFilterPartitionColumnNames.contains(schema.idToName().get(id)))
+                .collect(toImmutableSet());
+
+        // Constraint columns only count when pruning is not required; enforced and unenforced predicate columns always count
+        Set<Integer> columnsWithPredicates = new HashSet<>();
+        if (!isQueryPartitionPruningRequired(session)) {
             table.getConstraintColumns().stream()
                     .map(IcebergColumnHandle::getId)
                     .forEach(columnsWithPredicates::add);
-            table.getUnenforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
-                    .map(IcebergColumnHandle::getId)
-                    .forEach(columnsWithPredicates::add));
-            Set<Integer> partitionColumns = partitionSpec.get().fields().stream()
-                    .filter(field -> !field.transform().isVoid())
-                    .map(PartitionField::sourceId)
-                    .collect(toImmutableSet());
-            if (Collections.disjoint(columnsWithPredicates, partitionColumns)) {
-                String partitionColumnNames = partitionSpec.get().fields().stream()
-                        .filter(field -> !field.transform().isVoid())
-                        .map(PartitionField::sourceId)
+        }
+        table.getEnforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
+                .map(IcebergColumnHandle::getId)
+                .forEach(columnsWithPredicates::add));
+        table.getUnenforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
+                .map(IcebergColumnHandle::getId)
+                .forEach(columnsWithPredicates::add));
+
+        String requiredOption = isQueryPartitionPruningRequired(session) ? "Pruning" : "Filter";
+        // Mandatory names that are partition columns of this table must all carry a predicate; otherwise any partition column suffices
+        if (!mandatoryPartitionColumns.isEmpty()) {
+            if (!difference(mandatoryPartitionColumns, columnsWithPredicates).isEmpty()) {
+                String partitionColumnNames = tableAllPartitionColumnIdStreamSupplier.get()
                         .map(id -> schema.idToName().get(id))
+                        .filter(mandatoryFilterPartitionColumnNames::contains)
                         .collect(joining(", "));
+
                 throw new TrinoException(
                         QUERY_REJECTED,
-                        format("Filter required for %s on at least one of the partition columns: %s", table.getSchemaTableName(), partitionColumnNames));
+                        format("%s required for %s on all the mandatory partition columns: %s", requiredOption, table.getSchemaTableName(), partitionColumnNames));
             }
         }
+        else if (Collections.disjoint(partitionColumns, columnsWithPredicates)) {
+            String partitionColumnNames = tableAllPartitionColumnIdStreamSupplier.get()
+                    .map(id -> schema.idToName().get(id))
+                    .collect(joining(", "));
+
+            throw new TrinoException(
+                    QUERY_REJECTED,
+                    format("%s required for %s on at least one of the partition columns: %s", requiredOption, table.getSchemaTableName(), partitionColumnNames));
+        }
     }
 
     @Override
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
index 613eea702275..01b766d5f9c2 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
@@ -93,6 +93,8 @@ public final class IcebergSessionProperties
     private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
     private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
     private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
+    private static final String QUERY_PARTITION_FILTER_REQUIRED_COMMON_FIELDS = "query_partition_filter_required_common_fields";
+    private static final String QUERY_PARTITION_PRUNING_REQUIRED = "query_partition_pruning_required";
 
     private final List<PropertyMetadata<?>> sessionProperties;
 
@@ -327,6 +329,16 @@ public IcebergSessionProperties(
                         "Require filter on partition column",
                         icebergConfig.isQueryPartitionFilterRequired(),
                         false))
+                .add(stringProperty(
+                        QUERY_PARTITION_FILTER_REQUIRED_COMMON_FIELDS,
+                        "Require filter predicates on all the partition fields declared in this configuration",
+                        icebergConfig.getQueryPartitionFilterRequiredCommonFields().orElse(null),
+                        false))
+                .add(booleanProperty(
+                        QUERY_PARTITION_PRUNING_REQUIRED,
+                        "Require pruning on partition column",
+                        icebergConfig.isQueryPartitionPruningRequired(),
+                        false))
                 .build();
     }
 
@@ -537,4 +549,14 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
     {
         return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
     }
+
+    public static Optional<String> getQueryPartitionFilterRequiredCommonFields(ConnectorSession session)
+    {
+        return Optional.ofNullable(session.getProperty(QUERY_PARTITION_FILTER_REQUIRED_COMMON_FIELDS, String.class));
+    }
+
+    public static boolean isQueryPartitionPruningRequired(ConnectorSession session)
+    {
+        return session.getProperty(QUERY_PARTITION_PRUNING_REQUIRED, Boolean.class);
+    }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
index 4ca2417beba4..0ffe029023ae 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java
@@ -66,7 +66,9 @@ public void testDefaults()
                 .setMaterializedViewsStorageSchema(null)
                 .setRegisterTableProcedureEnabled(false)
                 .setSortedWritingEnabled(true)
-                .setQueryPartitionFilterRequired(false));
+                .setQueryPartitionFilterRequired(false)
+                .setQueryPartitionFilterRequiredCommonFields(null)
+                .setQueryPartitionPruningRequired(false));
     }
 
     @Test
@@ -97,6 +99,8 @@ public void testExplicitPropertyMappings()
                 .put("iceberg.register-table-procedure.enabled", "true")
                 .put("iceberg.sorted-writing-enabled", "false")
                 .put("iceberg.query-partition-filter-required", "true")
+                .put("iceberg.query-partition-filter-required-common-fields", "log_ts,timestamp")
+                .put("iceberg.query-partition-pruning-required", "true")
                 .buildOrThrow();
 
         IcebergConfig expected = new IcebergConfig()
@@ -123,7 +127,9 @@ public void testExplicitPropertyMappings()
                 .setMaterializedViewsStorageSchema("mv_storage_schema")
                 .setRegisterTableProcedureEnabled(true)
                 .setSortedWritingEnabled(false)
-                .setQueryPartitionFilterRequired(true);
+                .setQueryPartitionFilterRequired(true)
+                .setQueryPartitionFilterRequiredCommonFields("log_ts,timestamp")
+                .setQueryPartitionPruningRequired(true);
 
         assertFullMapping(properties, expected);
     }