Skip to content

Commit

Permalink
Add support for varchar to timestamp coercion in hive tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveen2112 committed Jul 27, 2023
1 parent 8b2a819 commit 84a3466
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.HiveTimestampPrecision;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToLongTimestampCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToShortTimestampCoercer;
import io.trino.plugin.hive.type.Category;
import io.trino.plugin.hive.type.ListTypeInfo;
import io.trino.plugin.hive.type.MapTypeInfo;
Expand Down Expand Up @@ -91,6 +93,12 @@ public static Type createTypeFromCoercer(TypeManager typeManager, HiveType fromH
if (fromType instanceof VarcharType fromVarcharType && (toHiveType.equals(HIVE_BYTE) || toHiveType.equals(HIVE_SHORT) || toHiveType.equals(HIVE_INT) || toHiveType.equals(HIVE_LONG))) {
return Optional.of(new VarcharToIntegerNumberCoercer<>(fromVarcharType, toType));
}
if (fromType instanceof VarcharType varcharType && toType instanceof TimestampType timestampType) {
if (timestampType.isShort()) {
return Optional.of(new VarcharToShortTimestampCoercer(varcharType, timestampType));
}
return Optional.of(new VarcharToLongTimestampCoercer(varcharType, timestampType));
}
if (fromType instanceof VarcharType fromVarcharType && toType instanceof VarcharType toVarcharType) {
if (narrowerThan(toVarcharType, fromVarcharType)) {
return Optional.of(new VarcharCoercer(fromVarcharType, toVarcharType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive.coercions;

import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
Expand All @@ -25,19 +26,27 @@
import java.time.chrono.IsoChronology;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_TIMESTAMP_COERCION;
import static io.trino.spi.type.TimestampType.MAX_PRECISION;
import static io.trino.spi.type.TimestampType.MAX_SHORT_PRECISION;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_SECOND;
import static io.trino.spi.type.Timestamps.NANOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_NANOSECOND;
import static io.trino.spi.type.Timestamps.SECONDS_PER_DAY;
import static io.trino.spi.type.Timestamps.round;
import static io.trino.spi.type.Timestamps.roundDiv;
import static io.trino.spi.type.Varchars.truncateToLength;
import static java.lang.Math.floorDiv;
import static java.lang.Math.floorMod;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
import static java.time.format.ResolverStyle.STRICT;

public final class TimestampCoercer
{
Expand All @@ -47,6 +56,7 @@ public final class TimestampCoercer
.appendLiteral(' ')
.append(ISO_LOCAL_TIME)
.toFormatter()
.withResolverStyle(STRICT)
.withChronology(IsoChronology.INSTANCE);

// Before 1900, Java Time and Joda Time are not consistent with java.sql.Date and java.util.Calendar
Expand Down Expand Up @@ -83,4 +93,66 @@ protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int pos
toType));
}
}

/**
 * Coerces VARCHAR values into short TIMESTAMP values, i.e. timestamps whose
 * target type reports {@code isShort()} and which are written as a single
 * {@code long} of epoch microseconds.
 * <p>
 * Strings that cannot be parsed by {@code LOCAL_DATE_TIME} become SQL NULL.
 * Strings that parse to an instant before {@code START_OF_MODERN_ERA_SECONDS}
 * are rejected with a {@link TrinoException}.
 */
public static class VarcharToShortTimestampCoercer
        extends TypeCoercer<VarcharType, TimestampType>
{
    /**
     * @param fromType source varchar type
     * @param toType target timestamp type; must be a short timestamp
     * @throws IllegalArgumentException if {@code toType} is not a short timestamp
     */
    public VarcharToShortTimestampCoercer(VarcharType fromType, TimestampType toType)
    {
        super(fromType, toType);
        // The guard is isShort(), so the reported upper bound is MAX_SHORT_PRECISION (not MAX_PRECISION).
        // The template overload also avoids formatting the message when the check passes.
        checkArgument(toType.isShort(), "TIMESTAMP precision must be in range [0, %s]: %s", MAX_SHORT_PRECISION, toType.getPrecision());
    }

    /**
     * Parses the varchar at {@code position} and appends the resulting epoch
     * microseconds to {@code blockBuilder}, or appends NULL when parsing fails.
     *
     * @throws TrinoException with {@code HIVE_INVALID_TIMESTAMP_COERCION} when the parsed
     * value is earlier than {@code START_OF_MODERN_ERA_SECONDS}
     */
    @Override
    protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int position)
    {
        try {
            Slice value = fromType.getSlice(block, position);
            LocalDateTime dateTime = LOCAL_DATE_TIME.parse(value.toStringUtf8(), LocalDateTime::from);
            long epochSecond = dateTime.toEpochSecond(UTC);
            if (epochSecond < START_OF_MODERN_ERA_SECONDS) {
                // Not a parse failure: this propagates to the caller instead of becoming NULL
                throw new TrinoException(HIVE_INVALID_TIMESTAMP_COERCION, "Coercion on historical dates is not supported");
            }
            // Reduce the nano-of-second to the target precision before converting to microseconds
            // NOTE(review): presumably a rounded-up value carries into the next second through the addition below - confirm against Timestamps.round
            long roundedNanos = round(dateTime.getNano(), 9 - toType.getPrecision());
            long epochMicros = epochSecond * MICROSECONDS_PER_SECOND + roundDiv(roundedNanos, NANOSECONDS_PER_MICROSECOND);
            toType.writeLong(blockBuilder, epochMicros);
        }
        catch (DateTimeParseException ignored) {
            // Hive treats invalid String as null instead of propagating exception
            // In case of bigger tables with all values being invalid, log output will be huge so avoiding log here.
            blockBuilder.appendNull();
        }
    }
}

/**
 * Coerces VARCHAR values into long TIMESTAMP values, i.e. timestamps whose
 * target type does not report {@code isShort()} and which are written as a
 * {@link LongTimestamp} (epoch microseconds plus picoseconds of the microsecond).
 * <p>
 * Unparseable strings become SQL NULL; values earlier than
 * {@code START_OF_MODERN_ERA_SECONDS} are rejected with a {@link TrinoException}.
 */
public static class VarcharToLongTimestampCoercer
        extends TypeCoercer<VarcharType, TimestampType>
{
    public VarcharToLongTimestampCoercer(VarcharType fromType, TimestampType toType)
    {
        super(fromType, toType);
        String precisionMessage = format("Precision must be in the range [%s, %s]", MAX_SHORT_PRECISION + 1, MAX_PRECISION);
        checkArgument(!toType.isShort(), precisionMessage);
    }

    @Override
    protected void applyCoercedValue(BlockBuilder blockBuilder, Block block, int position)
    {
        LocalDateTime parsed;
        try {
            String text = fromType.getSlice(block, position).toStringUtf8();
            parsed = LOCAL_DATE_TIME.parse(text, LocalDateTime::from);
        }
        catch (DateTimeParseException ignored) {
            // Hive yields null for a string it cannot read as a timestamp rather than failing the query.
            // Deliberately not logged: a large table full of invalid values would flood the log.
            blockBuilder.appendNull();
            return;
        }

        long secondsSinceEpoch = parsed.toEpochSecond(UTC);
        if (secondsSinceEpoch < START_OF_MODERN_ERA_SECONDS) {
            throw new TrinoException(HIVE_INVALID_TIMESTAMP_COERCION, "Coercion on historical dates is not supported");
        }

        // Split the nano-of-second into whole microseconds and the sub-microsecond remainder in picoseconds
        int nanoOfSecond = parsed.getNano();
        long epochMicros = secondsSinceEpoch * MICROSECONDS_PER_SECOND + nanoOfSecond / NANOSECONDS_PER_MICROSECOND;
        int picosOfMicro = (nanoOfSecond % NANOSECONDS_PER_MICROSECOND) * PICOSECONDS_PER_NANOSECOND;
        toType.writeObject(blockBuilder, new LongTimestamp(epochMicros, picosOfMicro));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@

import io.trino.orc.metadata.OrcType.OrcTypeKind;
import io.trino.plugin.hive.coercions.TimestampCoercer.LongTimestampToVarcharCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToLongTimestampCoercer;
import io.trino.plugin.hive.coercions.TimestampCoercer.VarcharToShortTimestampCoercer;
import io.trino.plugin.hive.coercions.TypeCoercer;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;

import java.util.Optional;

import static io.trino.orc.metadata.OrcType.OrcTypeKind.STRING;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.TIMESTAMP;
import static io.trino.orc.metadata.OrcType.OrcTypeKind.VARCHAR;
import static io.trino.spi.type.TimestampType.TIMESTAMP_NANOS;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;

public final class OrcTypeTranslator
{
Expand All @@ -33,6 +39,17 @@ private OrcTypeTranslator() {}
if (fromOrcType == TIMESTAMP && toTrinoType instanceof VarcharType varcharType) {
return Optional.of(new LongTimestampToVarcharCoercer(TIMESTAMP_NANOS, varcharType));
}
if (isVarcharType(fromOrcType) && toTrinoType instanceof TimestampType timestampType) {
if (timestampType.isShort()) {
return Optional.of(new VarcharToShortTimestampCoercer(createUnboundedVarcharType(), timestampType));
}
return Optional.of(new VarcharToLongTimestampCoercer(createUnboundedVarcharType(), timestampType));
}
return Optional.empty();
}

/**
 * Tells whether the given ORC type kind is one of the character kinds
 * ({@code STRING} or {@code VARCHAR}) accepted here as a varchar source.
 */
private static boolean isVarcharType(OrcTypeKind orcTypeKind)
{
    if (orcTypeKind == VARCHAR) {
        return true;
    }
    return orcTypeKind == STRING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType, HiveTimest
toHiveType.equals(HIVE_BYTE) ||
toHiveType.equals(HIVE_SHORT) ||
toHiveType.equals(HIVE_INT) ||
toHiveType.equals(HIVE_LONG);
toHiveType.equals(HIVE_LONG) ||
toHiveType.equals(HIVE_TIMESTAMP);
}
if (fromType instanceof CharType) {
return toType instanceof CharType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import java.time.LocalDateTime;

import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.hive.HiveTimestampPrecision.MICROSECONDS;
import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
import static io.trino.plugin.hive.HiveType.toHiveType;
import static io.trino.plugin.hive.coercions.CoercionUtils.createCoercer;
import static io.trino.spi.predicate.Utils.blockToNativeValue;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_PICOS;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static io.trino.spi.type.VarcharType.createVarcharType;
Expand All @@ -51,6 +53,22 @@ public void testTimestampToVarchar(String timestampValue, String hiveTimestampVa
assertLongTimestampToVarcharCoercions(TIMESTAMP_PICOS, new LongTimestamp(timestamp.getEpochMicros(), timestamp.getPicosOfMicros()), createUnboundedVarcharType(), hiveTimestampValue);
}

/**
 * Coerces each Hive-formatted varchar from the provider with MICROSECONDS precision
 * and expects the epoch micros of the matching ISO-formatted timestamp.
 */
@Test(dataProvider = "timestampValuesProvider")
public void testVarcharToShortTimestamp(String timestampValue, String hiveTimestampValue)
{
    LocalDateTime parsed = LocalDateTime.parse(timestampValue);
    long epochSecond = parsed.toEpochSecond(UTC);
    int nanoOfSecond = parsed.get(NANO_OF_SECOND);
    SqlTimestamp expected = SqlTimestamp.fromSeconds(TIMESTAMP_MICROS.getPrecision(), epochSecond, nanoOfSecond);

    assertVarcharToShortTimestampCoercions(
            createUnboundedVarcharType(),
            utf8Slice(hiveTimestampValue),
            TIMESTAMP_MICROS,
            expected.getEpochMicros());
}

/**
 * Coerces each Hive-formatted varchar from the provider with NANOSECONDS precision
 * and expects the {@link LongTimestamp} of the matching ISO-formatted timestamp.
 */
@Test(dataProvider = "timestampValuesProvider")
public void testVarcharToLongTimestamp(String timestampValue, String hiveTimestampValue)
{
    LocalDateTime parsed = LocalDateTime.parse(timestampValue);
    long epochSecond = parsed.toEpochSecond(UTC);
    int nanoOfSecond = parsed.get(NANO_OF_SECOND);
    SqlTimestamp reference = SqlTimestamp.fromSeconds(TIMESTAMP_PICOS.getPrecision(), epochSecond, nanoOfSecond);
    LongTimestamp expected = new LongTimestamp(reference.getEpochMicros(), reference.getPicosOfMicros());

    assertVarcharToLongTimestampCoercions(
            createUnboundedVarcharType(),
            utf8Slice(hiveTimestampValue),
            TIMESTAMP_PICOS,
            expected);
}

@Test
public void testTimestampToSmallerVarchar()
{
Expand Down Expand Up @@ -93,11 +111,53 @@ public void testHistoricalLongTimestampToVarchar()
{
LocalDateTime localDateTime = LocalDateTime.parse("1899-12-31T23:59:59.999999999");
SqlTimestamp timestamp = SqlTimestamp.fromSeconds(TIMESTAMP_PICOS.getPrecision(), localDateTime.toEpochSecond(UTC), localDateTime.get(NANO_OF_SECOND));
assertThatThrownBy(() -> assertLongTimestampToVarcharCoercions(
TIMESTAMP_PICOS,
new LongTimestamp(timestamp.getEpochMicros(), timestamp.getPicosOfMicros()),
assertThatThrownBy(() ->
assertLongTimestampToVarcharCoercions(
TIMESTAMP_PICOS,
new LongTimestamp(timestamp.getEpochMicros(), timestamp.getPicosOfMicros()),
createUnboundedVarcharType(),
"1899-12-31 23:59:59.999999999"))
.isInstanceOf(TrinoException.class)
.hasMessageContaining("Coercion on historical dates is not supported");
}

/**
 * Every string from the {@code invalidValue} provider must coerce to NULL
 * when the target is a short timestamp.
 */
@Test(dataProvider = "invalidValue")
public void testInvalidVarcharToShortTimestamp(String invalidValue)
{
    Object expectedNull = null;
    assertVarcharToShortTimestampCoercions(
            createUnboundedVarcharType(),
            utf8Slice(invalidValue),
            TIMESTAMP_MICROS,
            expectedNull);
}

/**
 * Every string from the {@code invalidValue} provider must coerce to NULL
 * when the target is a long timestamp.
 */
@Test(dataProvider = "invalidValue")
public void testInvalidVarcharLongTimestamp(String invalidValue)
{
    // Use a long timestamp type as the target, matching testVarcharToLongTimestamp;
    // the previous TIMESTAMP_MICROS is a short type and did not describe the block produced by the long coercer.
    assertVarcharToLongTimestampCoercions(createUnboundedVarcharType(), utf8Slice(invalidValue), TIMESTAMP_PICOS, null);
}

/**
 * A varchar holding a pre-1900 timestamp must be rejected by the short
 * timestamp coercion with a {@link TrinoException}.
 */
@Test
public void testHistoricalVarcharToShortTimestamp()
{
    LocalDateTime historical = LocalDateTime.parse("1899-12-31T23:59:59.999999");
    long epochSecond = historical.toEpochSecond(UTC);
    int nanoOfSecond = historical.get(NANO_OF_SECOND);
    SqlTimestamp reference = SqlTimestamp.fromSeconds(TIMESTAMP_MICROS.getPrecision(), epochSecond, nanoOfSecond);

    assertThatThrownBy(() -> assertVarcharToShortTimestampCoercions(
            createUnboundedVarcharType(),
            utf8Slice("1899-12-31 23:59:59.999999"),
            TIMESTAMP_MICROS,
            reference.getEpochMicros()))
            .isInstanceOf(TrinoException.class)
            .hasMessageContaining("Coercion on historical dates is not supported");
}

@Test
public void testHistoricalVarcharToLongTimestamp()
{
LocalDateTime localDateTime = LocalDateTime.parse("1899-12-31T23:59:59.999999");
SqlTimestamp timestamp = SqlTimestamp.fromSeconds(TIMESTAMP_PICOS.getPrecision(), localDateTime.toEpochSecond(UTC), localDateTime.get(NANO_OF_SECOND));
assertThatThrownBy(() -> assertVarcharToShortTimestampCoercions(
createUnboundedVarcharType(),
"1899-12-31 23:59:59.999999999"))
utf8Slice("1899-12-31 23:59:59.999999"),
TIMESTAMP_PICOS,
timestamp.getEpochMicros()))
.isInstanceOf(TrinoException.class)
.hasMessageContaining("Coercion on historical dates is not supported");
}
Expand Down Expand Up @@ -129,11 +189,36 @@ public Object[][] timestampValuesProvider()
};
}

/**
 * Supplies varchar inputs that the varchar-to-timestamp tests expect to coerce to NULL.
 */
@DataProvider
public Object[][] invalidValue()
{
    Object[][] unparseableInputs = {
            {"Invalid timestamp"}, // not a timestamp at all
            {"2022"}, // only part of a timestamp
            {"2001-04-01T00:13:42.000"}, // ISO format with a 'T' separator
            {"2001-14-01 00:13:42.000"}, // month out of range
            {"2001-01-32 00:13:42.000"}, // day out of range
            {"2001-04-01 23:59:60.000"}, // second out of range
            {"2001-04-01 23:60:01.000"}, // minute out of range
            {"2001-04-01 27:01:01.000"}, // hour out of range
    };
    return unparseableInputs;
}

/**
 * Asserts that coercing {@code valueToBeCoerced} from {@code fromType} to {@code toType}
 * with NANOSECONDS Hive timestamp precision yields {@code expectedValue} as a UTF-8 slice.
 */
public static void assertLongTimestampToVarcharCoercions(TimestampType fromType, LongTimestamp valueToBeCoerced, VarcharType toType, String expectedValue)
{
    Object expectedSlice = utf8Slice(expectedValue);
    assertCoercions(fromType, valueToBeCoerced, toType, expectedSlice, NANOSECONDS);
}

/**
 * Asserts the result of a varchar-to-timestamp coercion performed with
 * MICROSECONDS Hive timestamp precision.
 */
public static void assertVarcharToShortTimestampCoercions(Type fromType, Object valueToBeCoerced, Type toType, Object expectedValue)
{
    HiveTimestampPrecision precision = MICROSECONDS;
    assertCoercions(fromType, valueToBeCoerced, toType, expectedValue, precision);
}

/**
 * Asserts the result of a varchar-to-timestamp coercion performed with
 * NANOSECONDS Hive timestamp precision.
 */
public static void assertVarcharToLongTimestampCoercions(Type fromType, Object valueToBeCoerced, Type toType, Object expectedValue)
{
    HiveTimestampPrecision precision = NANOSECONDS;
    assertCoercions(fromType, valueToBeCoerced, toType, expectedValue, precision);
}

public static void assertCoercions(Type fromType, Object valueToBeCoerced, Type toType, Object expectedValue, HiveTimestampPrecision timestampPrecision)
{
Block coercedValue = createCoercer(TESTING_TYPE_MANAGER, toHiveType(fromType), toHiveType(toType), timestampPrecision).orElseThrow()
Expand Down
Loading

0 comments on commit 84a3466

Please sign in to comment.