Skip to content

Commit

Permalink
Support coercion within union type for ORC format
Browse files Browse the repository at this point in the history
Co-authored-by: Pratham Desai <prathamd94@gmail.com>
  • Loading branch information
Felicity-3786 and phd3 committed Jun 13, 2022
1 parent b8faa60 commit e071da4
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.type.VarcharType;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
Expand Down Expand Up @@ -349,6 +350,11 @@ private static Optional<Function<Block, Block>> createCoercer(TypeManager typeMa
return Optional.of(new MapCoercer(typeManager, fromHiveType, toHiveType));
}
if (isRowType(fromType) && isRowType(toType)) {
if (fromHiveType.getCategory() == ObjectInspector.Category.UNION || toHiveType.getCategory() == ObjectInspector.Category.UNION) {
HiveType fromHiveTypeStruct = HiveType.toHiveType(fromType);
HiveType toHiveTypeStruct = HiveType.toHiveType(toType);
return Optional.of(new StructCoercer(typeManager, fromHiveTypeStruct, toHiveTypeStruct));
}
return Optional.of(new StructCoercer(typeManager, fromHiveType, toHiveType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,22 @@ private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType)
return toType instanceof DecimalType || toHiveType.equals(HIVE_FLOAT) || toHiveType.equals(HIVE_DOUBLE);
}

return canCoerceForList(fromHiveType, toHiveType) || canCoerceForMap(fromHiveType, toHiveType) || canCoerceForStruct(fromHiveType, toHiveType);
return canCoerceForList(fromHiveType, toHiveType)
|| canCoerceForMap(fromHiveType, toHiveType)
|| canCoerceForStruct(fromHiveType, toHiveType)
|| canCoerceForUnionType(fromHiveType, toHiveType);
}

/**
 * Checks whether a Hive UNIONTYPE column can be coerced into another UNIONTYPE.
 * Trino represents a union as a row (tag byte plus one field per alternative),
 * so compatibility is decided by the struct coercion rules applied to those
 * row representations.
 */
private boolean canCoerceForUnionType(HiveType fromHiveType, HiveType toHiveType)
{
    boolean bothUnions = fromHiveType.getCategory() == Category.UNION
            && toHiveType.getCategory() == Category.UNION;
    if (!bothUnions) {
        return false;
    }

    // Round-trip each union through its Trino type to obtain the equivalent struct HiveType.
    HiveType fromAsStruct = HiveType.toHiveType(fromHiveType.getType(typeManager));
    HiveType toAsStruct = HiveType.toHiveType(toHiveType.getType(typeManager));
    return canCoerceForStruct(fromAsStruct, toAsStruct);
}

private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ public class TestReadUniontype
extends HiveProductTest
{
private static final String TABLE_NAME = "test_read_uniontype";
private static final String TABLE_NAME_SCHEMA_EVOLUTION = "test_read_uniontype_with_schema_evolution";

@BeforeTestWithContext
@AfterTestWithContext
public void cleanup()
{
    // Drop both test tables so every test starts from (and leaves behind) a clean slate.
    for (String table : new String[] {TABLE_NAME, TABLE_NAME_SCHEMA_EVOLUTION}) {
        onHive().executeQuery(format("DROP TABLE IF EXISTS %s", table));
    }
}

@DataProvider(name = "storage_formats")
Expand All @@ -49,9 +51,14 @@ public static Object[][] storageFormats()
return new String[][] {{"ORC"}, {"AVRO"}};
}

private void createTestTable(String storageFormat)
@Test(dataProvider = "storage_formats", groups = SMOKE)
public void testReadUniontype(String storageFormat)
{
cleanup();
// According to testing results, the Hive INSERT queries here only work in Hive 1.2
if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) {
throw new SkipException("This test can only be run with Hive 1.2 (default config)");
}

onHive().executeQuery(format(
"CREATE TABLE %s (id INT,foo UNIONTYPE<" +
"INT," +
Expand All @@ -60,16 +67,7 @@ private void createTestTable(String storageFormat)
"STORED AS %s",
TABLE_NAME,
storageFormat));
}

@Test(dataProvider = "storage_formats", groups = SMOKE)
public void testReadUniontype(String storageFormat)
{
// According to testing results, the Hive INSERT queries here only work in Hive 1.2
if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) {
throw new SkipException("This test can only be run with Hive 1.2 (default config)");
}
createTestTable(storageFormat);
// Generate a file with rows:
// 0, {0: 36}
// 1, {1: 7.2}
Expand Down Expand Up @@ -139,6 +137,139 @@ public void testReadUniontype(String storageFormat)
}
}

@Test(dataProvider = "storage_formats", groups = SMOKE)
public void testUnionTypeSchemaEvolution(String storageFormat)
{
    // According to testing results, the Hive INSERT queries here only work in Hive 1.2
    if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) {
        throw new SkipException("This test can only be run with Hive 1.2 (default config)");
    }

    // Partitioned table with a union of two structs; the per-format helpers below
    // evolve the union's inner struct and verify Trino coerces old files correctly.
    onHive().executeQuery(format(
            "CREATE TABLE %s ("
            + "c0 INT,"
            + "c1 UNIONTYPE<"
            + "  STRUCT<a:STRING, b:STRING>, "
            + "  STRUCT<c:STRING>>) "
            + "PARTITIONED BY (c2 INT) "
            + "STORED AS %s",
            TABLE_NAME_SCHEMA_EVOLUTION,
            storageFormat));
    if ("AVRO".equals(storageFormat)) {
        testAvroSchemaEvolution();
    }
    else if ("ORC".equals(storageFormat)) {
        testORCSchemaEvolution();
    }
    else {
        throw new UnsupportedOperationException("Unsupported table format.");
    }
}

