Partitioned table tests and fixed #9757

Merged · 8 commits · Oct 25, 2021
@@ -255,15 +255,25 @@ private String formatRowElement(Object value)
static AssertProvider<QueryAssert> newQueryAssert(String query, QueryRunner runner, Session session)
{
MaterializedResult result = runner.execute(session, query);
-return () -> new QueryAssert(runner, session, query, result);
+return () -> new QueryAssert(runner, session, query, result, false, false, false);
}

-public QueryAssert(QueryRunner runner, Session session, String query, MaterializedResult actual)
+private QueryAssert(
+QueryRunner runner,
+Session session,
+String query,
+MaterializedResult actual,
+boolean ordered,
+boolean skipTypesCheck,
+boolean skipResultsCorrectnessCheckForPushdown)
{
super(actual, Object.class);
this.runner = requireNonNull(runner, "runner is null");
this.session = requireNonNull(session, "session is null");
this.query = requireNonNull(query, "query is null");
+this.ordered = ordered;
+this.skipTypesCheck = skipTypesCheck;
+this.skipResultsCorrectnessCheckForPushdown = skipResultsCorrectnessCheckForPushdown;
}

public QueryAssert projected(int... columns)
@@ -282,7 +292,10 @@ public QueryAssert projected(int... columns)
.collect(toImmutableList()),
IntStream.of(columns)
.mapToObj(actual.getTypes()::get)
-.collect(toImmutableList())));
+.collect(toImmutableList())),
+ordered,
+skipTypesCheck,
+skipResultsCorrectnessCheckForPushdown);
}

public QueryAssert matches(BiFunction<Session, QueryRunner, MaterializedResult> evaluator)
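Note on this hunk: projected() used to rebuild the assertion through the public constructor, so flags already set (ordered, skipTypesCheck, skipResultsCorrectnessCheckForPushdown) were silently reset to false. A hedged sketch of the failure mode, inside a test method; the assertions.query(...) helper and the fluent method names are assumed from Trino's test utilities rather than shown in this diff:

assertThat(assertions.query("SELECT orderkey FROM orders ORDER BY orderkey"))
        .ordered()     // require row order to match
        .projected(0)  // before this change: returned a copy with ordered reset to false
        .matches("VALUES BIGINT '1', BIGINT '2', BIGINT '3'");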
@@ -17,6 +17,8 @@
import io.airlift.slice.Slices;
import io.trino.spi.block.Block;
import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.LongTimestamp;
+import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;

@@ -307,6 +309,12 @@ else if (value instanceof Block) {
else if (value instanceof Slice) {
completedBytes += ((Slice) value).length();
}
+else if (value instanceof LongTimestamp) {
+completedBytes += LongTimestamp.INSTANCE_SIZE;
+}
+else if (value instanceof LongTimestampWithTimeZone) {
+completedBytes += LongTimestampWithTimeZone.INSTANCE_SIZE;
+}
else {
throw new IllegalArgumentException("Unknown type: " + value.getClass());
}
@@ -13,13 +13,17 @@
*/
package io.trino.spi.type;

+import org.openjdk.jol.info.ClassLayout;

import java.util.Objects;

import static io.trino.spi.type.Timestamps.formatTimestamp;

public final class LongTimestamp
implements Comparable<LongTimestamp>
{
+public static final int INSTANCE_SIZE = ClassLayout.parseClass(LongTimestamp.class).instanceSize();

private static final int PICOSECONDS_PER_MICROSECOND = 1_000_000;

private final long epochMicros;
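Why the INSTANCE_SIZE constants: 128-bit timestamps are materialized as heap objects, and the completedBytes accounting in the hunk above needs a per-value byte count; unlike Slice there is no length() to consult. JOL computes the header-plus-fields footprint once per class. A minimal sketch, assuming jol-core is on the classpath:

import io.trino.spi.type.LongTimestamp;
import org.openjdk.jol.info.ClassLayout;

public class InstanceSizeDemo
{
    public static void main(String[] args)
    {
        // Derived from the class layout alone (object header, epochMicros,
        // the picosecond-fraction field, padding); no instance is needed.
        System.out.println(ClassLayout.parseClass(LongTimestamp.class).instanceSize());
    }
}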
@@ -13,13 +13,17 @@
*/
package io.trino.spi.type;

+import org.openjdk.jol.info.ClassLayout;

import java.util.Objects;

import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MILLISECOND;

public final class LongTimestampWithTimeZone
implements Comparable<LongTimestampWithTimeZone>
{
+public static final int INSTANCE_SIZE = ClassLayout.parseClass(LongTimestampWithTimeZone.class).instanceSize();

private final long epochMillis;
private final int picosOfMilli; // number of picoseconds of the millisecond corresponding to epochMillis
private final short timeZoneKey;
@@ -274,8 +274,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
Object prestoValue = deserializePartitionValue(
column.getType(),
partitionColumnValueStrings.get(columnId).orElse(null),
-column.getName(),
-session.getTimeZoneKey());
+column.getName());

return NullableValue.of(column.getType(), prestoValue);
}));
@@ -19,7 +19,6 @@
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.predicate.Utils;
-import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;

import java.io.IOException;
@@ -44,8 +43,7 @@ public class IcebergPageSource
public IcebergPageSource(
List<IcebergColumnHandle> columns,
Map<Integer, Optional<String>> partitionKeys,
-ConnectorPageSource delegate,
-TimeZoneKey timeZoneKey)
+ConnectorPageSource delegate)
{
int size = requireNonNull(columns, "columns is null").size();
requireNonNull(partitionKeys, "partitionKeys is null");
@@ -60,7 +58,7 @@ public IcebergPageSource(
if (partitionKeys.containsKey(column.getId())) {
String partitionValue = partitionKeys.get(column.getId()).orElse(null);
Type type = column.getType();
-Object prefilledValue = deserializePartitionValue(type, partitionValue, column.getName(), timeZoneKey);
+Object prefilledValue = deserializePartitionValue(type, partitionValue, column.getName());
prefilledBlocks[outputIndex] = Utils.nativeValueToBlock(type, prefilledValue);
delegateIndexes[outputIndex] = -1;
}
@@ -186,7 +186,7 @@ public ConnectorPageSource createPageSource(
regularColumns,
effectivePredicate);

-return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
+return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource);
}

private ConnectorPageSource createDataPageSource(
@@ -84,7 +84,6 @@ public ConnectorSplitSource getSplits(
identityPartitionColumns,
tableScan,
dynamicFilter,
-session.getTimeZoneKey(),
dynamicFilteringWaitTimeout);

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
@@ -28,7 +28,6 @@
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
-import io.trino.spi.type.TimeZoneKey;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
@@ -49,6 +48,7 @@

import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergSplitManager.ICEBERG_DOMAIN_COMPACTION_THRESHOLD;
+import static io.trino.plugin.iceberg.IcebergTypes.convertIcebergValueToTrino;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
@@ -70,7 +70,6 @@ public class IcebergSplitSource
private final TableScan tableScan;
private final Map<Integer, Type.PrimitiveType> fieldIdToType;
private final DynamicFilter dynamicFilter;
-private final TimeZoneKey sessionZone;
private final long dynamicFilteringWaitTimeoutMillis;
private final Stopwatch dynamicFilterWaitStopwatch;

@@ -83,15 +82,13 @@ public IcebergSplitSource(
Set<IcebergColumnHandle> identityPartitionColumns,
TableScan tableScan,
DynamicFilter dynamicFilter,
-TimeZoneKey sessionZone,
Duration dynamicFilteringWaitTimeout)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.identityPartitionColumns = requireNonNull(identityPartitionColumns, "identityPartitionColumns is null");
this.tableScan = requireNonNull(tableScan, "tableScan is null");
this.fieldIdToType = primitiveFieldTypes(tableScan.schema());
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
-this.sessionZone = requireNonNull(sessionZone, "sessionZone is null");
this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis();
this.dynamicFilterWaitStopwatch = Stopwatch.createStarted();
}
@@ -158,8 +155,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
if (!partitionMatchesPredicate(
identityPartitionColumns,
icebergSplit.getPartitionKeys(),
-dynamicFilterPredicate,
-sessionZone)) {
+dynamicFilterPredicate)) {
continue;
}
if (!fileMatchesPredicate(
@@ -246,16 +242,16 @@ private static Domain domainForStatistics(io.trino.spi.type.Type type, Object lo
if (lowerBound != null && upperBound != null) {
statisticsRange = Range.range(
type,
-PartitionTable.convert(lowerBound, icebergType),
+convertIcebergValueToTrino(icebergType, lowerBound),
true,
-PartitionTable.convert(upperBound, icebergType),
+convertIcebergValueToTrino(icebergType, upperBound),
true);
}
else if (upperBound != null) {
-statisticsRange = Range.lessThanOrEqual(type, PartitionTable.convert(upperBound, icebergType));
+statisticsRange = Range.lessThanOrEqual(type, convertIcebergValueToTrino(icebergType, upperBound));
}
else {
-statisticsRange = Range.greaterThanOrEqual(type, PartitionTable.convert(lowerBound, icebergType));
+statisticsRange = Range.greaterThanOrEqual(type, convertIcebergValueToTrino(icebergType, lowerBound));
}
return Domain.create(ValueSet.ofRanges(statisticsRange), containsNulls);
}
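The statistics path mirrors partition pruning: Iceberg min/max values are converted to Trino's representation and wrapped in a Range, closed on whichever bounds exist. A self-contained illustration of the same Domain construction with BIGINT bounds (the values and class name are mine):

import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.ValueSet;

import static io.trino.spi.type.BigintType.BIGINT;

public class DomainDemo
{
    public static void main(String[] args)
    {
        // File statistics min = 5, max = 42, no nulls recorded
        Range statisticsRange = Range.range(BIGINT, 5L, true, 42L, true);
        Domain domain = Domain.create(ValueSet.ofRanges(statisticsRange), false);
        // A split whose statistics domain does not intersect the dynamic-filter
        // domain can be skipped entirely.
        System.out.println(domain.includesNullableValue(7L));  // true
        System.out.println(domain.includesNullableValue(99L)); // false
    }
}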
@@ -264,8 +260,7 @@ else if (upperBound != null) {
static boolean partitionMatchesPredicate(
Set<IcebergColumnHandle> identityPartitionColumns,
Map<Integer, Optional<String>> partitionKeys,
-TupleDomain<IcebergColumnHandle> dynamicFilterPredicate,
-TimeZoneKey timeZoneKey)
+TupleDomain<IcebergColumnHandle> dynamicFilterPredicate)
{
if (dynamicFilterPredicate.isNone()) {
return false;
@@ -275,7 +270,7 @@ static boolean partitionMatchesPredicate(
for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
Domain allowedDomain = domains.get(partitionColumn);
if (allowedDomain != null) {
-Object partitionValue = deserializePartitionValue(partitionColumn.getType(), partitionKeys.get(partitionColumn.getId()).orElse(null), partitionColumn.getName(), timeZoneKey);
+Object partitionValue = deserializePartitionValue(partitionColumn.getType(), partitionKeys.get(partitionColumn.getId()).orElse(null), partitionColumn.getName());
if (!allowedDomain.includesNullableValue(partitionValue)) {
return false;
}
@@ -0,0 +1,101 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.UuidType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.UUID;

import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzFromMicros;
import static io.trino.spi.type.Decimals.isShortDecimal;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;

public final class IcebergTypes
{
private IcebergTypes() {}

/**
* Convert value from Iceberg representation to Trino representation.
*/
public static Object convertIcebergValueToTrino(Type icebergType, Object value)
{
if (value == null) {
return null;
}
if (icebergType instanceof Types.BooleanType) {
//noinspection RedundantCast
return (Boolean) value;
}
if (icebergType instanceof Types.IntegerType) {
return ((Integer) value).longValue();
}
if (icebergType instanceof Types.LongType) {
//noinspection RedundantCast
return (Long) value;
}
if (icebergType instanceof Types.FloatType) {
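// Trino represents REAL as a long holding the float's raw IEEE-754 int bits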
return (long) Float.floatToIntBits((Float) value);
}
if (icebergType instanceof Types.DoubleType) {
//noinspection RedundantCast
return (Double) value;
}
if (icebergType instanceof Types.DecimalType) {
Types.DecimalType icebergDecimalType = (Types.DecimalType) icebergType;
DecimalType trinoDecimalType = DecimalType.createDecimalType(icebergDecimalType.precision(), icebergDecimalType.scale());
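// Short decimals (precision <= 18) use Trino's primitive long encoding; wider ones use the encoded form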
if (isShortDecimal(trinoDecimalType)) {
return Decimals.encodeShortScaledValue((BigDecimal) value, trinoDecimalType.getScale());
}
return Decimals.encodeScaledValue((BigDecimal) value, trinoDecimalType.getScale());
}
if (icebergType instanceof Types.StringType) {
// Partition values are passed as String, but min/max values are passed as a CharBuffer
if (value instanceof CharBuffer) {
value = new String(((CharBuffer) value).array());
}
return utf8Slice(((String) value));
}
if (icebergType instanceof Types.BinaryType) {
// TODO the client sees the byte array's toString() output instead of the actual bytes; needs to be fixed.
// TODO return Slice
return ((ByteBuffer) value).array().clone();
}
if (icebergType instanceof Types.DateType) {
return ((Integer) value).longValue();
}
if (icebergType instanceof Types.TimeType) {
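// Iceberg TIME is microseconds of day; Trino TIME(6) counts picoseconds, hence the scaling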
return Math.multiplyExact((Long) value, PICOSECONDS_PER_MICROSECOND);
}
if (icebergType instanceof Types.TimestampType) {
long epochMicros = (long) value;
if (((Types.TimestampType) icebergType).shouldAdjustToUTC()) {
return timestampTzFromMicros(epochMicros);
[Inline review comment]
Member: What does shouldAdjustToUTC mean here?
Member (author): I think it means "this is timestamptz, not timestamp type"; the two seem to be conflated into the Types.TimestampType class.
}
return epochMicros;
}
if (icebergType instanceof Types.UUIDType) {
return UuidType.javaUuidToTrinoUuid((UUID) value);
}

throw new UnsupportedOperationException("Unsupported iceberg type: " + icebergType);
}
}
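On the review thread above: in the Iceberg API a single class models both SQL timestamp flavors, and shouldAdjustToUTC() is the discriminator. A small demo against org.apache.iceberg.types (the factory methods are Iceberg's; the comments reflect this PR's mapping):

import org.apache.iceberg.types.Types;

public class TimestampTypeDemo
{
    public static void main(String[] args)
    {
        Types.TimestampType withZone = Types.TimestampType.withZone();       // "timestamptz"
        Types.TimestampType withoutZone = Types.TimestampType.withoutZone(); // "timestamp"

        System.out.println(withZone.shouldAdjustToUTC());    // true  -> Trino TIMESTAMP WITH TIME ZONE
        System.out.println(withoutZone.shouldAdjustToUTC()); // false -> Trino TIMESTAMP
    }
}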
@@ -26,7 +26,6 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
-import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.UuidType;
@@ -231,7 +230,7 @@ private static String quotedName(String name)
return '"' + name.replace("\"", "\"\"") + '"';
}

-public static Object deserializePartitionValue(Type type, String valueString, String name, TimeZoneKey timeZoneKey)
+public static Object deserializePartitionValue(Type type, String valueString, String name)
{
if (valueString == null) {
return null;
@@ -269,7 +268,7 @@ public static Object deserializePartitionValue(Type type, String valueString, St
return parseLong(valueString);
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
-return timestampTzFromMicros(parseLong(valueString), timeZoneKey);
+return timestampTzFromMicros(parseLong(valueString));
}
if (type instanceof VarcharType) {
Slice value = utf8Slice(valueString);
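The recurring deletion of TimeZoneKey parameters is the heart of the fix: a timestamptz partition value denotes a fixed instant, so materializing it in the session's zone made results vary across sessions. The zone-free timestampTzFromMicros lives in io.trino.plugin.iceberg.util.Timestamps; the sketch below is a reconstruction of what it plausibly does, pinning the zone to UTC:

import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimeZoneKey;

public final class TimestampsSketch
{
    private TimestampsSketch() {}

    // Assumption: the helper pins the zone to UTC instead of threading the
    // session TimeZoneKey through, which is what made results session-dependent.
    public static LongTimestampWithTimeZone timestampTzFromMicros(long epochMicros)
    {
        long epochMillis = Math.floorDiv(epochMicros, 1_000L);
        int picosOfMilli = (int) (Math.floorMod(epochMicros, 1_000L) * 1_000_000);
        return LongTimestampWithTimeZone.fromEpochMillisAndFraction(epochMillis, picosOfMilli, TimeZoneKey.UTC_KEY);
    }
}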