Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config to reject Iceberg queries without partition pruning #20118

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ implementation is used:
`query_partition_filter_required` catalog session property for temporary,
catalog specific use.
- `false`
* - `iceberg.query-partition-filter-required-common-fields`
- Comma-separated list of field names that must be used in the filter of queries,
if those fields are partition fields of the queried table.
- `null`
* - `iceberg.query-partition-pruning-required`
- Set to `true` to force queries to use partition pruning in the query plan.
This can be enabled only with `iceberg.query-partition-filter-required=true`.
You can use the `query_partition_pruning_required` catalog session property
for temporary, catalog-specific use.
- `false`
:::

## Type mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveCompressionCodec;
import jakarta.validation.constraints.AssertFalse;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.DecimalMax;
import jakarta.validation.constraints.DecimalMin;
import jakarta.validation.constraints.Max;
Expand Down Expand Up @@ -78,6 +79,8 @@ public class IcebergConfig
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;
private boolean queryPartitionPruningRequired;
private Optional<String> queryPartitionFilterRequiredCommonFields = Optional.empty();

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -413,6 +416,38 @@ public boolean isQueryPartitionFilterRequired()
return queryPartitionFilterRequired;
}

// Comma-separated list of partition field names that must appear in query filters,
// as set via iceberg.query-partition-filter-required-common-fields; empty when unset.
public Optional<String> getQueryPartitionFilterRequiredCommonFields()
{
return queryPartitionFilterRequiredCommonFields;
}

@Config("iceberg.query-partition-filter-required-common-fields")
@ConfigDescription("Require filter predicates on all the partition fields declared in this configuration")
public IcebergConfig setQueryPartitionFilterRequiredCommonFields(String queryPartitionFilterRequiredCommonFields)
{
    // A null value clears the setting; any non-null string is stored verbatim
    this.queryPartitionFilterRequiredCommonFields = queryPartitionFilterRequiredCommonFields == null
            ? Optional.empty()
            : Optional.of(queryPartitionFilterRequiredCommonFields);
    return this;
}

// Enables rejecting queries whose plan does not actually prune partitions.
// Cross-field constraint with iceberg.query-partition-filter-required is enforced
// by the @AssertTrue validation method below.
@Config("iceberg.query-partition-pruning-required")
@ConfigDescription("Require a partition pruning on at least one partition column in the query plan. This can be enabled only with iceberg.query-partition-filter-required=true")
public IcebergConfig setQueryPartitionPruningRequired(boolean queryPartitionPruningRequired)
{
this.queryPartitionPruningRequired = queryPartitionPruningRequired;
return this;
}

// Whether queries must demonstrate actual partition pruning in the plan (default false).
public boolean isQueryPartitionPruningRequired()
{
return queryPartitionPruningRequired;
}

@AssertTrue(message = "iceberg.query-partition-pruning-required may only be enabled when iceberg.query-partition-filter-required is set to true")
public boolean isQueryPartitionPruningEnabledWhenPartitionFilterIsEnabled()
{
    // Pruning-required implies filter-required (A -> B); vacuously valid when pruning is off
    if (queryPartitionPruningRequired) {
        return queryPartitionFilterRequired;
    }
    return true;
}

@AssertFalse(message = "iceberg.materialized-views.storage-schema may only be set when iceberg.materialized-views.hide-storage-table is set to false")
public boolean isStorageSchemaSetWhenHidingIsEnabled()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -219,12 +220,14 @@
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getQueryPartitionFilterRequiredCommonFields;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionPruningRequired;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergTableName.isDataTable;
import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
Expand Down Expand Up @@ -312,6 +315,7 @@ public class IcebergMetadata

public static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";
public static final String QUERY_PARTITION_FILTER_REQUIRED_FIELDS_CONFIG_KEY = "trino.query-partition-filter-required.fields";

public static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES";
private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME);
Expand Down Expand Up @@ -722,35 +726,75 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) {
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
.map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
if (partitionSpec.isEmpty() || partitionSpec.get().isUnpartitioned()) {
return;
}
Set<Integer> columnsWithPredicates = new HashSet<>();
if (!isQueryPartitionFilterRequired(session) || table.getForAnalyze().orElse(false)) {
return;
}

Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
.map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
if (partitionSpec.isEmpty() || partitionSpec.get().isUnpartitioned()) {
return;
}

Supplier<Stream<Integer>> tableAllPartitionColumnIdStreamSupplier = () -> partitionSpec.get().fields().stream()
.filter(field -> !field.transform().isVoid())
.map(PartitionField::sourceId);

Set<Integer> partitionColumns = tableAllPartitionColumnIdStreamSupplier.get()
.collect(toImmutableSet());

Set<String> mandatoryFilterPartitionColumnNames = new HashSet<>();
if (table.getStorageProperties().containsKey(QUERY_PARTITION_FILTER_REQUIRED_FIELDS_CONFIG_KEY)) {
Splitter.on(",").splitToStream(table.getStorageProperties().get(QUERY_PARTITION_FILTER_REQUIRED_FIELDS_CONFIG_KEY))
.map(String::trim)
.forEach(mandatoryFilterPartitionColumnNames::add);
}
if (getQueryPartitionFilterRequiredCommonFields(session).isPresent()) {
Splitter.on(",").splitToStream(getQueryPartitionFilterRequiredCommonFields(session).get())
.map(String::trim)
.forEach(mandatoryFilterPartitionColumnNames::add);
}

Set<Integer> mandatoryPartitionColumns = tableAllPartitionColumnIdStreamSupplier.get()
.filter(id -> mandatoryFilterPartitionColumnNames.contains(schema.idToName().get(id)))
.collect(toImmutableSet());

Set<Integer> columnsWithPredicates = new HashSet<>();
if (!isQueryPartitionPruningRequired(session)) {
table.getConstraintColumns().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add);
table.getUnenforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add));
Set<Integer> partitionColumns = partitionSpec.get().fields().stream()
.filter(field -> !field.transform().isVoid())
.map(PartitionField::sourceId)
.collect(toImmutableSet());
if (Collections.disjoint(columnsWithPredicates, partitionColumns)) {
String partitionColumnNames = partitionSpec.get().fields().stream()
.filter(field -> !field.transform().isVoid())
.map(PartitionField::sourceId)
}
table.getEnforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add));
table.getUnenforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add));

