Subsume non-identity partition predicate in Iceberg
Before this change, the Iceberg connector accepted all predicates expressible
as a `TupleDomain` on primitive columns and used them to filter data
files during split generation. However, only predicates defined on
identity partitioning columns were subsumed into the connector.

This commit extends the Iceberg connector's ability to subsume predicates on
partitioning columns. Besides subsuming predicates on identity
partitioning columns, it also subsumes predicates on non-identity
partitioning columns when they align with partition boundaries. For example,
with `truncate(col, 100)` partitioning (values truncated down to multiples of
100), predicates such as `col >= 1200` or `col > 1199` are subsumed,
while `col > 1200` or `col > 1250` are not.

This change is especially important for the Iceberg OPTIMIZE table
procedure, which requires the `WHERE` condition to be fully subsumed
into the connector. It also helps `DELETE`, since it allows a
metadata-only delete in more cases where a row-level delete is not
actually needed.
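
To make the boundary-alignment rule concrete, here is a minimal, self-contained sketch of the adjacency test — not code from this commit; the `truncate100` helper, the width of 100, and the integer-only handling are assumptions for illustration. A lower bound can be enforced purely through partition pruning only when the value just outside the bound falls into a different partition than the bound value itself:

```java
public class TruncateBoundaryCheck
{
    // Iceberg's truncate[100] transform on integers: round down to the nearest multiple of 100.
    static long truncate100(long value)
    {
        return Math.floorDiv(value, 100) * 100;
    }

    // A lower bound is enforceable by partition pruning when the value just outside it
    // (the previous integer for an inclusive bound, the next integer for an exclusive one)
    // falls into a different partition than the bound value itself.
    static boolean lowerBoundAligns(long bound, boolean inclusive)
    {
        long adjacent = inclusive ? bound - 1 : bound + 1;
        return truncate100(bound) != truncate100(adjacent);
    }

    public static void main(String[] args)
    {
        System.out.println(lowerBoundAligns(1200, true));  // col >= 1200 -> true  (subsumed)
        System.out.println(lowerBoundAligns(1199, false)); // col > 1199  -> true  (subsumed)
        System.out.println(lowerBoundAligns(1200, false)); // col > 1200  -> false (stays a row filter)
        System.out.println(lowerBoundAligns(1250, false)); // col > 1250  -> false (stays a row filter)
    }
}
```

Running it prints `true` for the first two bounds and `false` for the last two, matching the subsumed / not-subsumed split described above.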
findepi committed Jun 20, 2022
1 parent 5ab1f58 commit d4a743c
Showing 6 changed files with 537 additions and 24 deletions.
@@ -81,6 +81,7 @@
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeOperators;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -128,13 +129,13 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Matcher;
@@ -174,6 +175,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInAllSpecs;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.getColumns;
@@ -230,6 +232,7 @@ public class IcebergMetadata
public static final String ORC_BLOOM_FILTER_FPP_KEY = "orc.bloom.filter.fpp";

private final TypeManager typeManager;
private final TypeOperators typeOperators;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalog catalog;
private final HdfsEnvironment hdfsEnvironment;
@@ -240,11 +243,13 @@ public class IcebergMetadata

public IcebergMetadata(
TypeManager typeManager,
TypeOperators typeOperators,
JsonCodec<CommitTaskData> commitTaskCodec,
TrinoCatalog catalog,
HdfsEnvironment hdfsEnvironment)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.typeOperators = requireNonNull(typeOperators, "typeOperators is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalog = requireNonNull(catalog, "catalog is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
@@ -1715,32 +1720,34 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
IcebergTableHandle table = (IcebergTableHandle) handle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

Set<Integer> partitionSourceIds = identityPartitionColumnsInAllSpecs(icebergTable);
BiPredicate<IcebergColumnHandle, Domain> isIdentityPartition = (column, domain) -> partitionSourceIds.contains(column.getId());
// Iceberg metadata columns can not be used in table scans
BiPredicate<IcebergColumnHandle, Domain> isMetadataColumn = (column, domain) -> isMetadataColumnId(column.getId());

TupleDomain<IcebergColumnHandle> newEnforcedConstraint = constraint.getSummary()
.transformKeys(IcebergColumnHandle.class::cast)
.filter(isIdentityPartition)
.intersect(table.getEnforcedPredicate());

TupleDomain<IcebergColumnHandle> remainingConstraint = constraint.getSummary()
.transformKeys(IcebergColumnHandle.class::cast)
.filter(isIdentityPartition.negate());

TupleDomain<IcebergColumnHandle> newUnenforcedConstraint = remainingConstraint
// TODO: Remove after completing https://github.com/trinodb/trino/issues/8759
// Only applies to the unenforced constraint because structural types cannot be partition keys
.filter((columnHandle, predicate) -> !isStructuralType(columnHandle.getType()))
.filter(isMetadataColumn.negate())
.intersect(table.getUnenforcedPredicate());
Map<IcebergColumnHandle, Domain> unsupported = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newEnforced = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newUnenforced = new LinkedHashMap<>();
Map<ColumnHandle, Domain> domains = constraint.getSummary().getDomains().orElseThrow(() -> new IllegalArgumentException("constraint summary is NONE"));
domains.forEach((column, domain) -> {
IcebergColumnHandle columnHandle = (IcebergColumnHandle) column;
// Iceberg metadata columns can not be used to filter a table scan in Iceberg library
// TODO (https://github.com/trinodb/trino/issues/8759) structural types cannot be used to filter a table scan in Iceberg library.
if (isMetadataColumnId(columnHandle.getId()) || isStructuralType(columnHandle.getType())) {
unsupported.put(columnHandle, domain);
}
else if (canEnforceColumnConstraintInAllSpecs(typeOperators, icebergTable, columnHandle, domain)) {
newEnforced.put(columnHandle, domain);
}
else {
newUnenforced.put(columnHandle, domain);
}
});

TupleDomain<IcebergColumnHandle> newEnforcedConstraint = TupleDomain.withColumnDomains(newEnforced).intersect(table.getEnforcedPredicate());
TupleDomain<IcebergColumnHandle> newUnenforcedConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(table.getUnenforcedPredicate());

if (newEnforcedConstraint.equals(table.getEnforcedPredicate())
&& newUnenforcedConstraint.equals(table.getUnenforcedPredicate())) {
return Optional.empty();
}

TupleDomain<IcebergColumnHandle> remainingConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(TupleDomain.withColumnDomains(unsupported));
return Optional.of(new ConstraintApplicationResult<>(
new IcebergTableHandle(
table.getSchemaName(),
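
As an aside on the reworked `applyFilter` above: the constraint's domains are split three ways, the enforceable part is folded into the returned table handle, and the filter handed back to the engine is the intersection of the unenforced and unsupported parts. A rough sketch of that `TupleDomain` algebra follows — plain `String` keys stand in for `IcebergColumnHandle`, and the column names and values are made up for illustration:

```java
import java.util.Map;

import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;

import static io.trino.spi.type.BigintType.BIGINT;

public class ApplyFilterSplitSketch
{
    public static void main(String[] args)
    {
        // Hypothetical predicate: part_col >= 1200 (aligns with partitioning) AND other_col = 5 (does not).
        Domain enforceable = Domain.create(ValueSet.ofRanges(Range.greaterThanOrEqual(BIGINT, 1200L)), false);
        Domain unenforceable = Domain.singleValue(BIGINT, 5L);

        TupleDomain<String> newEnforced = TupleDomain.withColumnDomains(Map.of("part_col", enforceable));
        TupleDomain<String> newUnenforced = TupleDomain.withColumnDomains(Map.of("other_col", unenforceable));
        TupleDomain<String> unsupported = TupleDomain.all(); // no metadata or structural columns referenced here

        // The enforced part is folded into the new IcebergTableHandle; the engine keeps filtering
        // on the intersection of the unenforced and unsupported parts.
        TupleDomain<String> remainingFilter = newUnenforced.intersect(unsupported);

        System.out.println("enforced columns:  " + newEnforced.getDomains().orElseThrow().keySet());     // [part_col]
        System.out.println("remaining columns: " + remainingFilter.getDomains().orElseThrow().keySet()); // [other_col]
    }
}
```

Using `String` keys keeps the sketch self-contained; the production code performs the same intersections with `IcebergColumnHandle` keys and additionally intersects with the predicates already present on the table handle.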
@@ -18,6 +18,7 @@
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeOperators;

import javax.inject.Inject;

@@ -26,6 +27,7 @@
public class IcebergMetadataFactory
{
private final TypeManager typeManager;
private final TypeOperators typeOperators;
private final JsonCodec<CommitTaskData> commitTaskCodec;
private final TrinoCatalogFactory catalogFactory;
private final HdfsEnvironment hdfsEnvironment;
@@ -38,13 +40,15 @@ public IcebergMetadataFactory(
HdfsEnvironment hdfsEnvironment)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
// TODO consider providing TypeOperators in ConnectorContext to increase cache reuse
this.typeOperators = new TypeOperators();
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
}

public IcebergMetadata create(ConnectorIdentity identity)
{
return new IcebergMetadata(typeManager, commitTaskCodec, catalogFactory.create(identity), hdfsEnvironment);
return new IcebergMetadata(typeManager, typeOperators, commitTaskCodec, catalogFactory.create(identity), hdfsEnvironment);
}
}
@@ -22,6 +22,7 @@
import io.airlift.slice.Slice;
import io.airlift.slice.SliceUtf8;
import io.airlift.slice.Slices;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
@@ -30,10 +31,15 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.function.InvocationConvention;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Int128;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeOperators;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
@@ -55,6 +61,7 @@
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;

import java.lang.invoke.MethodHandle;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -72,6 +79,7 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -95,13 +103,18 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields;
import static io.trino.plugin.iceberg.TrinoTypes.getNextValue;
import static io.trino.plugin.iceberg.TrinoTypes.getPreviousValue;
import static io.trino.plugin.iceberg.TypeConverter.toIcebergType;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.NEVER_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateType.DATE;
@@ -121,6 +134,7 @@
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.LocationProviders.locationsFor;
@@ -304,6 +318,109 @@ private static String quotedName(String name)
return '"' + name.replace("\"", "\"\"") + '"';
}

public static boolean canEnforceColumnConstraintInAllSpecs(TypeOperators typeOperators, Table table, IcebergColumnHandle columnHandle, Domain domain)
{
return table.specs().values().stream()
.allMatch(spec -> canEnforceConstraintWithinPartitioningSpec(typeOperators, spec, columnHandle, domain));
}

private static boolean canEnforceConstraintWithinPartitioningSpec(TypeOperators typeOperators, PartitionSpec spec, IcebergColumnHandle column, Domain domain)
{
for (PartitionField field : spec.getFieldsBySourceId(column.getId())) {
if (canEnforceConstraintWithPartitionField(typeOperators, field, column, domain)) {
return true;
}
}
return false;
}

private static boolean canEnforceConstraintWithPartitionField(TypeOperators typeOperators, PartitionField field, IcebergColumnHandle column, Domain domain)
{
if (field.transform().toString().equals("void")) {
// Useless for filtering.
return false;
}
if (field.transform().isIdentity()) {
// A predicate on an identity partitioning column can always be enforced.
return true;
}

ColumnTransform transform = PartitionTransforms.getColumnTransform(field, column.getType());
if (!transform.preservesNonNull()) {
// Partitioning transform must return NULL for NULL input.
// Below we assume it never returns NULL for non-NULL input,
// so NULL values and non-NULL values are always segregated.
// In practice, this condition matches the void transform only,
// which isn't useful for filtering anyway.
return false;
}
ValueSet valueSet = domain.getValues();

boolean canEnforce = valueSet.getValuesProcessor().transform(
ranges -> {
MethodHandle targetTypeEqualOperator = typeOperators.getEqualOperator(
transform.getType(), InvocationConvention.simpleConvention(FAIL_ON_NULL, NEVER_NULL, NEVER_NULL));
for (Range range : ranges.getOrderedRanges()) {
if (!canEnforceRangeWithPartitioningField(field, transform, range, targetTypeEqualOperator)) {
return false;
}
}
return true;
},
discreteValues -> false,
allOrNone -> true);
return canEnforce;
}

private static boolean canEnforceRangeWithPartitioningField(PartitionField field, ColumnTransform transform, Range range, MethodHandle targetTypeEqualOperator)
{
if (!transform.isMonotonic()) {
// E.g. bucketing transform
return false;
}
io.trino.spi.type.Type type = range.getType();
if (!type.isOrderable()) {
return false;
}
if (!range.isLowUnbounded()) {
Object boundedValue = range.getLowBoundedValue();
Optional<Object> adjacentValue = range.isLowInclusive() ? getPreviousValue(type, boundedValue) : getNextValue(type, boundedValue);
if (adjacentValue.isEmpty() || yieldSamePartitioningValue(field, transform, type, boundedValue, adjacentValue.get(), targetTypeEqualOperator)) {
return false;
}
}
if (!range.isHighUnbounded()) {
Object boundedValue = range.getHighBoundedValue();
Optional<Object> adjacentValue = range.isHighInclusive() ? getNextValue(type, boundedValue) : getPreviousValue(type, boundedValue);
if (adjacentValue.isEmpty() || yieldSamePartitioningValue(field, transform, type, boundedValue, adjacentValue.get(), targetTypeEqualOperator)) {
return false;
}
}
return true;
}

private static boolean yieldSamePartitioningValue(
PartitionField field,
ColumnTransform transform,
io.trino.spi.type.Type sourceType,
Object first,
Object second,
MethodHandle targetTypeEqualOperator)
{
requireNonNull(first, "first is null");
requireNonNull(second, "second is null");
Object firstTransformed = transform.getValueTransform().apply(nativeValueToBlock(sourceType, first), 0);
Object secondTransformed = transform.getValueTransform().apply(nativeValueToBlock(sourceType, second), 0);
// The pushdown logic assumes NULLs and non-NULLs are segregated, so that we have to think about non-null values only.
verify(firstTransformed != null && secondTransformed != null, "Transform for %s returned null for non-null input", field);
try {
return (boolean) targetTypeEqualOperator.invoke(firstTransformed, secondTransformed);
}
catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}

public static Object deserializePartitionValue(Type type, String valueString, String name)
{
if (valueString == null) {
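
The new helpers in IcebergUtil above are not specific to `truncate`: any transform whose `ColumnTransform` is monotonic goes through the same range check, comparing the partition value of a bound with that of the adjacent value produced by `getPreviousValue`/`getNextValue`. The sketch below is an illustration only, not code from this commit; the microsecond precision, UTC semantics, and the `epochDay` helper are assumptions. It shows how that test would play out for Iceberg's `day` transform, which is monotonic:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class DayTransformBoundaryCheck
{
    // Iceberg's day transform: days since the epoch for the timestamp's UTC date.
    static long epochDay(Instant timestamp)
    {
        return LocalDate.ofInstant(timestamp, ZoneOffset.UTC).toEpochDay();
    }

    // An inclusive lower bound `ts >= bound` aligns with day partitioning when the previous
    // representable value (one microsecond earlier at timestamp(6) precision) falls into the
    // previous day partition.
    static boolean inclusiveLowerBoundAligns(Instant bound)
    {
        Instant adjacent = bound.minusNanos(1_000); // previous microsecond
        return epochDay(bound) != epochDay(adjacent);
    }

    public static void main(String[] args)
    {
        // ts >= 2022-06-20 00:00:00 -> enforceable: whole day partitions can be pruned
        System.out.println(inclusiveLowerBoundAligns(Instant.parse("2022-06-20T00:00:00Z")));
        // ts >= 2022-06-20 12:00:00 -> not enforceable: 11:59:59.999999 is in the same day partition
        System.out.println(inclusiveLowerBoundAligns(Instant.parse("2022-06-20T12:00:00Z")));
    }
}
```

By the same reasoning, bucketing transforms are rejected outright in the code above, since they are not monotonic and ranges of source values do not map to ranges of partition values.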
