Skip to content

Commit

Permalink
Convert discrete domain to Iceberg IN expression
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Nov 23, 2021
1 parent 646591b commit 0281721
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
Expand All @@ -36,13 +35,15 @@
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
Expand All @@ -59,11 +60,11 @@
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.greaterThan;
import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.in;
import static org.apache.iceberg.expressions.Expressions.isNull;
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.not;
import static org.apache.iceberg.expressions.Expressions.or;

public final class ExpressionConverter
{
Expand Down Expand Up @@ -106,20 +107,22 @@ private static Expression toIcebergExpression(String columnName, Type type, Doma
throw new UnsupportedOperationException("Unsupported type for expression: " + type);
}

ValueSet domainValues = domain.getValues();
Expression expression = null;
if (domain.isNullAllowed()) {
expression = isNull(columnName);
}

if (type.isOrderable()) {
List<Range> orderedRanges = domainValues.getRanges().getOrderedRanges();
expression = firstNonNull(expression, alwaysFalse());

List<Range> orderedRanges = domain.getValues().getRanges().getOrderedRanges();
List<Object> icebergValues = new ArrayList<>();
List<Expression> rangeExpressions = new ArrayList<>();
for (Range range : orderedRanges) {
expression = or(expression, toIcebergExpression(columnName, range));
if (range.isSingleValue()) {
icebergValues.add(getIcebergLiteralValue(type, range.getLowBoundedValue()));
}
else {
rangeExpressions.add(toIcebergExpression(columnName, range));
}
}
return expression;
Expression ranges = or(rangeExpressions);
Expression values = icebergValues.isEmpty() ? alwaysFalse() : in(columnName, icebergValues);
Expression nullExpression = domain.isNullAllowed() ? isNull(columnName) : alwaysFalse();
return or(nullExpression, or(values, ranges));
}

throw new VerifyException(format("Unsupported type %s with domain values %s", type, domain));
Expand Down Expand Up @@ -228,4 +231,21 @@ private static Object getIcebergLiteralValue(Type type, Object trinoNativeValue)

throw new UnsupportedOperationException("Unsupported type: " + type);
}

private static Expression or(Expression left, Expression right)
{
return Expressions.or(left, right);
}

private static Expression or(List<Expression> expressions)
{
if (expressions.isEmpty()) {
return alwaysFalse();
}
if (expressions.size() == 1) {
return getOnlyElement(expressions);
}
int mid = expressions.size() / 2;
return or(or(expressions.subList(0, mid)), or(expressions.subList(mid, expressions.size())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,6 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
Set<Integer> partitionSourceIds = identityPartitionColumnsInAllSpecs(icebergTable);
BiPredicate<IcebergColumnHandle, Domain> isIdentityPartition = (column, domain) -> partitionSourceIds.contains(column.getId());

// TODO: Avoid enforcing the constraint when partition filters have large IN expressions, since iceberg cannot
// support it. Such large expressions cannot be simplified since simplification changes the filtered set.
TupleDomain<IcebergColumnHandle> newEnforcedConstraint = constraint.getSummary()
.transformKeys(IcebergColumnHandle.class::cast)
.filter(isIdentityPartition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ public void testSchemaEvolution()
}

@Test
public void testLargeInFailureOnPartitionedColumns()
public void testLargeInOnPartitionedColumns()
{
assertUpdate("CREATE TABLE test_large_in_failure (col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col2'])");
assertUpdate("INSERT INTO test_large_in_failure VALUES (1, 10)", 1L);
Expand All @@ -1075,11 +1075,9 @@ public void testLargeInFailureOnPartitionedColumns()
List<String> predicates = IntStream.range(0, 25_000).boxed()
.map(Object::toString)
.collect(toImmutableList());

String filter = format("col2 IN (%s)", join(",", predicates));
assertThatThrownBy(() -> getQueryRunner().execute(format("SELECT * FROM test_large_in_failure WHERE %s", filter)))
.isInstanceOf(RuntimeException.class)
.hasMessage("java.lang.StackOverflowError");
assertThat(query("SELECT * FROM test_large_in_failure WHERE " + filter))
.matches("TABLE test_large_in_failure");

dropTable("test_large_in_failure");
}
Expand Down

0 comments on commit 0281721

Please sign in to comment.