Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Iceberg OPTIMIZE with WHERE casting timestamp_tz column to a date #12918

Merged
merged 7 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import io.trino.sql.tree.StringLiteral;
import io.trino.sql.tree.TimestampLiteral;

import javax.annotation.Nullable;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -99,7 +101,7 @@ public List<Expression> toExpressions(Session session, List<?> objects, List<? e
return expressions.build();
}

public Expression toExpression(Session session, Object object, Type type)
public Expression toExpression(Session session, @Nullable Object object, Type type)
{
requireNonNull(type, "type is null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,27 +426,28 @@ public static TupleDomain<ColumnHandle> computeEnforced(TupleDomain<ColumnHandle
// 3. When the connector could enforce all of the domains, the unenforced would be TupleDomain.all().

// In all 3 cases shown above, the unenforced is not TupleDomain.none().
checkArgument(!unenforced.isNone());
checkArgument(!unenforced.isNone(), "Unexpected unenforced none tuple domain");

Map<ColumnHandle, Domain> predicateDomains = predicate.getDomains().get();
Map<ColumnHandle, Domain> unenforcedDomains = unenforced.getDomains().get();
ImmutableMap.Builder<ColumnHandle, Domain> enforcedDomainsBuilder = ImmutableMap.builder();
for (Map.Entry<ColumnHandle, Domain> entry : predicateDomains.entrySet()) {
ColumnHandle predicateColumnHandle = entry.getKey();
Domain predicateDomain = entry.getValue();
if (unenforcedDomains.containsKey(predicateColumnHandle)) {
Domain unenforcedDomain = unenforcedDomains.get(predicateColumnHandle);
checkArgument(
entry.getValue().equals(unenforcedDomains.get(predicateColumnHandle)),
"Enforced tuple domain cannot be determined. The connector is expected to enforce the respective domain entirely on none, some, or all of the column.");
predicateDomain.contains(unenforcedDomain),
"Unexpected unenforced domain %s on column %s. Expected all, none, or a domain equal to or narrower than %s",
unenforcedDomain,
predicateColumnHandle,
predicateDomain);
}
else {
enforcedDomainsBuilder.put(predicateColumnHandle, entry.getValue());
enforcedDomainsBuilder.put(predicateColumnHandle, predicateDomain);
}
}
Map<ColumnHandle, Domain> enforcedDomains = enforcedDomainsBuilder.buildOrThrow();
checkArgument(
enforcedDomains.size() + unenforcedDomains.size() == predicateDomains.size(),
"Enforced tuple domain cannot be determined. Connector returned an unenforced TupleDomain that contains columns not in predicate.");
return TupleDomain.withColumnDomains(enforcedDomains);
return TupleDomain.withColumnDomains(enforcedDomainsBuilder.buildOrThrow());
}

private static class SplitExpression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.spi.TrinoException;
import io.trino.spi.function.InvocationConvention;
import io.trino.spi.type.CharType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.LongTimestampWithTimeZone;
Expand Down Expand Up @@ -208,7 +207,7 @@ private Expression unwrapCast(ComparisonExpression expression)
Type targetType = typeAnalyzer.getType(session, types, expression.getRight());

if (sourceType instanceof TimestampType && targetType == DATE) {
return unwrapTimestampToDateCast(session, (TimestampType) sourceType, (DateType) targetType, operator, cast.getExpression(), (long) right).orElse(expression);
return unwrapTimestampToDateCast(session, (TimestampType) sourceType, operator, cast.getExpression(), (long) right).orElse(expression);
}

if (targetType instanceof TimestampWithTimeZoneType) {
Expand Down Expand Up @@ -410,11 +409,11 @@ private Expression unwrapCast(ComparisonExpression expression)
return new ComparisonExpression(operator, cast.getExpression(), literalEncoder.toExpression(session, literalInSourceType, sourceType));
}

