From b142e8a754c2559f90a720edd9357b8bd834fc22 Mon Sep 17 00:00:00 2001 From: Xingyuan Lin Date: Thu, 19 Sep 2019 16:34:32 -0700 Subject: [PATCH] Decouple OrcPageSource from HiveColumnHandle --- .../plugin/hive/orc/OrcPageSource.java | 43 +++++-------------- .../plugin/hive/orc/OrcPageSourceFactory.java | 11 ++--- 2 files changed, 17 insertions(+), 37 deletions(-) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSource.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSource.java index cf713e86a7a0..0cc760c2c842 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSource.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSource.java @@ -13,13 +13,11 @@ */ package io.prestosql.plugin.hive.orc; -import com.google.common.collect.ImmutableList; import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.orc.OrcCorruptionException; import io.prestosql.orc.OrcDataSource; import io.prestosql.orc.OrcRecordReader; import io.prestosql.plugin.hive.FileFormatDataSourceStats; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.spi.Page; import io.prestosql.spi.PrestoException; import io.prestosql.spi.block.Block; @@ -28,16 +26,14 @@ import io.prestosql.spi.block.RunLengthEncodedBlock; import io.prestosql.spi.connector.ConnectorPageSource; import io.prestosql.spi.type.Type; -import io.prestosql.spi.type.TypeManager; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.List; +import java.util.Map; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static io.prestosql.orc.OrcReader.MAX_BATCH_SIZE; -import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; import static java.lang.String.format; @@ -46,13 +42,9 @@ public class OrcPageSource implements ConnectorPageSource { - private static final int NULL_ENTRY_SIZE = 0; private final OrcRecordReader recordReader; private final OrcDataSource orcDataSource; - private final List columnNames; - private final List types; - private final Block[] constantBlocks; private final int[] hiveColumnIndexes; @@ -66,41 +58,29 @@ public class OrcPageSource public OrcPageSource( OrcRecordReader recordReader, OrcDataSource orcDataSource, - List columns, - TypeManager typeManager, + Map includedColumns, AggregatedMemoryContext systemMemoryContext, FileFormatDataSourceStats stats) { this.recordReader = requireNonNull(recordReader, "recordReader is null"); this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null"); - int size = requireNonNull(columns, "columns is null").size(); + int size = requireNonNull(includedColumns, "includedColumns is null").size(); this.stats = requireNonNull(stats, "stats is null"); this.constantBlocks = new Block[size]; this.hiveColumnIndexes = new int[size]; - ImmutableList.Builder namesBuilder = ImmutableList.builder(); - ImmutableList.Builder typesBuilder = ImmutableList.builder(); - for (int columnIndex = 0; columnIndex < columns.size(); columnIndex++) { - HiveColumnHandle column = columns.get(columnIndex); - checkState(column.getColumnType() == REGULAR, "column type must be regular"); - - String name = column.getName(); - Type type = typeManager.getType(column.getTypeSignature()); - - namesBuilder.add(name); - typesBuilder.add(type); - - hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex(); - - if (!recordReader.isColumnPresent(column.getHiveColumnIndex())) { - constantBlocks[columnIndex] = RunLengthEncodedBlock.create(type, null, MAX_BATCH_SIZE); + int blockIndex = 0; + for (Map.Entry entry : includedColumns.entrySet()) { + hiveColumnIndexes[blockIndex] = entry.getKey(); + if (!recordReader.isColumnPresent(hiveColumnIndexes[blockIndex])) { + Type type = entry.getValue(); + constantBlocks[blockIndex] = RunLengthEncodedBlock.create(type, null, MAX_BATCH_SIZE); } + blockIndex++; } - types = typesBuilder.build(); - columnNames = namesBuilder.build(); this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null"); } @@ -181,8 +161,7 @@ public void close() public String toString() { return toStringHelper(this) - .add("columnNames", columnNames) - .add("types", types) + .add("hiveColumnIndexes", hiveColumnIndexes) .toString(); } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java index ee77536f00a0..bf9ea4ab5909 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcPageSourceFactory.java @@ -190,20 +190,22 @@ public static OrcPageSource createOrcPageSource( OrcReader reader = new OrcReader(orcDataSource, maxMergeDistance, tinyStripeThreshold, maxReadBlockSize); List physicalColumns = getPhysicalHiveColumnHandles(columns, useOrcColumnNames, reader, path); - ImmutableMap.Builder includedColumns = ImmutableMap.builder(); + ImmutableMap.Builder includedColumnsBuilder = ImmutableMap.builder(); ImmutableList.Builder> columnReferences = ImmutableList.builder(); for (HiveColumnHandle column : physicalColumns) { if (column.getColumnType() == REGULAR) { Type type = typeManager.getType(column.getTypeSignature()); - includedColumns.put(column.getHiveColumnIndex(), type); + includedColumnsBuilder.put(column.getHiveColumnIndex(), type); columnReferences.add(new ColumnReference<>(column, column.getHiveColumnIndex(), type)); } } + ImmutableMap includedColumns = includedColumnsBuilder.build(); + OrcPredicate predicate = new TupleDomainOrcPredicate<>(effectivePredicate, columnReferences.build(), orcBloomFiltersEnabled); OrcRecordReader recordReader = reader.createRecordReader( - includedColumns.build(), + includedColumns, predicate, start, length, @@ -214,8 +216,7 @@ public static OrcPageSource createOrcPageSource( return new OrcPageSource( recordReader, orcDataSource, - physicalColumns, - typeManager, + includedColumns, systemMemoryUsage, stats); }