Skip to content

Commit

Permalink
Unwrap timestamptz cast to date in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Jun 21, 2022
1 parent d4c7da8 commit d6f2da1
Show file tree
Hide file tree
Showing 4 changed files with 573 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.expression.FunctionName;
import io.trino.spi.expression.Variable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.DateType;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;

import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.base.expression.ConnectorExpressions.and;
import static io.trino.plugin.base.expression.ConnectorExpressions.extractConjuncts;
import static io.trino.spi.expression.StandardFunctions.CAST_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.GREATER_THAN_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.GREATER_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.IS_DISTINCT_FROM_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.LESS_THAN_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.LESS_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.NOT_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_DAY;
import static java.util.Objects.requireNonNull;

/**
 * Derives {@link TupleDomain} constraints on Iceberg columns from the expression part of a {@link Constraint}.
 * <p>
 * The only expression shape currently recognized is a comparison of the form
 * {@code CAST(column AS date) <op> date_constant}, where {@code column} is a
 * {@code timestamp(6) with time zone} column. Every other conjunct is handed back to the caller unchanged
 * as part of the remaining expression.
 */
public final class ConstraintExtractor
{
    private ConstraintExtractor() {}

    /**
     * Combines the constraint's summary with the domains that can be derived from the conjuncts of its expression.
     *
     * @param constraint the constraint whose summary, expression and assignments are inspected
     * @return the combined tuple domain together with the conjunction of all conjuncts that could not be converted;
     *         when the combined domain becomes none, the result is {@code TupleDomain.none()} with {@code Constant.TRUE}
     *         as the remaining expression
     */
    public static ExtractionResult extractTupleDomain(Constraint constraint)
    {
        TupleDomain<IcebergColumnHandle> result = constraint.getSummary()
                .transformKeys(IcebergColumnHandle.class::cast);
        ImmutableList.Builder<ConnectorExpression> remainingExpressions = ImmutableList.builder();
        for (ConnectorExpression conjunct : extractConjuncts(constraint.getExpression())) {
            Optional<TupleDomain<IcebergColumnHandle>> converted = toTupleDomain(conjunct, constraint.getAssignments());
            if (converted.isEmpty()) {
                // Not understood here; keep the conjunct in the remaining expression
                remainingExpressions.add(conjunct);
            }
            else {
                result = result.intersect(converted.get());
                if (result.isNone()) {
                    // Nothing can match anymore, so the conjuncts not yet processed are irrelevant
                    return new ExtractionResult(TupleDomain.none(), Constant.TRUE);
                }
            }
        }
        return new ExtractionResult(result, and(remainingExpressions.build()));
    }

    /**
     * Converts a single conjunct into a tuple domain, if possible. Only {@link Call} expressions are considered.
     *
     * @return the derived tuple domain, or empty when the expression is not convertible
     */
    private static Optional<TupleDomain<IcebergColumnHandle>> toTupleDomain(ConnectorExpression expression, Map<String, ColumnHandle> assignments)
    {
        if (expression instanceof Call) {
            return toTupleDomain((Call) expression, assignments);
        }
        return Optional.empty();
    }

    /**
     * Converts a two-argument call whose first argument is a cast and whose second argument is a constant
     * of the same type as the cast result. Any other call yields empty.
     *
     * @return the derived tuple domain, or empty when the call does not have the expected shape
     */
    private static Optional<TupleDomain<IcebergColumnHandle>> toTupleDomain(Call call, Map<String, ColumnHandle> assignments)
    {
        if (call.getArguments().size() == 2) {
            ConnectorExpression firstArgument = call.getArguments().get(0);
            ConnectorExpression secondArgument = call.getArguments().get(1);

            // Note: CanonicalizeExpressionRewriter ensures that constants are the second comparison argument.

            if (firstArgument instanceof Call && ((Call) firstArgument).getFunctionName().equals(CAST_FUNCTION_NAME) &&
                    secondArgument instanceof Constant &&
                    // if the types do not match, this cannot be a comparison function
                    firstArgument.getType().equals(secondArgument.getType())) {
                return unwrapCastInComparison(
                        call.getFunctionName(),
                        getOnlyElement(((Call) firstArgument).getArguments()),
                        (Constant) secondArgument,
                        assignments);
            }
        }

        return Optional.empty();
    }

