Skip to content

Commit

Permalink
Fix handling of Iceberg timestamptz partition key
Browse files Browse the repository at this point in the history
Before the change, partition key of `timestamp with time zone` type was
returned in session zone, unlike regular data columns. This caused
representability  problems for values within session zone's DST change
backwards. This commit makes partition values to be returned with UTC
zone, just like regular data columns are.
  • Loading branch information
findepi committed Oct 25, 2021
1 parent e5198b8 commit 79ab945
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,7 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
Object prestoValue = deserializePartitionValue(
column.getType(),
partitionColumnValueStrings.get(columnId).orElse(null),
column.getName(),
session.getTimeZoneKey());
column.getName());

return NullableValue.of(column.getType(), prestoValue);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.predicate.Utils;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;

import java.io.IOException;
Expand All @@ -44,8 +43,7 @@ public class IcebergPageSource
public IcebergPageSource(
List<IcebergColumnHandle> columns,
Map<Integer, Optional<String>> partitionKeys,
ConnectorPageSource delegate,
TimeZoneKey timeZoneKey)
ConnectorPageSource delegate)
{
int size = requireNonNull(columns, "columns is null").size();
requireNonNull(partitionKeys, "partitionKeys is null");
Expand All @@ -60,7 +58,7 @@ public IcebergPageSource(
if (partitionKeys.containsKey(column.getId())) {
String partitionValue = partitionKeys.get(column.getId()).orElse(null);
Type type = column.getType();
Object prefilledValue = deserializePartitionValue(type, partitionValue, column.getName(), timeZoneKey);
Object prefilledValue = deserializePartitionValue(type, partitionValue, column.getName());
prefilledBlocks[outputIndex] = Utils.nativeValueToBlock(type, prefilledValue);
delegateIndexes[outputIndex] = -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public ConnectorPageSource createPageSource(
regularColumns,
effectivePredicate);

return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource);
}

private ConnectorPageSource createDataPageSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public ConnectorSplitSource getSplits(
identityPartitionColumns,
tableScan,
dynamicFilter,
session.getTimeZoneKey(),
dynamicFilteringWaitTimeout);

return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.predicate.ValueSet;
import io.trino.spi.type.TimeZoneKey;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
Expand Down Expand Up @@ -71,7 +70,6 @@ public class IcebergSplitSource
private final TableScan tableScan;
private final Map<Integer, Type.PrimitiveType> fieldIdToType;
private final DynamicFilter dynamicFilter;
private final TimeZoneKey sessionZone;
private final long dynamicFilteringWaitTimeoutMillis;
private final Stopwatch dynamicFilterWaitStopwatch;

Expand All @@ -84,15 +82,13 @@ public IcebergSplitSource(
Set<IcebergColumnHandle> identityPartitionColumns,
TableScan tableScan,
DynamicFilter dynamicFilter,
TimeZoneKey sessionZone,
Duration dynamicFilteringWaitTimeout)
{
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
this.identityPartitionColumns = requireNonNull(identityPartitionColumns, "identityPartitionColumns is null");
this.tableScan = requireNonNull(tableScan, "tableScan is null");
this.fieldIdToType = primitiveFieldTypes(tableScan.schema());
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
this.sessionZone = requireNonNull(sessionZone, "sessionZone is null");
this.dynamicFilteringWaitTimeoutMillis = requireNonNull(dynamicFilteringWaitTimeout, "dynamicFilteringWaitTimeout is null").toMillis();
this.dynamicFilterWaitStopwatch = Stopwatch.createStarted();
}
Expand Down Expand Up @@ -159,8 +155,7 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
if (!partitionMatchesPredicate(
identityPartitionColumns,
icebergSplit.getPartitionKeys(),
dynamicFilterPredicate,
sessionZone)) {
dynamicFilterPredicate)) {
continue;
}
if (!fileMatchesPredicate(
Expand Down Expand Up @@ -265,8 +260,7 @@ else if (upperBound != null) {
static boolean partitionMatchesPredicate(
Set<IcebergColumnHandle> identityPartitionColumns,
Map<Integer, Optional<String>> partitionKeys,
TupleDomain<IcebergColumnHandle> dynamicFilterPredicate,
TimeZoneKey timeZoneKey)
TupleDomain<IcebergColumnHandle> dynamicFilterPredicate)
{
if (dynamicFilterPredicate.isNone()) {
return false;
Expand All @@ -276,7 +270,7 @@ static boolean partitionMatchesPredicate(
for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
Domain allowedDomain = domains.get(partitionColumn);
if (allowedDomain != null) {
Object partitionValue = deserializePartitionValue(partitionColumn.getType(), partitionKeys.get(partitionColumn.getId()).orElse(null), partitionColumn.getName(), timeZoneKey);
Object partitionValue = deserializePartitionValue(partitionColumn.getType(), partitionKeys.get(partitionColumn.getId()).orElse(null), partitionColumn.getName());
if (!allowedDomain.includesNullableValue(partitionValue)) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.UuidType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -89,7 +88,7 @@ public static Object convertIcebergValueToTrino(Type icebergType, Object value)
if (icebergType instanceof Types.TimestampType) {
long epochMicros = (long) value;
if (((Types.TimestampType) icebergType).shouldAdjustToUTC()) {
return timestampTzFromMicros(epochMicros, TimeZoneKey.UTC_KEY);
return timestampTzFromMicros(epochMicros);
}
return epochMicros;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.UuidType;
Expand Down Expand Up @@ -231,7 +230,7 @@ private static String quotedName(String name)
return '"' + name.replace("\"", "\"\"") + '"';
}

public static Object deserializePartitionValue(Type type, String valueString, String name, TimeZoneKey timeZoneKey)
public static Object deserializePartitionValue(Type type, String valueString, String name)
{
if (valueString == null) {
return null;
Expand Down Expand Up @@ -269,7 +268,7 @@ public static Object deserializePartitionValue(Type type, String valueString, St
return parseLong(valueString);
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
return timestampTzFromMicros(parseLong(valueString), timeZoneKey);
return timestampTzFromMicros(parseLong(valueString));
}
if (type instanceof VarcharType) {
Slice value = utf8Slice(valueString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import io.trino.spi.block.Block;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimeZoneKey;

import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
Expand All @@ -34,11 +34,11 @@ public static long timestampTzToMicros(LongTimestampWithTimeZone timestamp)
roundDiv(timestamp.getPicosOfMilli(), PICOSECONDS_PER_MICROSECOND);
}

public static LongTimestampWithTimeZone timestampTzFromMicros(long epochMicros, TimeZoneKey timeZoneKey)
public static LongTimestampWithTimeZone timestampTzFromMicros(long epochMicros)
{
long epochMillis = floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND);
int picosOfMillis = floorMod(epochMicros, MICROSECONDS_PER_MILLISECOND) * PICOSECONDS_PER_MICROSECOND;
return LongTimestampWithTimeZone.fromEpochMillisAndFraction(epochMillis, picosOfMillis, timeZoneKey);
return LongTimestampWithTimeZone.fromEpochMillisAndFraction(epochMillis, picosOfMillis, UTC_KEY);
}

public static LongTimestampWithTimeZone getTimestampTz(Block block, int position)
Expand Down
Loading

0 comments on commit 79ab945

Please sign in to comment.