Remove unnecessary predicate pushdown
The constraint parameter of PartitionTable#cursor is targeted at
columns of the final partition table, rather than columns of the
original data table, so it cannot be transformed directly into a
predicate on the data table's scan.
lxynov authored and electrum committed Oct 23, 2019
1 parent 3763165 commit f598a5e
Showing 3 changed files with 4 additions and 38 deletions.
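For context on the commit message: in the Presto SPI, the constraint handed to a system table's cursor is keyed by the ordinal positions of that system table's own output columns. An abridged sketch of the io.prestosql.spi.connector.SystemTable method this commit works against (not the full interface, which also declares the table's distribution and metadata):

import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.RecordCursor;
import io.prestosql.spi.predicate.TupleDomain;

public interface SystemTable
{
    // The Integer keys are ordinals of this system table's columns --
    // for PartitionTable, columns of the $partitions table itself,
    // not field IDs or positions in the underlying Iceberg data table.
    RecordCursor cursor(
            ConnectorTransactionHandle transactionHandle,
            ConnectorSession session,
            TupleDomain<Integer> constraint);
}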
IcebergMetadata.java
@@ -170,7 +170,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, IcebergTableHandle table)
         org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());
         switch (table.getTableType()) {
             case PARTITIONS:
-                return Optional.of(new PartitionTable(table, session, typeManager, icebergTable));
+                return Optional.of(new PartitionTable(table, typeManager, icebergTable));
             case HISTORY:
                 return Optional.of(new HistoryTable(table.getSchemaTableNameWithType(), icebergTable));
         }
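The only change here is dropping the ConnectorSession constructor argument: once PartitionTable no longer builds its own scan predicate up front, it can rely on the session that the SPI passes to cursor, as the PartitionTable.java diff below shows.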
IcebergUtil.java
@@ -107,25 +107,6 @@ public static List<HiveColumnHandle> getColumns(Schema schema, PartitionSpec spec, TypeManager typeManager)
         return builder.build();
     }
 
-    public static List<HiveColumnHandle> getPartitionColumns(Schema schema, PartitionSpec spec, TypeManager typeManager)
-    {
-        List<PartitionField> partitionFields = ImmutableList.copyOf(getIdentityPartitions(spec).keySet());
-
-        int columnIndex = 0;
-        ImmutableList.Builder<HiveColumnHandle> builder = ImmutableList.builder();
-
-        for (PartitionField partitionField : partitionFields) {
-            Type sourceType = schema.findType(partitionField.sourceId());
-            Type type = partitionField.transform().getResultType(sourceType);
-            io.prestosql.spi.type.Type prestoType = toPrestoType(type, typeManager);
-            HiveType hiveType = toHiveType(TYPE_TRANSLATOR, coerceForHive(prestoType));
-            HiveColumnHandle columnHandle = new HiveColumnHandle(partitionField.name(), hiveType, prestoType, columnIndex, PARTITION_KEY, Optional.empty());
-            columnIndex++;
-            builder.add(columnHandle);
-        }
-        return builder.build();
-    }
-
     public static io.prestosql.spi.type.Type coerceForHive(io.prestosql.spi.type.Type prestoType)
     {
         if (prestoType.equals(TIMESTAMP_WITH_TIME_ZONE)) {
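getPartitionColumns had a single caller, the private PartitionTable#getTableScan removed below, so the helper is deleted as dead code; the matching now-unused imports disappear from PartitionTable.java in the next file.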
PartitionTable.java
@@ -15,7 +15,6 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import io.prestosql.plugin.hive.HiveColumnHandle;
 import io.prestosql.spi.block.Block;
 import io.prestosql.spi.block.BlockBuilder;
 import io.prestosql.spi.classloader.ThreadContextClassLoader;
@@ -59,23 +58,21 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.prestosql.plugin.iceberg.IcebergUtil.getIdentityPartitions;
+import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
 import static io.prestosql.plugin.iceberg.TypeConveter.toPrestoType;
 import static io.prestosql.spi.type.BigintType.BIGINT;
 import static io.prestosql.spi.type.DateTimeEncoding.packDateTimeWithZone;
 import static java.util.Objects.requireNonNull;
-import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toCollection;
 import static java.util.stream.Collectors.toSet;
 
 public class PartitionTable
         implements SystemTable
 {
     private final IcebergTableHandle tableHandle;
-    private final ConnectorSession session;
     private final TypeManager typeManager;
     private final Table icebergTable;
     private Map<Integer, Type.PrimitiveType> idToTypeMapping;
@@ -84,10 +81,9 @@ public class PartitionTable
     private List<io.prestosql.spi.type.Type> resultTypes;
     private List<RowType> columnMetricTypes;
 
-    public PartitionTable(IcebergTableHandle tableHandle, ConnectorSession session, TypeManager typeManager, Table icebergTable)
+    public PartitionTable(IcebergTableHandle tableHandle, TypeManager typeManager, Table icebergTable)
     {
         this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
-        this.session = requireNonNull(session, "session is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
     }
@@ -163,7 +159,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
     {
         // TODO instead of cursor use pageSource method.
         try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
-            TableScan tableScan = getTableScan(constraint);
+            TableScan tableScan = getTableScan(session, TupleDomain.all(), tableHandle.getSnapshotId(), icebergTable).includeColumnStats();
             Map<StructLikeWrapper, Partition> partitions = getPartitions(tableScan);
             return buildRecordCursor(partitions, icebergTable.spec().fields());
         }
@@ -269,17 +265,6 @@ private static Block getColumnMetricBlock(RowType columnMetricType, Object min, Object max, Object nullCount)
         return columnMetricType.getObject(rowBlockBuilder, 0);
     }
 
-    private TableScan getTableScan(TupleDomain<Integer> constraint)
-    {
-        List<HiveColumnHandle> partitionColumns = IcebergUtil.getPartitionColumns(icebergTable.schema(), icebergTable.spec(), typeManager);
-        Map<Integer, HiveColumnHandle> fieldIdToColumnHandle = IntStream.range(0, partitionColumnTypes.size())
-                .boxed()
-                .collect(Collectors.toMap(identity(), partitionColumns::get));
-        TupleDomain<HiveColumnHandle> predicates = constraint.transform(fieldIdToColumnHandle::get);
-
-        return IcebergUtil.getTableScan(session, predicates, tableHandle.getSnapshotId(), icebergTable).includeColumnStats();
-    }
-
     private Map<Integer, Object> toMap(Map<Integer, ByteBuffer> idToMetricMap)
     {
         ImmutableMap.Builder<Integer, Object> map = ImmutableMap.builder();