    /**
     * Attempts to express {@code functionName(CAST(castSource AS T), constant)} as a domain on the source column.
     * <p>
     * Yields empty when the cast source is not a plain column reference, when the constant is {@code NULL},
     * or when the column type / constant type combination is not supported.
     *
     * @param functionName name of the enclosing function; not yet known to be a comparison
     * @param castSource the expression being cast
     * @param constant the constant the cast result is compared with
     * @param assignments mapping from variable names to column handles
     * @throws IllegalArgumentException if the variable has no assignment, or the column is a
     *         timestamp with time zone of a precision other than 6
     */
    private static Optional<TupleDomain<IcebergColumnHandle>> unwrapCastInComparison(
            // upon invocation, we don't know if this really is a comparison
            FunctionName functionName,
            ConnectorExpression castSource,
            Constant constant,
            Map<String, ColumnHandle> assignments)
    {
        if (!(castSource instanceof Variable)) {
            // Engine unwraps casts in comparisons in UnwrapCastInComparison. Within a connector we can do more than
            // engine only for source columns. We cannot draw many conclusions for intermediate expressions without
            // knowing them well.
            return Optional.empty();
        }

        if (constant.getValue() == null) {
            // Comparisons with NULL should be simplified by the engine
            return Optional.empty();
        }

        IcebergColumnHandle column = resolve((Variable) castSource, assignments);
        if (column.getType() instanceof TimestampWithTimeZoneType) {
            // Iceberg supports only timestamp(6) with time zone
            checkArgument(((TimestampWithTimeZoneType) column.getType()).getPrecision() == 6, "Unexpected type: %s", column.getType());

            if (DateType.DATE.equals(constant.getType())) {
                return unwrapTimestampTzToDateCast(column, functionName, (long) constant.getValue())
                        .map(domain -> TupleDomain.withColumnDomains(ImmutableMap.of(column, domain)));
            }
            // TODO support timestamp constant
        }

        return Optional.empty();
    }