private Optional<Expression> unwrapTimestampToDateCast(Session session, TimestampType sourceType, DateType targetType, ComparisonExpression.Operator operator, Expression timestampExpression, long date)
private Optional<Expression> unwrapTimestampToDateCast(Session session, TimestampType sourceType, ComparisonExpression.Operator operator, Expression timestampExpression, long date)
{
ResolvedFunction targetToSource;
try {
targetToSource = plannerContext.getMetadata().getCoercion(session, targetType, sourceType);
targetToSource = plannerContext.getMetadata().getCoercion(session, DATE, sourceType);
}
catch (OperatorNotFoundException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.Type;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;

Expand All @@ -33,12 +35,13 @@ public class Constant
/**
* @param value the value encoded using the native "stack" representation for the given type.
*/
public Constant(Object value, Type type)
public Constant(@Nullable Object value, Type type)
{
super(type);
this.value = value;
}

@Nullable
public Object getValue()
{
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public Pattern<Constant> getPattern()
public Optional<String> rewrite(Constant constant, Captures captures, RewriteContext<String> context)
{
Type type = constant.getType();
if (constant.getValue() == null) {
return Optional.empty();
}
if (type == TINYINT || type == SMALLINT || type == INTEGER || type == BIGINT) {
return Optional.of(Long.toString((long) constant.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public Pattern<Constant> getPattern()
/**
 * Renders a constant as a single-quoted string literal, doubling any embedded
 * single quotes. Yields an empty result when the constant holds no value.
 */
@Override
public Optional<String> rewrite(Constant constant, Captures captures, RewriteContext<String> context)
{
    Object value = constant.getValue();
    if (value == null) {
        // A null constant has no literal form that this rule can produce
        return Optional.empty();
    }
    String text = ((Slice) value).toStringUtf8();
    String escaped = text.replace("'", "''");
    return Optional.of("'" + escaped + "'");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.expression.Call;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Constant;
import io.trino.spi.expression.FunctionName;
import io.trino.spi.expression.Variable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.DateType;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;

import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.base.expression.ConnectorExpressions.and;
import static io.trino.plugin.base.expression.ConnectorExpressions.extractConjuncts;
import static io.trino.spi.expression.StandardFunctions.CAST_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.GREATER_THAN_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.GREATER_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.IS_DISTINCT_FROM_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.LESS_THAN_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.LESS_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.expression.StandardFunctions.NOT_EQUAL_OPERATOR_FUNCTION_NAME;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.Timestamps.MILLISECONDS_PER_DAY;
import static java.util.Objects.requireNonNull;

public final class ConstraintExtractor
{
private ConstraintExtractor() {}

public static ExtractionResult extractTupleDomain(Constraint constraint)
{
TupleDomain<IcebergColumnHandle> result = constraint.getSummary()
.transformKeys(IcebergColumnHandle.class::cast);
ImmutableList.Builder<ConnectorExpression> remainingExpressions = ImmutableList.builder();
for (ConnectorExpression conjunct : extractConjuncts(constraint.getExpression())) {
Optional<TupleDomain<IcebergColumnHandle>> converted = toTupleDomain(conjunct, constraint.getAssignments());
if (converted.isEmpty()) {
remainingExpressions.add(conjunct);
}
else {
result = result.intersect(converted.get());
if (result.isNone()) {
return new ExtractionResult(TupleDomain.none(), Constant.TRUE);
}
}
}
return new ExtractionResult(result, and(remainingExpressions.build()));
}

private static Optional<TupleDomain<IcebergColumnHandle>> toTupleDomain(ConnectorExpression expression, Map<String, ColumnHandle> assignments)
{
if (expression instanceof Call) {
return toTupleDomain((Call) expression, assignments);
}
return Optional.empty();
}

private static Optional<TupleDomain<IcebergColumnHandle>> toTupleDomain(Call call, Map<String, ColumnHandle> assignments)
{
if (call.getArguments().size() == 2) {
ConnectorExpression firstArgument = call.getArguments().get(0);
ConnectorExpression secondArgument = call.getArguments().get(1);

// Note: CanonicalizeExpressionRewriter ensures that constants are the second comparison argument.
findepi marked this conversation as resolved.
Show resolved Hide resolved

if (firstArgument instanceof Call && ((Call) firstArgument).getFunctionName().equals(CAST_FUNCTION_NAME) &&
secondArgument instanceof Constant &&
// if type do no match, this cannot be a comparison function
firstArgument.getType().equals(secondArgument.getType())) {
return unwrapCastInComparison(
call.getFunctionName(),
getOnlyElement(((Call) firstArgument).getArguments()),
(Constant) secondArgument,
assignments);
}
}

return Optional.empty();
}

private static Optional<TupleDomain<IcebergColumnHandle>> unwrapCastInComparison(
// upon invocation, we don't know if this really is a comparison
FunctionName functionName,
ConnectorExpression castSource,
Constant constant,
Map<String, ColumnHandle> assignments)
{
if (!(castSource instanceof Variable)) {
// Engine unwraps casts in comparisons in UnwrapCastInComparison. Within a connector we can do more than
// engine only for source columns. We cannot draw many conclusions for intermediate expressions without
// knowing them well.
return Optional.empty();
}

if (constant.getValue() == null) {
// Comparisons with NULL should be simplified by the engine
return Optional.empty();
}

IcebergColumnHandle column = resolve((Variable) castSource, assignments);
if (column.getType() instanceof TimestampWithTimeZoneType) {
// Iceberg supports only timestamp(6) with time zone
checkArgument(((TimestampWithTimeZoneType) column.getType()).getPrecision() == 6, "Unexpected type: %s", column.getType());

if (constant.getType() == DateType.DATE) {
return unwrapTimestampTzToDateCast(column, functionName, (long) constant.getValue())
.map(domain -> TupleDomain.withColumnDomains(ImmutableMap.of(column, domain)));
}
// TODO support timestamp constant
}

return Optional.empty();
}

private static Optional<Domain> unwrapTimestampTzToDateCast(IcebergColumnHandle column, FunctionName functionName, long date)
{
Type type = column.getType();
checkArgument(type.equals(TIMESTAMP_TZ_MICROS), "Column of unexpected type %s: %s ", type, column);

// Verify no overflow. Date values must be in integer range.
verify(date <= Integer.MAX_VALUE, "Date value out of range: %s", date);

// In Iceberg, timestamp with time zone values are all in UTC

LongTimestampWithTimeZone startOfDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction(date * MILLISECONDS_PER_DAY, 0, UTC_KEY);
LongTimestampWithTimeZone startOfNextDate = LongTimestampWithTimeZone.fromEpochMillisAndFraction((date + 1) * MILLISECONDS_PER_DAY, 0, UTC_KEY);

if (functionName.equals(EQUAL_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.range(type, startOfDate, true, startOfNextDate, false)), false));
}
if (functionName.equals(NOT_EQUAL_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfDate), Range.greaterThanOrEqual(type, startOfNextDate)), false));
}
if (functionName.equals(LESS_THAN_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfDate)), false));
}
if (functionName.equals(LESS_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfNextDate)), false));
}
if (functionName.equals(GREATER_THAN_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.greaterThanOrEqual(type, startOfNextDate)), false));
}
if (functionName.equals(GREATER_THAN_OR_EQUAL_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.greaterThanOrEqual(type, startOfDate)), false));
}
if (functionName.equals(IS_DISTINCT_FROM_OPERATOR_FUNCTION_NAME)) {
return Optional.of(Domain.create(ValueSet.ofRanges(Range.lessThan(type, startOfDate), Range.greaterThanOrEqual(type, startOfNextDate)), true));
}

return Optional.empty();
}

private static IcebergColumnHandle resolve(Variable variable, Map<String, ColumnHandle> assignments)
findepi marked this conversation as resolved.
Show resolved Hide resolved
{
ColumnHandle columnHandle = assignments.get(variable.getName());
checkArgument(columnHandle != null, "No assignment for %s", variable);
return (IcebergColumnHandle) columnHandle;
}

public static class ExtractionResult
{
private final TupleDomain<IcebergColumnHandle> tupleDomain;
private final ConnectorExpression remainingExpression;

public ExtractionResult(TupleDomain<IcebergColumnHandle> tupleDomain, ConnectorExpression remainingExpression)
{
this.tupleDomain = requireNonNull(tupleDomain, "tupleDomain is null");
this.remainingExpression = requireNonNull(remainingExpression, "remainingExpression is null");
}

public TupleDomain<IcebergColumnHandle> getTupleDomain()
{
return tupleDomain;
}

public ConnectorExpression getRemainingExpression()
{
return remainingExpression;
}
}
}
Loading