Support reading non-ISO date partition names in Hive
ebyhr committed Apr 13, 2022
Commit d96f3e8 (1 parent: e06215e)
Showing 2 changed files with 108 additions and 5 deletions.
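The first file below is the parsing change in HiveUtil; the second adds product tests for partition names written by Spark. Previously, HIVE_DATE_PARSER was the strict ISO formatter (ISODateTimeFormat.date()), so a Spark-written partition name such as dt=2022-04-13 00:00:00 failed to parse. The commit instead builds HIVE_DATE_PARSER from the same lenient pattern as HIVE_TIMESTAMP_PARSER, and parseHiveDate() rejects any value whose time part is not exactly midnight before converting it to days since the epoch. Below is a minimal, self-contained sketch of that logic, assuming only Joda-Time on the classpath; the class name and the simplified optional " HH:mm:ss" time suffix are illustrative, not the committed code.

import org.joda.time.Days;
import org.joda.time.LocalDateTime;
import org.joda.time.LocalTime;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.DateTimeParser;

public class ParseHiveDateSketch
{
    private static final LocalDateTime EPOCH_DAY = new LocalDateTime(1970, 1, 1, 0, 0);

    // The committed parser accepts variable-length time parts including
    // fractional seconds; this sketch only allows an optional " HH:mm:ss"
    private static final DateTimeFormatter HIVE_DATE_PARSER;

    static {
        DateTimeParser optionalTime = new DateTimeFormatterBuilder()
                .appendLiteral(' ')
                .appendHourOfDay(2)
                .appendLiteral(':')
                .appendMinuteOfHour(2)
                .appendLiteral(':')
                .appendSecondOfMinute(2)
                .toParser();
        HIVE_DATE_PARSER = new DateTimeFormatterBuilder()
                .appendYear(4, 9)
                .appendLiteral('-')
                .appendMonthOfYear(2)
                .appendLiteral('-')
                .appendDayOfMonth(2)
                .appendOptional(optionalTime)
                .toFormatter()
                .withZoneUTC();
    }

    public static long parseHiveDate(String value)
    {
        LocalDateTime date = HIVE_DATE_PARSER.parseLocalDateTime(value);
        // A partition value with a non-midnight time does not denote a whole day
        if (!date.toLocalTime().equals(LocalTime.MIDNIGHT)) {
            throw new IllegalArgumentException("The value should be a whole round date: '" + value + "'");
        }
        return Days.daysBetween(EPOCH_DAY, date).getDays();
    }

    public static void main(String[] args)
    {
        System.out.println(parseHiveDate("2022-04-13"));          // 19095
        System.out.println(parseHiveDate("2022-04-13 00:00:00")); // 19095
        parseHiveDate("2022-04-13 10:30:00");                     // throws IllegalArgumentException
    }
}

Running main prints 19095 twice and then fails on the non-midnight value.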
@@ -75,12 +75,14 @@
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.LocalDateTime;
+import org.joda.time.LocalTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.DateTimeParser;
import org.joda.time.format.DateTimePrinter;
-import org.joda.time.format.ISODateTimeFormat;

import javax.annotation.Nullable;

@@ -94,7 +96,6 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -184,7 +185,8 @@ public final class HiveUtil
public static final String ICEBERG_TABLE_TYPE_NAME = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";

-private static final DateTimeFormatter HIVE_DATE_PARSER = ISODateTimeFormat.date().withZoneUTC();
+private static final LocalDateTime EPOCH_DAY = new LocalDateTime(1970, 1, 1, 0, 0);
+private static final DateTimeFormatter HIVE_DATE_PARSER;
private static final DateTimeFormatter HIVE_TIMESTAMP_PARSER;
private static final Field COMPRESSION_CODECS_FIELD;

@@ -207,6 +209,7 @@ public final class HiveUtil
};
DateTimePrinter timestampWithoutTimeZonePrinter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getPrinter();
HIVE_TIMESTAMP_PARSER = new DateTimeFormatterBuilder().append(timestampWithoutTimeZonePrinter, timestampWithoutTimeZoneParser).toFormatter().withZoneUTC();
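// Dates reuse the same lenient pattern as timestamps, so partition values
// such as "2022-04-13 00:00:00" parse as dates; parseHiveDate() below rejects
// values whose time part is not midnight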
+HIVE_DATE_PARSER = new DateTimeFormatterBuilder().append(timestampWithoutTimeZonePrinter, timestampWithoutTimeZoneParser).toFormatter().withZoneUTC();

try {
COMPRESSION_CODECS_FIELD = TextInputFormat.class.getDeclaredField("compressionCodecs");
@@ -374,8 +377,11 @@ public static String getInputFormatName(Properties schema)

public static long parseHiveDate(String value)
{
-    long millis = HIVE_DATE_PARSER.parseMillis(value);
-    return TimeUnit.MILLISECONDS.toDays(millis);
+    LocalDateTime date = HIVE_DATE_PARSER.parseLocalDateTime(value);
+    if (!date.toLocalTime().equals(LocalTime.MIDNIGHT)) {
+        throw new IllegalArgumentException(format("The value should be a whole round date: '%s'", value));
+    }
+    return Days.daysBetween(EPOCH_DAY, date).getDays();
}

public static long parseHiveTimestamp(String value)
@@ -29,6 +29,7 @@
import static io.trino.tests.product.TestGroups.HIVE_SPARK;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
+import static io.trino.tests.product.utils.QueryExecutors.onHive;
import static io.trino.tests.product.utils.QueryExecutors.onSpark;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
@@ -423,6 +424,102 @@ private void testReadTrinoCreatedTable(String tableName, String tableFormat)
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testReadSparkDateAndTimePartitionName()
{
String sparkTableName = "test_trino_reading_spark_date_and_time_type_partitioned_" + randomTableSuffix();
String trinoTableName = format("%s.default.%s", TRINO_CATALOG, sparkTableName);

onSpark().executeQuery(format("CREATE TABLE default.%s (value integer) PARTITIONED BY (dt date)", sparkTableName));

// Spark allows creating partitions whose date value carries a time part
// Hive refuses to create such partitions, but can read them
onSpark().executeQuery(format("INSERT INTO %s PARTITION(dt='2022-04-13 00:00:00.000000000') VALUES (1)", sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s PARTITION(dt='2022-04-13 00:00:00') VALUES (2)", sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s PARTITION(dt='2022-04-13 00:00') VALUES (3)", sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s PARTITION(dt='12345-06-07') VALUES (4)", sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s PARTITION(dt='123-04-05') VALUES (5)", sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s PARTITION(dt='-0001-01-01') VALUES (6)", sparkTableName));

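// Hive percent-encodes characters that are special in file system paths when
// naming partition directories, so ':' shows up as %3A in $partition below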
assertThat(onTrino().executeQuery("SELECT \"$partition\" FROM " + trinoTableName))
.containsOnly(List.of(
row("dt=2022-04-13 00%3A00%3A00.000000000"),
row("dt=2022-04-13 00%3A00%3A00"),
row("dt=2022-04-13 00%3A00"),
row("dt=12345-06-07"),
row("dt=123-04-05"),
row("dt=-0001-01-01")));

// Use the date_format function to avoid an exception from java.sql.Date.valueOf() with a 5-digit year
assertThat(onSpark().executeQuery("SELECT value, date_format(dt, 'yyyy-MM-dd') FROM " + sparkTableName))
.containsOnly(List.of(
row(1, "2022-04-13"),
row(2, "2022-04-13"),
row(3, "2022-04-13"),
row(4, "+12345-06-07"),
row(5, null),
row(6, "-0001-01-01")));

// Use the date_format function to avoid an exception from java.sql.Date.valueOf() with a 5-digit year
assertThat(onHive().executeQuery("SELECT value, date_format(dt, 'yyyy-MM-dd') FROM " + sparkTableName))
.containsOnly(List.of(
row(1, "2022-04-13"),
row(2, "2022-04-13"),
row(3, "2022-04-13"),
row(4, "12345-06-07"),
row(5, "0123-04-06"),
row(6, "0002-01-03")));

// Cast to varchar so that we can compare with Spark & Hive easily
assertThat(onTrino().executeQuery("SELECT value, CAST(dt AS VARCHAR) FROM " + trinoTableName))
.containsOnly(List.of(
row(1, "2022-04-13"),
row(2, "2022-04-13"),
row(3, "2022-04-13"),
row(4, "12345-06-07"),
row(5, "0123-04-05"),
row(6, "-0001-01-01")));

onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}, dataProvider = "unsupportedPartitionDates")
public void testReadSparkInvalidDatePartitionName(String inputDate, java.sql.Date outputDate)
{
String sparkTableName = "test_trino_reading_spark_invalid_date_type_partitioned_" + randomTableSuffix();
String trinoTableName = format("%s.default.%s", TRINO_CATALOG, sparkTableName);

onSpark().executeQuery(format("CREATE TABLE default.%s (value integer) PARTITIONED BY (dt date)", sparkTableName));

// Spark allows creating partitions with an invalid date format
// Hive refuses to create such partitions, but can read them
onSpark().executeQuery(format("INSERT INTO %s PARTITION(dt='%s') VALUES (1)", sparkTableName, inputDate));

// Hive ignores the time part and returns null for unparseable dates
assertThat(onHive().executeQuery("SELECT value, dt FROM " + sparkTableName))
.containsOnly(List.of(row(1, outputDate)));

// Trino throws an exception if the date has an invalid format or is not a whole round date
assertQueryFailure(() -> onTrino().executeQuery("SELECT value, dt FROM " + trinoTableName))
.hasMessageContaining("Invalid partition value");

onTrino().executeQuery("DROP TABLE " + trinoTableName);
}

@DataProvider
public static Object[][] unsupportedPartitionDates()
{
return new Object[][] {
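// Hive parses these leniently: trailing time parts and garbage are ignored,
// out-of-range fields roll over (2021-02-30 becomes 2021-03-02), and only
// completely unparseable values come back as null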
{"1965-09-10 23:59:59.999999999", java.sql.Date.valueOf(LocalDate.of(1965, 9, 10))},
{"1965-09-10 23:59:59", java.sql.Date.valueOf(LocalDate.of(1965, 9, 10))},
{"1965-09-10 23:59", java.sql.Date.valueOf(LocalDate.of(1965, 9, 10))},
{"1965-09-10 00", java.sql.Date.valueOf(LocalDate.of(1965, 9, 10))},
{"2021-02-30", java.sql.Date.valueOf(LocalDate.of(2021, 3, 2))},
{"1965-09-10 invalid", java.sql.Date.valueOf(LocalDate.of(1965, 9, 10))},
{"invalid date", null},
};
}

@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testReadSparkBucketedTable()
{