diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java index 079f10631193..4c7218c953c9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java @@ -45,6 +45,7 @@ import io.trino.spi.type.VarcharType; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -349,6 +350,11 @@ private static Optional> createCoercer(TypeManager typeMa return Optional.of(new MapCoercer(typeManager, fromHiveType, toHiveType)); } if (isRowType(fromType) && isRowType(toType)) { + if (fromHiveType.getCategory() == ObjectInspector.Category.UNION || toHiveType.getCategory() == ObjectInspector.Category.UNION) { + HiveType fromHiveTypeStruct = HiveType.toHiveType(fromType); + HiveType toHiveTypeStruct = HiveType.toHiveType(toType); + return Optional.of(new StructCoercer(typeManager, fromHiveTypeStruct, toHiveTypeStruct)); + } return Optional.of(new StructCoercer(typeManager, fromHiveType, toHiveType)); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java index ef16132fea86..40b049df8f38 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java @@ -82,7 +82,22 @@ private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType) return toType instanceof DecimalType || toHiveType.equals(HIVE_FLOAT) || toHiveType.equals(HIVE_DOUBLE); } - return canCoerceForList(fromHiveType, toHiveType) || canCoerceForMap(fromHiveType, toHiveType) || canCoerceForStruct(fromHiveType, toHiveType); + return canCoerceForList(fromHiveType, toHiveType) + || canCoerceForMap(fromHiveType, toHiveType) + || canCoerceForStruct(fromHiveType, toHiveType) + || canCoerceForUnionType(fromHiveType, toHiveType); + } + + private boolean canCoerceForUnionType(HiveType fromHiveType, HiveType toHiveType) + { + if (fromHiveType.getCategory() != Category.UNION || toHiveType.getCategory() != Category.UNION) { + return false; + } + + // Delegate to the struct coercion logic, since Trino sees union types as structs. + HiveType fromHiveTypeStruct = HiveType.toHiveType(fromHiveType.getType(typeManager)); + HiveType toHiveTypeStruct = HiveType.toHiveType(toHiveType.getType(typeManager)); + return canCoerceForStruct(fromHiveTypeStruct, toHiveTypeStruct); } private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java index e6ceb77e9926..ce001d9a7d0b 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java @@ -35,12 +35,14 @@ public class TestReadUniontype extends HiveProductTest { private static final String TABLE_NAME = "test_read_uniontype"; + private static final String TABLE_NAME_SCHEMA_EVOLUTION = "test_read_uniontype_with_schema_evolution"; @BeforeTestWithContext @AfterTestWithContext public void cleanup() { onHive().executeQuery(format("DROP TABLE IF EXISTS %s", TABLE_NAME)); + onHive().executeQuery(format("DROP TABLE IF EXISTS %s", TABLE_NAME_SCHEMA_EVOLUTION)); } @DataProvider(name = "storage_formats") @@ -49,9 +51,14 @@ public static Object[][] storageFormats() return new String[][] {{"ORC"}, {"AVRO"}}; } - private void createTestTable(String storageFormat) + @Test(dataProvider = "storage_formats", groups = SMOKE) + public void testReadUniontype(String storageFormat) { - cleanup(); + // According to testing results, the Hive INSERT queries here only work in Hive 1.2 + if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) { + throw new SkipException("This test can only be run with Hive 1.2 (default config)"); + } + onHive().executeQuery(format( "CREATE TABLE %s (id INT,foo UNIONTYPE<" + "INT," + @@ -60,16 +67,7 @@ private void createTestTable(String storageFormat) "STORED AS %s", TABLE_NAME, storageFormat)); - } - @Test(dataProvider = "storage_formats", groups = SMOKE) - public void testReadUniontype(String storageFormat) - { - // According to testing results, the Hive INSERT queries here only work in Hive 1.2 - if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) { - throw new SkipException("This test can only be run with Hive 1.2 (default config)"); - } - createTestTable(storageFormat); // Generate a file with rows: // 0, {0: 36} // 1, {1: 7.2} @@ -139,6 +137,139 @@ public void testReadUniontype(String storageFormat) } } + @Test(dataProvider = "storage_formats", groups = SMOKE) + public void testUnionTypeSchemaEvolution(String storageFormat) + { + // According to testing results, the Hive INSERT queries here only work in Hive 1.2 + if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) { + throw new SkipException("This test can only be run with Hive 1.2 (default config)"); + } + + onHive().executeQuery(format( + "CREATE TABLE %s (" + + "c0 INT," + + "c1 UNIONTYPE<" + + " STRUCT, " + + " STRUCT>) " + + "PARTITIONED BY (c2 INT) " + + "STORED AS %s", + TABLE_NAME_SCHEMA_EVOLUTION, + storageFormat)); + switch (storageFormat) { + case "AVRO": + testAvroSchemaEvolution(); + break; + case "ORC": + testORCSchemaEvolution(); + break; + default: + throw new UnsupportedOperationException("Unsupported table format."); + } + } + + private void testORCSchemaEvolution() + { + // Generate a file with rows: + // 0, {0: } + // 1, {1: } + onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION (c2 = 5) " + + "SELECT 0, create_union(0, named_struct('a', 'a1', 'b', 'b1'), named_struct('c', 'ignore')) " + + "UNION ALL " + + "SELECT 1, create_union(1, named_struct('a', 'ignore', 'b', 'ignore'), named_struct('c', 'c1'))", + TABLE_NAME_SCHEMA_EVOLUTION)); + + // Add a coercible change inside union type column. + onHive().executeQuery(format("ALTER TABLE %S CHANGE COLUMN c1 c1 UNIONTYPE, STRUCT>", + TABLE_NAME_SCHEMA_EVOLUTION)); + + QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c0, c1 FROM %s", TABLE_NAME_SCHEMA_EVOLUTION)); + assertEquals(selectAllResult.rows().size(), 2); + for (List row : selectAllResult.rows()) { + int id = (Integer) row.get(0); + switch (id) { + case 0: + Row rowValueFirst = rowBuilder().addField("a", "a1").addField("b", "b1").build(); + assertStructEquals(row.get(1), new Object[]{(byte) 0, rowValueFirst, null}); + break; + case 1: + Row rowValueSecond = rowBuilder().addField("c", "c1").addField("d", null).build(); + assertStructEquals(row.get(1), new Object[]{(byte) 1, null, rowValueSecond}); + break; + } + } + } + + private void testAvroSchemaEvolution() + { + /** + * The following insertion fails on avro. + * + * hive (default)> INSERT INTO TABLE u_username.test_ut_avro partition (c2 = 5) + * > SELECT 1, create_union(1, named_struct('a', 'ignore', 'b', 'ignore'), named_struct('c', 'c1')); + * + * Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing writable (null) + * at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:179) + * at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54) + * at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:459) + * at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) + * at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177) + * at java.security.AccessController.doPrivileged(Native Method) + * at javax.security.auth.Subject.doAs(Subject.java:422) + * at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893) + * at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171) + * Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing writable (null) + * at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:505) + * at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:170) + * ... 8 more + * Caused by: java.lang.ArrayIndexOutOfBoundsException: 1 + * at org.apache.avro.generic.GenericData$Record.get(GenericData.java:135) + * at org.apache.avro.generic.GenericData.getField(GenericData.java:580) + * at org.apache.avro.generic.GenericData.validate(GenericData.java:373) + * at org.apache.avro.generic.GenericData.validate(GenericData.java:395) + * at org.apache.avro.generic.GenericData.validate(GenericData.java:373) + * at org.apache.hadoop.hive.serde2.avro.AvroSerializer.serialize(AvroSerializer.java:96) + * + * So we try coercion logic on the first struct field inside the union (i.e. only for struct) only. + * + */ + // Generate a file with rows: + // 0, {0: } + // 1, {0: } + onHive().executeQuery(format( + "INSERT INTO TABLE %s PARTITION (c2 = 5) " + + "SELECT 0, create_union(0, named_struct('a', 'a1', 'b', 'b1'), named_struct('c', 'ignore')) " + + "UNION ALL " + + "SELECT 1, create_union(0, named_struct('a', 'a2', 'b', 'b2'), named_struct('c', 'ignore'))", + TABLE_NAME_SCHEMA_EVOLUTION)); + + // Add a coercible change inside union type column. + onHive().executeQuery(format("ALTER TABLE %S CHANGE COLUMN c1 c1 UNIONTYPE, STRUCT>", TABLE_NAME_SCHEMA_EVOLUTION)); + + QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c0, c1 FROM %s", TABLE_NAME_SCHEMA_EVOLUTION)); + assertEquals(selectAllResult.rows().size(), 2); + for (List row : selectAllResult.rows()) { + int id = (Integer) row.get(0); + switch (id) { + case 0: + Row rowValueFirst = rowBuilder() + .addField("a", "a1") + .addField("b", "b1") + .addField("d", null) + .build(); + assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueFirst, null}); + break; + case 1: + Row rowValueSecond = rowBuilder() + .addField("a", "a2") + .addField("b", "b2") + .addField("d", null) + .build(); + assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueSecond, null}); + break; + } + } + } + // TODO use Row as expected too, and use tempto QueryAssert private static void assertStructEquals(Object actual, Object[] expected) { @@ -149,4 +280,9 @@ private static void assertStructEquals(Object actual, Object[] expected) assertEquals(actualRow.getFields().get(i).getValue(), expected[i]); } } + + private static io.trino.jdbc.Row.Builder rowBuilder() + { + return io.trino.jdbc.Row.builder(); + } }