diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java
index f4d63ce765920..784b8e597393b 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSource.java
@@ -101,7 +101,7 @@ public ParquetPageSource(
             typesBuilder.add(type);
             hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex();
 
-            if (getParquetType(column, fileSchema, useParquetColumnNames) == null) {
+            if (getParquetType(type, fileSchema, useParquetColumnNames, column.getName(), column.getHiveColumnIndex(), column.getHiveType()) == null) {
                 constantBlocks[columnIndex] = RunLengthEncodedBlock.create(type, null, MAX_VECTOR_LENGTH);
                 fieldsBuilder.add(Optional.empty());
             }
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
index 4530c1b1dbfa1..22e0ad622375b 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetPageSourceFactory.java
@@ -17,6 +17,7 @@
 import com.facebook.presto.hive.HdfsEnvironment;
 import com.facebook.presto.hive.HiveBatchPageSourceFactory;
 import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HiveType;
 import com.facebook.presto.memory.context.AggregatedMemoryContext;
 import com.facebook.presto.parquet.ParquetCorruptionException;
 import com.facebook.presto.parquet.ParquetDataSource;
@@ -29,6 +30,8 @@
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.predicate.Domain;
 import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.StandardTypes;
+import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.TypeManager;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -42,7 +45,9 @@
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.joda.time.DateTimeZone;
 
 import javax.inject.Inject;
@@ -61,6 +66,7 @@
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_BAD_DATA;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
 import static com.facebook.presto.hive.HiveErrorCode.HIVE_MISSING_DATA;
+import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
 import static com.facebook.presto.hive.HiveSessionProperties.isFailOnCorruptedParquetStatistics;
 import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
 import static com.facebook.presto.hive.HiveUtil.getDeserializerClassName;
@@ -71,11 +77,34 @@
 import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetTypeByName;
 import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate;
 import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches;
+import static com.facebook.presto.spi.type.StandardTypes.ARRAY;
+import static com.facebook.presto.spi.type.StandardTypes.BIGINT;
+import static com.facebook.presto.spi.type.StandardTypes.CHAR;
+import static com.facebook.presto.spi.type.StandardTypes.DATE;
+import static com.facebook.presto.spi.type.StandardTypes.DECIMAL;
+import static com.facebook.presto.spi.type.StandardTypes.INTEGER;
+import static com.facebook.presto.spi.type.StandardTypes.MAP;
+import static com.facebook.presto.spi.type.StandardTypes.REAL;
+import static com.facebook.presto.spi.type.StandardTypes.ROW;
+import static com.facebook.presto.spi.type.StandardTypes.SMALLINT;
+import static com.facebook.presto.spi.type.StandardTypes.TIMESTAMP;
+import static com.facebook.presto.spi.type.StandardTypes.TINYINT;
+import static com.facebook.presto.spi.type.StandardTypes.VARBINARY;
+import static com.facebook.presto.spi.type.StandardTypes.VARCHAR;
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Strings.nullToEmpty;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 
 public class ParquetPageSourceFactory
         implements HiveBatchPageSourceFactory
@@ -160,7 +189,7 @@ public static ParquetPageSource createParquetPageSource(
             List<org.apache.parquet.schema.Type> fields = columns.stream()
                     .filter(column -> column.getColumnType() == REGULAR)
-                    .map(column -> getParquetType(column, fileSchema, useParquetColumnNames))
+                    .map(column -> getParquetType(typeManager.getType(column.getTypeSignature()), fileSchema, useParquetColumnNames, column.getName(), column.getHiveColumnIndex(), column.getHiveType()))
                    .filter(Objects::nonNull)
                     .collect(toList());
@@ -249,15 +278,116 @@ public static TupleDomain getParquetTupleDomain(Map
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java
         List<TestColumn> columns = ImmutableList.of(partitionColumn, varcharColumn);
 
-        HiveErrorCode expectedErrorCode = HiveErrorCode.HIVE_INVALID_PARTITION_VALUE;
+        HiveErrorCode expectedErrorCode = HIVE_INVALID_PARTITION_VALUE;
         String expectedMessage = "Invalid partition value 'test' for varchar(3) partition key: partition_column";
 
         assertThatFileFormat(RCTEXT)
@@ -510,6 +531,171 @@ public void testFailForLongVarcharPartitionColumn()
                 .isFailingForRecordCursor(new GenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), expectedErrorCode, expectedMessage);
     }
 
+    @Test
+    public void testSchemaMismatch()
+            throws Exception
+    {
+        TestColumn floatColumn = new TestColumn("column_name", javaFloatObjectInspector, 5.1f, 5.1f);
+        TestColumn doubleColumn = new TestColumn("column_name", javaDoubleObjectInspector, 5.1, 5.1);
+        TestColumn booleanColumn = new TestColumn("column_name", javaBooleanObjectInspector, true, true);
+        TestColumn stringColumn = new TestColumn("column_name", javaStringObjectInspector, "test", utf8Slice("test"));
+        TestColumn intColumn = new TestColumn("column_name", javaIntObjectInspector, 3, 3);
+        TestColumn longColumn = new TestColumn("column_name", javaLongObjectInspector, 4L, 4L);
+        TestColumn mapLongColumn = new TestColumn("column_name",
+                getStandardMapObjectInspector(javaLongObjectInspector, javaLongObjectInspector),
+                ImmutableMap.of(4L, 4L),
+                mapBlockOf(BIGINT, BIGINT, 4L, 4L));
+        TestColumn mapDoubleColumn = new TestColumn("column_name",
+                getStandardMapObjectInspector(javaDoubleObjectInspector, javaDoubleObjectInspector),
+                ImmutableMap.of(5.1, 5.2),
+                mapBlockOf(DOUBLE, DOUBLE, 5.1, 5.2));
+        TestColumn arrayStringColumn = new TestColumn("column_name",
+                getStandardListObjectInspector(javaStringObjectInspector),
+                ImmutableList.of("test"),
+                arrayBlockOf(createUnboundedVarcharType(), "test"));
+        TestColumn arrayBooleanColumn = new TestColumn("column_name",
+                getStandardListObjectInspector(javaBooleanObjectInspector),
+                ImmutableList.of(true),
+                arrayBlockOf(BOOLEAN, true));
+        TestColumn rowLongColumn = new TestColumn("column_name",
+                getStandardStructObjectInspector(ImmutableList.of("s_bigint"), ImmutableList.of(javaLongObjectInspector)),
+                new Long[] {1L},
+                rowBlockOf(ImmutableList.of(BIGINT), 1));
+        TestColumn nestColumn = new TestColumn("column_name",
+                getStandardMapObjectInspector(
+                        javaStringObjectInspector,
+                        getStandardListObjectInspector(
+                                getStandardStructObjectInspector(
+                                        ImmutableList.of("s_int"),
+                                        ImmutableList.of(javaIntObjectInspector)))),
+                ImmutableMap.of("test", ImmutableList.of(new Integer[] {1})),
+                mapBlockOf(createUnboundedVarcharType(), new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER))),
+                        "test", arrayBlockOf(RowType.anonymous(ImmutableList.of(INTEGER)), rowBlockOf(ImmutableList.of(INTEGER), 1L))));
+
+        HiveErrorCode expectedErrorCode = HIVE_PARTITION_SCHEMA_MISMATCH;
+        String expectedMessageFloatDouble = "The column column_name is declared as type double, but the Parquet file declares the column as type FLOAT";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(floatColumn))
+                .withReadColumns(ImmutableList.of(doubleColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageFloatDouble);
+
+        String expectedMessageDoubleLong = "The column column_name is declared as type bigint, but the Parquet file declares the column as type DOUBLE";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(doubleColumn))
+                .withReadColumns(ImmutableList.of(longColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageDoubleLong);
+
+        String expectedMessageFloatInt = "The column column_name is declared as type int, but the Parquet file declares the column as type FLOAT";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(floatColumn))
+                .withReadColumns(ImmutableList.of(intColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageFloatInt);
+
+        String expectedMessageIntBoolean = "The column column_name is declared as type boolean, but the Parquet file declares the column as type INT32";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(intColumn))
+                .withReadColumns(ImmutableList.of(booleanColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageIntBoolean);
+
+        String expectedMessageStringLong = "The column column_name is declared as type string, but the Parquet file declares the column as type INT64";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(longColumn))
+                .withReadColumns(ImmutableList.of(stringColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageStringLong);
+
+        String expectedMessageIntString = "The column column_name is declared as type int, but the Parquet file declares the column as type BINARY";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(stringColumn))
+                .withReadColumns(ImmutableList.of(intColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageIntString);
+
+        String expectedMessageMapLongLong = "The column column_name is declared as type map<bigint,bigint>, but the Parquet file declares the column as type INT64";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(longColumn))
+                .withReadColumns(ImmutableList.of(mapLongColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageMapLongLong);
+
+        String expectedMessageMapLongMapDouble = "The column column_name is declared as type map<bigint,bigint>, but the Parquet file declares the column as type optional group column_name (MAP) {\n" +
+                "  repeated group map (MAP_KEY_VALUE) {\n" +
+                "    required double key;\n" +
+                "    optional double value;\n" +
+                "  }\n" +
+                "}";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(mapDoubleColumn))
+                .withReadColumns(ImmutableList.of(mapLongColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageMapLongMapDouble);
+
+        String expectedMessageArrayStringArrayBoolean = "The column column_name is declared as type array<string>, but the Parquet file declares the column as type optional group column_name (LIST) {\n" +
+                "  repeated group bag {\n" +
+                "    optional boolean array_element;\n" +
+                "  }\n" +
+                "}";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(arrayBooleanColumn))
+                .withReadColumns(ImmutableList.of(arrayStringColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageArrayStringArrayBoolean);
+
+        String expectedMessageBooleanArrayBoolean = "The column column_name is declared as type array<boolean>, but the Parquet file declares the column as type BOOLEAN";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(booleanColumn))
+                .withReadColumns(ImmutableList.of(arrayBooleanColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageBooleanArrayBoolean);
+
+        String expectedMessageRowLongLong = "The column column_name is declared as type bigint, but the Parquet file declares the column as type optional group column_name {\n" +
+                "  optional int64 s_bigint;\n" +
+                "}";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(rowLongColumn))
+                .withReadColumns(ImmutableList.of(longColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageRowLongLong);
+
+        String expectedMessageMapLongRowLong = "The column column_name is declared as type struct<s_bigint:bigint>, but the Parquet file declares the column as type optional group column_name (MAP) {\n" +
+                "  repeated group map (MAP_KEY_VALUE) {\n" +
+                "    required int64 key;\n" +
+                "    optional int64 value;\n" +
+                "  }\n" +
+                "}";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(mapLongColumn))
+                .withReadColumns(ImmutableList.of(rowLongColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageMapLongRowLong);
+
+        String expectedMessageRowLongNest = "The column column_name is declared as type map<string,array<struct<s_int:int>>>, but the Parquet file declares the column as type optional group column_name {\n" +
+                "  optional int64 s_bigint;\n" +
+                "}";
+
+        assertThatFileFormat(PARQUET)
+                .withWriteColumns(ImmutableList.of(rowLongColumn))
+                .withReadColumns(ImmutableList.of(nestColumn))
+                .withSession(parquetPageSourceSession)
+                .isFailingForPageSource(new ParquetPageSourceFactory(TYPE_MANAGER, HDFS_ENVIRONMENT, STATS), expectedErrorCode, expectedMessageRowLongNest);
+    }
+
     private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
             FileSplit split,
             HiveStorageFormat storageFormat,
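
Note on the elided hunk: the body of "@@ -249,15 +278,116 @@" in ParquetPageSourceFactory.java, which replaces getParquetType, is not included above. The sketch below is not that code; it is an illustration, under assumed names (ParquetSchemaCompatibility, checkSchemaCompatibility, isCompatible, isPrimitiveCompatible, and the exact compatibility rules are all assumptions), of the kind of check the new call sites and the testSchemaMismatch expectations imply: compare the Parquet field resolved for a Hive column against the declared Hive/Presto type, and throw HIVE_PARTITION_SCHEMA_MISMATCH with the message format quoted in the tests when they disagree.

import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.Type;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static java.lang.String.format;

// Illustrative sketch only; not the implementation elided from this diff.
final class ParquetSchemaCompatibility
{
    private ParquetSchemaCompatibility() {}

    static void checkSchemaCompatibility(String columnName, String hiveTypeName, Type prestoType, org.apache.parquet.schema.Type parquetType)
    {
        if (!isCompatible(prestoType, parquetType)) {
            // Mirrors the message shape asserted in testSchemaMismatch: declared Hive type vs. Parquet file type
            throw new PrestoException(HIVE_PARTITION_SCHEMA_MISMATCH,
                    format("The column %s is declared as type %s, but the Parquet file declares the column as type %s",
                            columnName,
                            hiveTypeName,
                            parquetType.isPrimitive() ? parquetType.asPrimitiveType().getPrimitiveTypeName() : parquetType));
        }
    }

    private static boolean isCompatible(Type prestoType, org.apache.parquet.schema.Type parquetType)
    {
        String base = prestoType.getTypeSignature().getBase();
        switch (base) {
            // Complex Presto types must map to Parquet group types
            case StandardTypes.MAP:
            case StandardTypes.ARRAY:
            case StandardTypes.ROW:
                return !parquetType.isPrimitive();
            default:
                // Primitive Presto types must map to a compatible Parquet primitive
                return parquetType.isPrimitive()
                        && isPrimitiveCompatible(base, parquetType.asPrimitiveType().getPrimitiveTypeName());
        }
    }

    private static boolean isPrimitiveCompatible(String base, PrimitiveTypeName parquetPrimitive)
    {
        // Assumed primitive mapping, consistent with the FLOAT/DOUBLE/INT32/INT64/BINARY/BOOLEAN
        // mismatches exercised by the test cases above
        switch (base) {
            case StandardTypes.BIGINT:
                return parquetPrimitive == PrimitiveTypeName.INT64;
            case StandardTypes.INTEGER:
            case StandardTypes.SMALLINT:
            case StandardTypes.TINYINT:
            case StandardTypes.DATE:
                return parquetPrimitive == PrimitiveTypeName.INT32;
            case StandardTypes.DOUBLE:
                return parquetPrimitive == PrimitiveTypeName.DOUBLE;
            case StandardTypes.REAL:
                return parquetPrimitive == PrimitiveTypeName.FLOAT;
            case StandardTypes.BOOLEAN:
                return parquetPrimitive == PrimitiveTypeName.BOOLEAN;
            case StandardTypes.VARCHAR:
            case StandardTypes.CHAR:
            case StandardTypes.VARBINARY:
                return parquetPrimitive == PrimitiveTypeName.BINARY;
            case StandardTypes.TIMESTAMP:
                return parquetPrimitive == PrimitiveTypeName.INT96;
            default:
                return false;
        }
    }
}

Under this sketch, the mapLongColumn case above behaves as the test expects: a file column written as INT64 but read as map<bigint,bigint> fails the group-type check and surfaces the "declared as type map<bigint,bigint> ... INT64" message.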