String requiredOption = isQueryPartitionPruningRequired(session) ? "Pruning" : "Filter";
if (!mandatoryPartitionColumns.isEmpty()) {
if (!difference(mandatoryPartitionColumns, columnsWithPredicates).isEmpty()) {
String partitionColumnNames = tableAllPartitionColumnIdStreamSupplier.get()
.map(id -> schema.idToName().get(id))
.filter(mandatoryFilterPartitionColumnNames::contains)
.collect(joining(", "));

throw new TrinoException(
QUERY_REJECTED,
format("Filter required for %s on at least one of the partition columns: %s", table.getSchemaTableName(), partitionColumnNames));
format("%s required for %s on all the mandatory partition columns: %s", requiredOption, table.getSchemaTableName(), partitionColumnNames));
}
}
else if (Collections.disjoint(partitionColumns, columnsWithPredicates)) {
String partitionColumnNames = tableAllPartitionColumnIdStreamSupplier.get()
.map(id -> schema.idToName().get(id))
.collect(joining(", "));

throw new TrinoException(
QUERY_REJECTED,
format("%s required for %s on at least one of the partition columns: %s", requiredOption, table.getSchemaTableName(), partitionColumnNames));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public final class IcebergSessionProperties
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
// Session property names use snake_case (see the sibling properties such as
// QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"); the previous
// hyphenated value "query-partition-filter-required-common-fields" broke that convention
// and Trino's session-property naming, making the property unusable via SET SESSION.
private static final String QUERY_PARTITION_FILTER_REQUIRED_COMMON_FIELDS = "query_partition_filter_required_common_fields";
private static final String QUERY_PARTITION_PRUNING_REQUIRED = "query_partition_pruning_required";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -327,6 +329,16 @@ public IcebergSessionProperties(
"Require filter on partition column",
icebergConfig.isQueryPartitionFilterRequired(),
false))
.add(stringProperty(
QUERY_PARTITION_FILTER_REQUIRED_COMMON_FIELDS,
"Require filter predicates on all the partition fields declared in this configuration",
icebergConfig.getQueryPartitionFilterRequiredCommonFields().orElse(null),
false))
.add(booleanProperty(
QUERY_PARTITION_PRUNING_REQUIRED,
"Require pruning on partition column",
icebergConfig.isQueryPartitionPruningRequired(),
false))
.build();
}

Expand Down Expand Up @@ -537,4 +549,14 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

// Returns the comma-separated mandatory partition-filter field names for this session,
// or empty when the property is unset (its default is null).
public static Optional<String> getQueryPartitionFilterRequiredCommonFields(ConnectorSession session)
{
    String commonFields = session.getProperty(QUERY_PARTITION_FILTER_REQUIRED_COMMON_FIELDS, String.class);
    return Optional.ofNullable(commonFields);
}

// Whether this session requires queries to demonstrate partition pruning in the plan.
public static boolean isQueryPartitionPruningRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_PRUNING_REQUIRED, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public void testDefaults()
.setMaterializedViewsStorageSchema(null)
.setRegisterTableProcedureEnabled(false)
.setSortedWritingEnabled(true)
.setQueryPartitionFilterRequired(false));
.setQueryPartitionFilterRequired(false)
.setQueryPartitionFilterRequiredCommonFields(null)
.setQueryPartitionPruningRequired(false));
}

@Test
Expand Down Expand Up @@ -97,6 +99,8 @@ public void testExplicitPropertyMappings()
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.sorted-writing-enabled", "false")
.put("iceberg.query-partition-filter-required", "true")
.put("iceberg.query-partition-filter-required-common-fields", "log_ts,timestamp")
.put("iceberg.query-partition-pruning-required", "true")
.buildOrThrow();

IcebergConfig expected = new IcebergConfig()
Expand All @@ -123,7 +127,9 @@ public void testExplicitPropertyMappings()
.setMaterializedViewsStorageSchema("mv_storage_schema")
.setRegisterTableProcedureEnabled(true)
.setSortedWritingEnabled(false)
.setQueryPartitionFilterRequired(true);
.setQueryPartitionFilterRequired(true)
.setQueryPartitionFilterRequiredCommonFields("log_ts,timestamp")
.setQueryPartitionPruningRequired(true);

assertFullMapping(properties, expected);
}
Expand Down
Loading