/**
 * Verifies that an ORC union column written before a schema change is coerced on read:
 * a trailing field added to one of the union's struct alternatives must surface as NULL
 * for pre-existing rows.
 */
private void testORCSchemaEvolution()
{
    // Generate a file with rows:
    // 0, {0: <a="a1",b="b1">}
    // 1, {1: <c="c1">}
    onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION (c2 = 5) "
            + "SELECT 0, create_union(0, named_struct('a', 'a1', 'b', 'b1'), named_struct('c', 'ignore')) "
            + "UNION ALL "
            + "SELECT 1, create_union(1, named_struct('a', 'ignore', 'b', 'ignore'), named_struct('c', 'c1'))",
            TABLE_NAME_SCHEMA_EVOLUTION));

    // Add a coercible change inside union type column: new trailing field "d" in the
    // second alternative. (Was "%S", which upper-cases the argument — only worked
    // because Hive identifiers are case-insensitive.)
    onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 UNIONTYPE<STRUCT<a:STRING, b:STRING>, STRUCT<c:STRING, d:STRING>>",
            TABLE_NAME_SCHEMA_EVOLUTION));

    QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c0, c1 FROM %s", TABLE_NAME_SCHEMA_EVOLUTION));
    assertEquals(selectAllResult.rows().size(), 2);
    for (List<?> row : selectAllResult.rows()) {
        int id = (Integer) row.get(0);
        switch (id) {
            case 0:
                // Tag 0: <a,b> alternative, untouched by the schema change.
                Row rowValueFirst = rowBuilder().addField("a", "a1").addField("b", "b1").build();
                assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueFirst, null});
                break;
            case 1:
                // Tag 1: <c> alternative, coerced to <c,d> with d read as NULL.
                Row rowValueSecond = rowBuilder().addField("c", "c1").addField("d", null).build();
                assertStructEquals(row.get(1), new Object[] {(byte) 1, null, rowValueSecond});
                break;
            default:
                // Fail loudly instead of silently skipping unexpected rows.
                throw new IllegalStateException("Unexpected c0 value: " + id);
        }
    }
}

/**
 * Verifies Avro schema evolution for a union column: a trailing field added to the
 * first struct alternative must surface as NULL for pre-existing rows.
 * Only the first alternative (tag 0) is exercised — see the note below on why
 * inserting tag-1 rows fails on Avro in this Hive version.
 */
private void testAvroSchemaEvolution()
{
    /**
     * The following insertion fails on avro.
     *
     * hive (default)> INSERT INTO TABLE u_username.test_ut_avro partition (c2 = 5)
     *               > SELECT 1, create_union(1, named_struct('a', 'ignore', 'b', 'ignore'), named_struct('c', 'c1'));
     *
     * Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing writable (null)
     *   at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:179)
     *   ...
     * Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
     *   at org.apache.avro.generic.GenericData$Record.get(GenericData.java:135)
     *   at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
     *   at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
     *   at org.apache.hadoop.hive.serde2.avro.AvroSerializer.serialize(AvroSerializer.java:96)
     *
     * So we try coercion logic on the first struct field inside the union (i.e. only for <a,b> struct) only.
     */
    // Generate a file with rows:
    // 0, {0: <a="a1",b="b1">}
    // 1, {0: <a="a2",b="b2">}
    onHive().executeQuery(format(
            "INSERT INTO TABLE %s PARTITION (c2 = 5) "
            + "SELECT 0, create_union(0, named_struct('a', 'a1', 'b', 'b1'), named_struct('c', 'ignore')) "
            + "UNION ALL "
            + "SELECT 1, create_union(0, named_struct('a', 'a2', 'b', 'b2'), named_struct('c', 'ignore'))",
            TABLE_NAME_SCHEMA_EVOLUTION));

    // Add a coercible change inside union type column: new trailing field "d" in the
    // first alternative. (Was "%S", which upper-cases the argument — only worked
    // because Hive identifiers are case-insensitive.)
    onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 UNIONTYPE<STRUCT<a:STRING, b:STRING, d:STRING>, STRUCT<c:STRING>>", TABLE_NAME_SCHEMA_EVOLUTION));

    QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c0, c1 FROM %s", TABLE_NAME_SCHEMA_EVOLUTION));
    assertEquals(selectAllResult.rows().size(), 2);
    for (List<?> row : selectAllResult.rows()) {
        int id = (Integer) row.get(0);
        switch (id) {
            case 0:
                // Tag 0, pre-evolution row: new field "d" must read as NULL.
                Row rowValueFirst = rowBuilder()
                        .addField("a", "a1")
                        .addField("b", "b1")
                        .addField("d", null)
                        .build();
                assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueFirst, null});
                break;
            case 1:
                Row rowValueSecond = rowBuilder()
                        .addField("a", "a2")
                        .addField("b", "b2")
                        .addField("d", null)
                        .build();
                assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueSecond, null});
                break;
            default:
                // Fail loudly instead of silently skipping unexpected rows.
                throw new IllegalStateException("Unexpected c0 value: " + id);
        }
    }
}

// TODO use Row as expected too, and use tempto QueryAssert
private static void assertStructEquals(Object actual, Object[] expected)
{
Expand All @@ -149,4 +280,9 @@ private static void assertStructEquals(Object actual, Object[] expected)
assertEquals(actualRow.getFields().get(i).getValue(), expected[i]);
}
}

// Shorthand for building expected io.trino.jdbc.Row values; fully qualified
// to avoid clashing with any other Row type imported in this file.
private static io.trino.jdbc.Row.Builder rowBuilder()
{
    return io.trino.jdbc.Row.builder();
}
}

0 comments on commit e071da4

Please sign in to comment.