    /**
     * Translates {@code CAST(column AS date) <op> date} into a domain over the timestamp with time zone column.
     * <p>
     * The date is mapped to the half-open interval {@code [startOfDate, startOfNextDate)} in UTC and each supported
     * comparison operator is expressed as range(s) relative to that interval. Only {@code IS DISTINCT FROM}
     * produces a domain that admits {@code NULL}.
     *
     * @param column the column being cast; must be of type {@code TIMESTAMP_TZ_MICROS}
     * @param functionName the function applied to the cast result and the date constant
     * @param date the date constant as a day count, required to fit in the {@code int} range
     * @return the equivalent domain, or empty when {@code functionName} is not a supported comparison
     * @throws IllegalArgumentException if the column is not of the expected type
     */
    private static Optional<Domain> unwrapTimestampTzToDateCast(IcebergColumnHandle column, FunctionName functionName, long date)
    {
        Type type = column.getType();
        checkArgument(type.equals(TIMESTAMP_TZ_MICROS), "Column of unexpected type %s: %s ", type, column);

        // Verify no overflow. Date values must be in integer range, which is checked on both ends so that
        // neither date * MILLISECONDS_PER_DAY nor (date + 1) * MILLISECONDS_PER_DAY can overflow a long.
        verify(date >= Integer.MIN_VALUE && date <= Integer.MAX_VALUE, "Date value out of range: %s", date);

        // In Iceberg, timestamp with time zone values are all in UTC

        LongTimestampWithTimeZone startOfDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction(date * MILLISECONDS_PER_DAY, 0, UTC_KEY);
        LongTimestampWithTimeZone startOfNextDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction((date + 1) * MILLISECONDS_PER_DAY, 0, UTC_KEY);

        if (functionName.equals(EQUAL_OPERATOR_FUNCTION_NAME)) {
            // startOfDate <= column < startOfNextDate
            return Optional.of(Domain.create(ValueSet.ofRanges(Range.range(type, startOfDate, true, startOfNextDate, false)), false));
        }
        if (functionName.equals(NOT_EQUAL_OPERATOR_FUNCTION_NAME)) {
            // column < startOfDate OR column >= startOfNextDate
            return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfDate), Range.greaterThanOrEqual(type, startOfNextDate)), false));
        }
        if (functionName.equals(LESS_THAN_OPERATOR_FUNCTION_NAME)) {
            return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfDate)), false));
        }
        if (functionName.equals(LESS_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME)) {
            // Any instant within the date itself also qualifies
            return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfNextDate)), false));
        }
        if (functionName.equals(GREATER_THAN_OPERATOR_FUNCTION_NAME)) {
            // Strictly after the date means on or after the start of the next date
            return Optional.of(Domain.create(ValueSet.ofRanges(Range.greaterThanOrEqual(type, startOfNextDate)), false));
        }
        if (functionName.equals(GREATER_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME)) {
            return Optional.of(Domain.create(ValueSet.ofRanges(Range.greaterThanOrEqual(type, startOfDate)), false));
        }
        if (functionName.equals(IS_DISTINCT_FROM_OPERATOR_FUNCTION_NAME)) {
            // Same ranges as not-equal, but NULL is allowed as well
            return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfDate), Range.greaterThanOrEqual(type, startOfNextDate)), true));
        }

        return Optional.empty();
    }

    /**
     * Looks up the column handle assigned to the given variable.
     *
     * @throws IllegalArgumentException if {@code assignments} has no entry for the variable's name
     */
    private static IcebergColumnHandle resolve(Variable variable, Map<String, ColumnHandle> assignments)
    {
        ColumnHandle columnHandle = assignments.get(variable.getName());
        checkArgument(columnHandle != null, "No assignment for %s", variable);
        return (IcebergColumnHandle) columnHandle;
    }

    /**
     * Result of {@link #extractTupleDomain(Constraint)}: the extracted tuple domain and the expression
     * that remains after the extraction.
     */
    public static class ExtractionResult
    {
        private final TupleDomain<IcebergColumnHandle> tupleDomain;
        private final ConnectorExpression remainingExpression;

        /**
         * @param tupleDomain the extracted tuple domain; must not be null
         * @param remainingExpression the expression left over after extraction; must not be null
         */
        public ExtractionResult(TupleDomain<IcebergColumnHandle> tupleDomain, ConnectorExpression remainingExpression)
        {
            this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
            this.remainingExpression = requireNonNull(remainingExpression, "remainingExpression is null");
        }

        /**
         * @return the extracted tuple domain
         */
        public TupleDomain<IcebergColumnHandle> getTupleDomain()
        {
            return tupleDomain;
        }

        /**
         * @return the expression left over after extraction
         */
        public ConnectorExpression getRemainingExpression()
        {
            return remainingExpression;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import static io.trino.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns;
import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables;
import static io.trino.plugin.hive.util.HiveUtil.isStructuralType;
import static io.trino.plugin.iceberg.ConstraintExtractor.extractTupleDomain;
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_ID;
import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_UPDATE_ROW_ID_COLUMN_NAME;
Expand Down Expand Up @@ -1719,43 +1720,54 @@ public void rollback()
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
TupleDomain<ColumnHandle> predicate = constraint.getSummary();
ConstraintExtractor.ExtractionResult extractionResult = extractTupleDomain(constraint);
TupleDomain<IcebergColumnHandle> predicate = extractionResult.getTupleDomain();
if (predicate.isAll()) {
return Optional.empty();
}

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

Map<IcebergColumnHandle, Domain> unsupported = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newEnforced = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newUnenforced = new LinkedHashMap<>();
Map<ColumnHandle, Domain> domains = predicate.getDomains().orElseThrow(() -> new IllegalArgumentException("constraint summary is NONE"));
domains.forEach((column, domain) -> {
IcebergColumnHandle columnHandle = (IcebergColumnHandle) column;
// Iceberg metadata columns can not be used to filter a table scan in Iceberg library
// TODO (https://github.com/trinodb/trino/issues/8759) structural types cannot be used to filter a table scan in Iceberg library.
if (isMetadataColumnId(columnHandle.getId()) || isStructuralType(columnHandle.getType()) ||
// Iceberg orders UUID values differently than Trino (perhaps due to https://bugs.openjdk.org/browse/JDK-7025832), so allow only IS NULL / IS NOT NULL checks
(columnHandle.getType() == UUID && !(domain.isOnlyNull() || domain.getValues().isAll()))) {
unsupported.put(columnHandle, domain);
}
else if (canEnforceColumnConstraintInAllSpecs(typeOperators, icebergTable, columnHandle, domain)) {
newEnforced.put(columnHandle, domain);
}
else {
newUnenforced.put(columnHandle, domain);
}
});
TupleDomain<IcebergColumnHandle> newEnforcedConstraint;
TupleDomain<IcebergColumnHandle> newUnenforcedConstraint;
TupleDomain<IcebergColumnHandle> remainingConstraint;
if (predicate.isNone()) {
// Engine does not pass none Constraint.summary. It can become none when combined with the expression and connector's domain knowledge.
newEnforcedConstraint = TupleDomain.none();
newUnenforcedConstraint = TupleDomain.all();
remainingConstraint = TupleDomain.all();
}
else {
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());

Map<IcebergColumnHandle, Domain> unsupported = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newEnforced = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> newUnenforced = new LinkedHashMap<>();
Map<IcebergColumnHandle, Domain> domains = predicate.getDomains().orElseThrow(() -> new VerifyException("No domains"));
domains.forEach((columnHandle, domain) -> {
// Iceberg metadata columns can not be used to filter a table scan in Iceberg library
// TODO (https://github.com/trinodb/trino/issues/8759) structural types cannot be used to filter a table scan in Iceberg library.
if (isMetadataColumnId(columnHandle.getId()) || isStructuralType(columnHandle.getType()) ||
// Iceberg orders UUID values differently than Trino (perhaps due to https://bugs.openjdk.org/browse/JDK-7025832), so allow only IS NULL / IS NOT NULL checks
(columnHandle.getType() == UUID && !(domain.isOnlyNull() || domain.getValues().isAll()))) {
unsupported.put(columnHandle, domain);
}
else if (canEnforceColumnConstraintInAllSpecs(typeOperators, icebergTable, columnHandle, domain)) {
newEnforced.put(columnHandle, domain);
}
else {
newUnenforced.put(columnHandle, domain);
}
});

TupleDomain<IcebergColumnHandle> newEnforcedConstraint = TupleDomain.withColumnDomains(newEnforced).intersect(table.getEnforcedPredicate());
TupleDomain<IcebergColumnHandle> newUnenforcedConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(table.getUnenforcedPredicate());
newEnforcedConstraint = TupleDomain.withColumnDomains(newEnforced).intersect(table.getEnforcedPredicate());
newUnenforcedConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(table.getUnenforcedPredicate());
remainingConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(TupleDomain.withColumnDomains(unsupported));
}

if (newEnforcedConstraint.equals(table.getEnforcedPredicate())
&& newUnenforcedConstraint.equals(table.getUnenforcedPredicate())) {
return Optional.empty();
}

TupleDomain<IcebergColumnHandle> remainingConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(TupleDomain.withColumnDomains(unsupported));
return Optional.of(new ConstraintApplicationResult<>(
new IcebergTableHandle(
table.getSchemaName(),
Expand All @@ -1776,6 +1788,7 @@ else if (canEnforceColumnConstraintInAllSpecs(typeOperators, icebergTable, colum
table.isRecordScannedFiles(),
table.getMaxScannedFileSize()),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
extractionResult.getRemainingExpression(),
false));
}

Expand Down
Loading

0 comments on commit d6f2da1

Please sign in to comment.