Decouple OrcPageSource from HiveColumnHandle
lxynov authored and electrum committed Sep 23, 2019
1 parent eb8a065 · commit b142e8a
Showing 2 changed files with 17 additions and 37 deletions.
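The change narrows the OrcPageSource constructor: instead of a List<HiveColumnHandle> plus a TypeManager, it now takes a Map<Integer, Type> keyed by Hive column index, so the class no longer references any Hive-specific metadata. A before/after sketch of the signature, reconstructed from the diff below (bodies elided):

    // Before: coupled to Hive column handles and the type manager.
    public OrcPageSource(
            OrcRecordReader recordReader,
            OrcDataSource orcDataSource,
            List<HiveColumnHandle> columns,
            TypeManager typeManager,
            AggregatedMemoryContext systemMemoryContext,
            FileFormatDataSourceStats stats) { /* ... */ }

    // After: only ORC-level information, a column index mapped to its Presto type.
    public OrcPageSource(
            OrcRecordReader recordReader,
            OrcDataSource orcDataSource,
            Map<Integer, Type> includedColumns,
            AggregatedMemoryContext systemMemoryContext,
            FileFormatDataSourceStats stats) { /* ... */ }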
@@ -13,13 +13,11 @@
*/
package io.prestosql.plugin.hive.orc;

import com.google.common.collect.ImmutableList;
import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.orc.OrcCorruptionException;
import io.prestosql.orc.OrcDataSource;
import io.prestosql.orc.OrcRecordReader;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
@@ -28,16 +26,14 @@
import io.prestosql.spi.block.RunLengthEncodedBlock;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static io.prestosql.orc.OrcReader.MAX_BATCH_SIZE;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR;
import static java.lang.String.format;
@@ -46,13 +42,9 @@
public class OrcPageSource
implements ConnectorPageSource
{
private static final int NULL_ENTRY_SIZE = 0;
private final OrcRecordReader recordReader;
private final OrcDataSource orcDataSource;

private final List<String> columnNames;
private final List<Type> types;

private final Block[] constantBlocks;
private final int[] hiveColumnIndexes;

@@ -66,41 +58,29 @@ public class OrcPageSource
public OrcPageSource(
OrcRecordReader recordReader,
OrcDataSource orcDataSource,
List<HiveColumnHandle> columns,
TypeManager typeManager,
Map<Integer, Type> includedColumns,
AggregatedMemoryContext systemMemoryContext,
FileFormatDataSourceStats stats)
{
this.recordReader = requireNonNull(recordReader, "recordReader is null");
this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null");

int size = requireNonNull(columns, "columns is null").size();
int size = requireNonNull(includedColumns, "includedColumns is null").size();

this.stats = requireNonNull(stats, "stats is null");

this.constantBlocks = new Block[size];
this.hiveColumnIndexes = new int[size];

ImmutableList.Builder<String> namesBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
for (int columnIndex = 0; columnIndex < columns.size(); columnIndex++) {
HiveColumnHandle column = columns.get(columnIndex);
checkState(column.getColumnType() == REGULAR, "column type must be regular");

String name = column.getName();
Type type = typeManager.getType(column.getTypeSignature());

namesBuilder.add(name);
typesBuilder.add(type);

hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex();

if (!recordReader.isColumnPresent(column.getHiveColumnIndex())) {
constantBlocks[columnIndex] = RunLengthEncodedBlock.create(type, null, MAX_BATCH_SIZE);
int blockIndex = 0;
for (Map.Entry<Integer, Type> entry : includedColumns.entrySet()) {
hiveColumnIndexes[blockIndex] = entry.getKey();
if (!recordReader.isColumnPresent(hiveColumnIndexes[blockIndex])) {
Type type = entry.getValue();
constantBlocks[blockIndex] = RunLengthEncodedBlock.create(type, null, MAX_BATCH_SIZE);
}
blockIndex++;
}
types = typesBuilder.build();
columnNames = namesBuilder.build();

this.systemMemoryContext = requireNonNull(systemMemoryContext, "systemMemoryContext is null");
}
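A requested column that is missing from the ORC file is never read per row; the constructor above pre-builds a single constant all-null block and reuses it. A minimal sketch of that pattern, relying on the imports already present in this file (the BIGINT type and the isNull check are illustrative, not part of the commit):

    // One null value repeated MAX_BATCH_SIZE times, shared across batches.
    Block allNulls = RunLengthEncodedBlock.create(BIGINT, null, MAX_BATCH_SIZE);
    boolean startsNull = allNulls.isNull(0);   // true: every position reads as SQL NULL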
@@ -181,8 +161,7 @@ public void close()
public String toString()
{
return toStringHelper(this)
.add("columnNames", columnNames)
.add("types", types)
.add("hiveColumnIndexes", hiveColumnIndexes)
.toString();
}

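With the new signature, constructing the page source requires no Hive classes at all, only ORC column indexes and Presto types. A hypothetical caller-side sketch (recordReader, orcDataSource, systemMemoryContext, and stats are assumed to be in scope; the indexes and types are invented for illustration):

    // Hypothetical: read ORC columns 0 and 2 as BIGINT and VARCHAR.
    Map<Integer, Type> includedColumns = ImmutableMap.of(
            0, BIGINT,
            2, VARCHAR);
    ConnectorPageSource pageSource = new OrcPageSource(
            recordReader,
            orcDataSource,
            includedColumns,
            systemMemoryContext,
            stats);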
@@ -190,20 +190,22 @@ public static OrcPageSource createOrcPageSource(
OrcReader reader = new OrcReader(orcDataSource, maxMergeDistance, tinyStripeThreshold, maxReadBlockSize);

List<HiveColumnHandle> physicalColumns = getPhysicalHiveColumnHandles(columns, useOrcColumnNames, reader, path);
ImmutableMap.Builder<Integer, Type> includedColumns = ImmutableMap.builder();
ImmutableMap.Builder<Integer, Type> includedColumnsBuilder = ImmutableMap.builder();
ImmutableList.Builder<ColumnReference<HiveColumnHandle>> columnReferences = ImmutableList.builder();
for (HiveColumnHandle column : physicalColumns) {
if (column.getColumnType() == REGULAR) {
Type type = typeManager.getType(column.getTypeSignature());
includedColumns.put(column.getHiveColumnIndex(), type);
includedColumnsBuilder.put(column.getHiveColumnIndex(), type);
columnReferences.add(new ColumnReference<>(column, column.getHiveColumnIndex(), type));
}
}

ImmutableMap<Integer, Type> includedColumns = includedColumnsBuilder.build();

OrcPredicate predicate = new TupleDomainOrcPredicate<>(effectivePredicate, columnReferences.build(), orcBloomFiltersEnabled);

OrcRecordReader recordReader = reader.createRecordReader(
includedColumns.build(),
includedColumns,
predicate,
start,
length,
Expand All @@ -214,8 +216,7 @@ public static OrcPageSource createOrcPageSource(
return new OrcPageSource(
recordReader,
orcDataSource,
physicalColumns,
typeManager,
includedColumns,
systemMemoryUsage,
stats);
}
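An editorial note, not part of the commit: the OrcPageSource constructor walks includedColumns.entrySet() and assigns block positions in iteration order, so the map's ordering determines the channel layout of the pages it produces. Guava's ImmutableMap, as built here, iterates in insertion order, which keeps the output channels aligned with the order in which the factory adds the physical columns:

    // Assumed illustration of the insertion-order guarantee relied on above.
    ImmutableMap.Builder<Integer, Type> builder = ImmutableMap.builder();
    builder.put(5, DOUBLE);   // becomes channel 0 in the page source
    builder.put(1, BIGINT);   // becomes channel 1
    Map<Integer, Type> includedColumns = builder.build();   // iterates 5 then 1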
