From 92cfdd1a3f4fffa6dc2256586d269e9525797dde Mon Sep 17 00:00:00 2001
From: Pratham Desai
Date: Sun, 19 Jul 2020 16:10:13 -0700
Subject: [PATCH] Support benchmark reads through HivePageSource

---
 .../plugin/hive/benchmark/FileFormat.java     | 265 +++++++++++++-----
 1 file changed, 198 insertions(+), 67 deletions(-)

diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
index e7389811f55e..2639d4865a3a 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
@@ -13,7 +13,9 @@
  */
 package io.prestosql.plugin.hive.benchmark;

+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import io.airlift.slice.OutputStreamSliceOutput;
 import io.prestosql.orc.OrcReaderOptions;
 import io.prestosql.orc.OrcWriter;
@@ -25,19 +27,23 @@
 import io.prestosql.parquet.writer.ParquetWriter;
 import io.prestosql.parquet.writer.ParquetWriterOptions;
 import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.plugin.hive.GenericHiveRecordCursorProvider;
 import io.prestosql.plugin.hive.HdfsEnvironment;
 import io.prestosql.plugin.hive.HiveColumnHandle;
 import io.prestosql.plugin.hive.HiveCompressionCodec;
 import io.prestosql.plugin.hive.HiveConfig;
 import io.prestosql.plugin.hive.HivePageSourceFactory;
 import io.prestosql.plugin.hive.HivePageSourceFactory.ReaderPageSourceWithProjections;
+import io.prestosql.plugin.hive.HivePageSourceProvider;
 import io.prestosql.plugin.hive.HiveRecordCursorProvider;
 import io.prestosql.plugin.hive.HiveRecordCursorProvider.ReaderRecordCursorWithProjections;
+import io.prestosql.plugin.hive.HiveSplit;
 import io.prestosql.plugin.hive.HiveStorageFormat;
+import io.prestosql.plugin.hive.HiveTableHandle;
 import io.prestosql.plugin.hive.HiveType;
 import io.prestosql.plugin.hive.HiveTypeName;
 import io.prestosql.plugin.hive.RecordFileWriter;
-import io.prestosql.plugin.hive.benchmark.BenchmarkHiveFileFormat.TestData;
+import io.prestosql.plugin.hive.TableToPartitionMapping;
 import io.prestosql.plugin.hive.orc.OrcPageSourceFactory;
 import io.prestosql.plugin.hive.parquet.ParquetPageSourceFactory;
 import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
@@ -49,11 +55,13 @@
 import io.prestosql.rcfile.binary.BinaryRcFileEncoding;
 import io.prestosql.rcfile.text.TextRcFileEncoding;
 import io.prestosql.spi.Page;
+import io.prestosql.spi.connector.ColumnHandle;
 import io.prestosql.spi.connector.ConnectorPageSource;
 import io.prestosql.spi.connector.ConnectorSession;
 import io.prestosql.spi.connector.RecordPageSource;
 import io.prestosql.spi.predicate.TupleDomain;
 import io.prestosql.spi.type.Type;
+import io.prestosql.sql.planner.TestingConnectorTransactionHandle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
@@ -61,12 +69,15 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Properties;
+import java.util.stream.IntStream;

+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.prestosql.orc.OrcWriteValidation.OrcWriteValidationMode.BOTH;
 import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
 import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
@@ -87,10 +98,15 @@ public enum FileFormat
 {
     PRESTO_RCBINARY {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
         {
-            HivePageSourceFactory pageSourceFactory = new RcFilePageSourceFactory(TYPE_MANAGER, hdfsEnvironment, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC"));
-            return createPageSource(pageSourceFactory, session, targetFile, columnNames, columnTypes, HiveStorageFormat.RCBINARY);
+            return HiveStorageFormat.RCBINARY;
+        }
+
+        @Override
+        public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment)
+        {
+            return Optional.of(new RcFilePageSourceFactory(TYPE_MANAGER, hdfsEnvironment, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC")));
         }

         @Override
@@ -112,10 +128,15 @@ public FormatWriter createFileFormatWriter(

     PRESTO_RCTEXT {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
         {
-            HivePageSourceFactory pageSourceFactory = new RcFilePageSourceFactory(TYPE_MANAGER, hdfsEnvironment, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC"));
-            return createPageSource(pageSourceFactory, session, targetFile, columnNames, columnTypes, HiveStorageFormat.RCTEXT);
+            return HiveStorageFormat.RCTEXT;
+        }
+
+        @Override
+        public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment)
+        {
+            return Optional.of(new RcFilePageSourceFactory(TYPE_MANAGER, hdfsEnvironment, new FileFormatDataSourceStats(), new HiveConfig().setRcfileTimeZone("UTC")));
         }

         @Override
@@ -137,10 +158,15 @@ public FormatWriter createFileFormatWriter(

     PRESTO_ORC {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
         {
-            HivePageSourceFactory pageSourceFactory = new OrcPageSourceFactory(new OrcReaderOptions(), hdfsEnvironment, new FileFormatDataSourceStats(), UTC);
-            return createPageSource(pageSourceFactory, session, targetFile, columnNames, columnTypes, HiveStorageFormat.ORC);
+            return HiveStorageFormat.ORC;
+        }
+
+        @Override
+        public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment)
+        {
+            return Optional.of(new OrcPageSourceFactory(new OrcReaderOptions(), hdfsEnvironment, new FileFormatDataSourceStats(), UTC));
         }

         @Override
@@ -162,10 +188,15 @@ public FormatWriter createFileFormatWriter(

     PRESTO_PARQUET {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
+        {
+            return HiveStorageFormat.PARQUET;
+        }
+
+        @Override
+        public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment)
         {
-            HivePageSourceFactory pageSourceFactory = new ParquetPageSourceFactory(hdfsEnvironment, new FileFormatDataSourceStats(), new ParquetReaderConfig(), new HiveConfig().setParquetTimeZone("UTC"));
-            return createPageSource(pageSourceFactory, session, targetFile, columnNames, columnTypes, HiveStorageFormat.PARQUET);
+            return Optional.of(new ParquetPageSourceFactory(hdfsEnvironment, new FileFormatDataSourceStats(), new ParquetReaderConfig(), new HiveConfig().setParquetTimeZone("UTC")));
         }

         @Override
@@ -183,10 +214,15 @@ public FormatWriter createFileFormatWriter(

     HIVE_RCBINARY {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
         {
-            HiveRecordCursorProvider cursorProvider = createGenericHiveRecordCursorProvider(hdfsEnvironment);
-            return createPageSource(cursorProvider, session, targetFile, columnNames, columnTypes, HiveStorageFormat.RCBINARY);
+            return HiveStorageFormat.RCBINARY;
+        }
+
+        @Override
+        public Optional<HiveRecordCursorProvider> getHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment)
+        {
+            return Optional.of(createGenericHiveRecordCursorProvider(hdfsEnvironment));
         }

         @Override
@@ -203,10 +239,15 @@ public FormatWriter createFileFormatWriter(

     HIVE_RCTEXT {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
         {
-            HiveRecordCursorProvider cursorProvider = createGenericHiveRecordCursorProvider(hdfsEnvironment);
-            return createPageSource(cursorProvider, session, targetFile, columnNames, columnTypes, HiveStorageFormat.RCTEXT);
+            return HiveStorageFormat.RCTEXT;
+        }
+
+        @Override
+        public Optional<HiveRecordCursorProvider> getHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment)
+        {
+            return Optional.of(createGenericHiveRecordCursorProvider(hdfsEnvironment));
         }

         @Override
@@ -223,10 +264,15 @@ public FormatWriter createFileFormatWriter(

     HIVE_ORC {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
+        {
+            return HiveStorageFormat.ORC;
+        }
+
+        @Override
+        public Optional<HiveRecordCursorProvider> getHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment)
         {
-            HiveRecordCursorProvider cursorProvider = createGenericHiveRecordCursorProvider(hdfsEnvironment);
-            return createPageSource(cursorProvider, session, targetFile, columnNames, columnTypes, HiveStorageFormat.ORC);
+            return Optional.of(createGenericHiveRecordCursorProvider(hdfsEnvironment));
         }

         @Override
@@ -243,10 +289,15 @@ public FormatWriter createFileFormatWriter(

     HIVE_PARQUET {
         @Override
-        public ConnectorPageSource createFileFormatReader(ConnectorSession session, HdfsEnvironment hdfsEnvironment, File targetFile, List<String> columnNames, List<Type> columnTypes)
+        public HiveStorageFormat getFormat()
+        {
+            return HiveStorageFormat.PARQUET;
+        }
+
+        @Override
+        public Optional<HiveRecordCursorProvider> getHiveRecordCursorProvider(HdfsEnvironment hdfsEnvironment)
         {
-            HivePageSourceFactory pageSourceFactory = new ParquetPageSourceFactory(hdfsEnvironment, new FileFormatDataSourceStats(), new ParquetReaderConfig(), new HiveConfig().setParquetTimeZone("UTC"));
-            return createPageSource(pageSourceFactory, session, targetFile, columnNames, columnTypes, HiveStorageFormat.PARQUET);
+            return Optional.of(createGenericHiveRecordCursorProvider(hdfsEnvironment));
         }

         @Override
@@ -266,12 +317,7 @@ public boolean supportsDate()
         return true;
     }

-    public abstract ConnectorPageSource createFileFormatReader(
-            ConnectorSession session,
-            HdfsEnvironment hdfsEnvironment,
-            File targetFile,
-            List<String> columnNames,
-            List<Type> columnTypes);
+    public abstract HiveStorageFormat getFormat();

     public abstract FormatWriter createFileFormatWriter(
             ConnectorSession session,
@@ -281,6 +327,81 @@ public abstract FormatWriter createFileFormatWriter(
             HiveCompressionCodec compressionCodec)
             throws IOException;

+    public Optional<HivePageSourceFactory> getHivePageSourceFactory(HdfsEnvironment environment)
+    {
+        return Optional.empty();
+    }
+
+    public Optional<HiveRecordCursorProvider> getHiveRecordCursorProvider(HdfsEnvironment environment)
+    {
+        return Optional.empty();
+    }
+
+    public final ConnectorPageSource createFileFormatReader(
+            ConnectorSession session,
+            HdfsEnvironment hdfsEnvironment,
+            File targetFile,
+            List<String> columnNames,
+            List<Type> columnTypes)
+    {
+        Optional<HivePageSourceFactory> pageSourceFactory = getHivePageSourceFactory(hdfsEnvironment);
+        Optional<HiveRecordCursorProvider> recordCursorProvider = getHiveRecordCursorProvider(hdfsEnvironment);
+
+        checkArgument(pageSourceFactory.isPresent() ^ recordCursorProvider.isPresent());
+
+        if (pageSourceFactory.isPresent()) {
+            return createPageSource(pageSourceFactory.get(), session, targetFile, columnNames, columnTypes, getFormat());
+        }
+
+        return createPageSource(recordCursorProvider.get(), session, targetFile, columnNames, columnTypes, getFormat());
+    }
+
+    public final ConnectorPageSource createGenericReader(
+            ConnectorSession session,
+            HdfsEnvironment hdfsEnvironment,
+            File targetFile,
+            List<ColumnHandle> readColumns,
+            List<String> schemaColumnNames,
+            List<Type> schemaColumnTypes)
+    {
+        HivePageSourceProvider factory = new HivePageSourceProvider(
+                TYPE_MANAGER,
+                hdfsEnvironment,
+                getHivePageSourceFactory(hdfsEnvironment).map(ImmutableSet::of).orElse(ImmutableSet.of()),
+                getHiveRecordCursorProvider(hdfsEnvironment).map(ImmutableSet::of).orElse(ImmutableSet.of()),
+                new GenericHiveRecordCursorProvider(hdfsEnvironment, new HiveConfig()));
+
+        Properties schema = createSchema(getFormat(), schemaColumnNames, schemaColumnTypes);
+
+        HiveSplit split = new HiveSplit(
+                "schema_name",
+                "table_name",
+                "",
+                targetFile.getPath(),
+                0,
+                targetFile.length(),
+                targetFile.length(),
+                targetFile.lastModified(),
+                schema,
+                ImmutableList.of(),
+                ImmutableList.of(),
+                OptionalInt.empty(),
+                false,
+                TableToPartitionMapping.empty(),
+                Optional.empty(),
+                false,
+                Optional.empty());
+
+        ConnectorPageSource hivePageSource = factory.createPageSource(
+                TestingConnectorTransactionHandle.INSTANCE,
+                session, split,
+                new HiveTableHandle("schema_name", "table_name", ImmutableMap.of(), ImmutableList.of(), Optional.empty()),
+                readColumns,
+                TupleDomain.all());
+
+        return hivePageSource;
+    }
+
     private static final JobConf conf;

     static {
@@ -295,32 +416,31 @@ public boolean supports(TestData testData)

     private static ConnectorPageSource createPageSource(
             HiveRecordCursorProvider cursorProvider,
-            ConnectorSession session, File targetFile, List<String> columnNames, List<Type> columnTypes, HiveStorageFormat format)
+            ConnectorSession session,
+            File targetFile,
+            List<String> columnNames,
+            List<Type> columnTypes,
+            HiveStorageFormat format)
     {
-        List<HiveColumnHandle> columnHandles = new ArrayList<>(columnNames.size());
-        for (int i = 0; i < columnNames.size(); i++) {
-            String columnName = columnNames.get(i);
-            Type columnType = columnTypes.get(i);
-            columnHandles.add(createBaseColumn(columnName, i, toHiveType(columnType), columnType, REGULAR, Optional.empty()));
-        }
-
-        Optional<ReaderRecordCursorWithProjections> recordCursorWithProjections = cursorProvider
-                .createRecordCursor(
-                        conf,
-                        session,
-                        new Path(targetFile.getAbsolutePath()),
-                        0,
-                        targetFile.length(),
-                        targetFile.length(),
-                        createSchema(format, columnNames, columnTypes),
-                        columnHandles,
-                        TupleDomain.all(),
-                        TYPE_MANAGER,
-                        false);
-
-        checkState(recordCursorWithProjections.isPresent(), "recordCursorWithProjections is not present");
-        checkState(recordCursorWithProjections.get().getProjectedReaderColumns().isEmpty(), "projections should not be required");
-
+        checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes should have the same size");
+
+        List<HiveColumnHandle> readColumns = getBaseColumns(columnNames, columnTypes);
+
+        Optional<ReaderRecordCursorWithProjections> recordCursorWithProjections = cursorProvider.createRecordCursor(
+                conf,
+                session,
+                new Path(targetFile.getAbsolutePath()),
+                0,
+                targetFile.length(),
+                targetFile.length(),
+                createSchema(format, columnNames, columnTypes),
+                readColumns,
+                TupleDomain.all(),
+                TYPE_MANAGER,
+                false);
+
+        checkState(recordCursorWithProjections.isPresent(), "readerPageSourceWithProjections is not present");
+        checkState(!recordCursorWithProjections.get().getProjectedReaderColumns().isPresent(), "projection should not be required");
         return new RecordPageSource(columnTypes, recordCursorWithProjections.get().getRecordCursor());
     }

@@ -332,13 +452,11 @@ private static ConnectorPageSource createPageSource(
             List<Type> columnTypes,
             HiveStorageFormat format)
     {
-        List<HiveColumnHandle> columnHandles = new ArrayList<>(columnNames.size());
-        for (int i = 0; i < columnNames.size(); i++) {
-            String columnName = columnNames.get(i);
-            Type columnType = columnTypes.get(i);
-            columnHandles.add(createBaseColumn(columnName, i, toHiveType(columnType), columnType, REGULAR, Optional.empty()));
-        }
+        checkArgument(columnNames.size() == columnTypes.size(), "columnNames and columnTypes should have the same size");
+        List<HiveColumnHandle> readColumns = getBaseColumns(columnNames, columnTypes);
+
+        Properties schema = createSchema(format, columnNames, columnTypes);

         Optional<ReaderPageSourceWithProjections> readerPageSourceWithProjections = pageSourceFactory
                 .createPageSource(
                         conf,
@@ -347,17 +465,30 @@ private static ConnectorPageSource createPageSource(
                         0,
                         targetFile.length(),
                         targetFile.length(),
-                        createSchema(format, columnNames, columnTypes),
-                        columnHandles,
+                        schema,
+                        readColumns,
                         TupleDomain.all(),
                         Optional.empty());

         checkState(readerPageSourceWithProjections.isPresent(), "readerPageSourceWithProjections is not present");
-        checkState(readerPageSourceWithProjections.get().getProjectedReaderColumns().isEmpty(), "projection should not be required");
-
+        checkState(!readerPageSourceWithProjections.get().getProjectedReaderColumns().isPresent(), "projection should not be required");
         return readerPageSourceWithProjections.get().getConnectorPageSource();
     }

+    private static List<HiveColumnHandle> getBaseColumns(List<String> columnNames, List<Type> columnTypes)
+    {
+        return IntStream.range(0, columnNames.size())
+                .boxed()
+                .map(index -> createBaseColumn(
+                        columnNames.get(index),
+                        index,
+                        toHiveType(columnTypes.get(index)),
+                        columnTypes.get(index),
+                        REGULAR,
+                        Optional.empty()))
+                .collect(toImmutableList());
+    }
+
     private static class RecordFormatWriter
             implements FormatWriter
     {
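
For reference, a minimal usage sketch of the new createGenericReader entry point (illustration only, not part of the patch). The ConnectorSession, HdfsEnvironment, data file, and column handles are assumed to come from the existing benchmark setup in this package, and the class and method names below are hypothetical.

package io.prestosql.plugin.hive.benchmark;

import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.spi.Page;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.Type;

import java.io.File;
import java.io.IOException;
import java.util.List;

final class GenericReaderSketch
{
    private GenericReaderSketch() {}

    // Drains a page source obtained through the new createGenericReader entry point and
    // returns the number of positions read, which a benchmark can use as a sanity check.
    static long readThroughHivePageSource(
            FileFormat format,
            ConnectorSession session,
            HdfsEnvironment hdfsEnvironment,
            File dataFile,
            List<ColumnHandle> readColumns,
            List<String> schemaColumnNames,
            List<Type> schemaColumnTypes)
            throws IOException
    {
        long positions = 0;
        // createGenericReader routes the read through HivePageSourceProvider, so this path
        // exercises the same split handling and projection logic as a regular Hive read.
        try (ConnectorPageSource pageSource = format.createGenericReader(
                session, hdfsEnvironment, dataFile, readColumns, schemaColumnNames, schemaColumnTypes)) {
            while (!pageSource.isFinished()) {
                Page page = pageSource.getNextPage();
                if (page != null) {
                    positions += page.getPositionCount();
                }
            }
        }
        return positions;
    }
}

Comparing this against createFileFormatReader(...) on the same file gives a rough measure of the overhead added by going through HivePageSourceProvider.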