Subfield pruning in Parquet #13271

Merged: 1 commit, Sep 13, 2019

ParquetPageSource.java
@@ -101,7 +101,7 @@ public ParquetPageSource(
typesBuilder.add(type);
hiveColumnIndexes[columnIndex] = column.getHiveColumnIndex();

- if (getParquetType(type, fileSchema, useParquetColumnNames, column.getName(), column.getHiveColumnIndex(), column.getHiveType()).isPresent()) {
+ if (getParquetType(type, fileSchema, useParquetColumnNames, column).isPresent()) {
String columnName = useParquetColumnNames ? name : fileSchema.getFields().get(column.getHiveColumnIndex()).getName();
fieldsBuilder.add(constructField(type, lookupColumnByName(messageColumnIO, columnName)));
}

ParquetPageSourceFactory.java
@@ -17,7 +17,6 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveBatchPageSourceFactory;
import com.facebook.presto.hive.HiveColumnHandle;
- import com.facebook.presto.hive.HiveType;
import com.facebook.presto.memory.context.AggregatedMemoryContext;
import com.facebook.presto.parquet.ParquetCorruptionException;
import com.facebook.presto.parquet.ParquetDataSource;
@@ -28,6 +27,7 @@
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
+ import com.facebook.presto.spi.Subfield;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.StandardTypes;
@@ -74,6 +74,7 @@
import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO;
import static com.facebook.presto.parquet.ParquetTypeUtils.getDescriptors;
import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetTypeByName;
+ import static com.facebook.presto.parquet.ParquetTypeUtils.getSubfieldType;
import static com.facebook.presto.parquet.predicate.PredicateUtils.buildPredicate;
import static com.facebook.presto.parquet.predicate.PredicateUtils.predicateMatches;
import static com.facebook.presto.spi.type.StandardTypes.ARRAY;
@@ -94,7 +95,6 @@
import static com.google.common.base.Strings.nullToEmpty;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
- import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
@@ -186,14 +186,15 @@ public static ParquetPageSource createParquetPageSource(
MessageType fileSchema = fileMetaData.getSchema();
dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, stats);

- List<org.apache.parquet.schema.Type> fields = columns.stream()
+ Optional<MessageType> message = columns.stream()
.filter(column -> column.getColumnType() == REGULAR)
- .map(column -> getParquetType(typeManager.getType(column.getTypeSignature()), fileSchema, useParquetColumnNames, column.getName(), column.getHiveColumnIndex(), column.getHiveType()))
+ .map(column -> getColumnType(typeManager.getType(column.getTypeSignature()), fileSchema, useParquetColumnNames, column))
.filter(Optional::isPresent)
.map(Optional::get)
- .collect(toList());
+ .map(type -> new MessageType(fileSchema.getName(), type))
+ .reduce(MessageType::union);

- MessageType requestedSchema = new MessageType(fileSchema.getName(), fields);
+ MessageType requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of()));

ImmutableList.Builder<BlockMetaData> footerBlocks = ImmutableList.builder();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
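
Review note: the requested schema is now assembled by mapping each projected column to a single-column MessageType and folding with reduce(MessageType::union), rather than collecting a flat list of fields. A minimal, self-contained sketch of that merge step using only the parquet-mr schema API (schema and column names here are illustrative, not from this PR):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

public class RequestedSchemaSketch
{
    public static void main(String[] args)
    {
        // One single-field MessageType per projected column, as the stream above produces
        MessageType orderkey = new MessageType("hive_schema", Types.optional(INT64).named("orderkey"));
        MessageType linenumber = new MessageType("hive_schema", Types.optional(INT32).named("linenumber"));

        // reduce(MessageType::union) folds these into one requested schema, roughly:
        // message hive_schema { optional int64 orderkey; optional int32 linenumber; }
        MessageType requested = orderkey.union(linenumber);
        System.out.println(requested);
    }
}

The Optional.orElse fallback above covers the case where no regular columns survive the filter, yielding an empty requested schema.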
@@ -278,14 +279,14 @@ public static TupleDomain<ColumnDescriptor> getParquetTupleDomain(Map<List<Strin
return TupleDomain.withColumnDomains(predicate.build());
}

- public static Optional<org.apache.parquet.schema.Type> getParquetType(Type prestoType, MessageType messageType, boolean useParquetColumnNames, String columnName, int columnHiveIndex, HiveType hiveType)
+ public static Optional<org.apache.parquet.schema.Type> getParquetType(Type prestoType, MessageType messageType, boolean useParquetColumnNames, HiveColumnHandle column)
{
org.apache.parquet.schema.Type type = null;
if (useParquetColumnNames) {
- type = getParquetTypeByName(columnName, messageType);
+ type = getParquetTypeByName(column.getName(), messageType);
}
- else if (columnHiveIndex < messageType.getFieldCount()) {
-     type = messageType.getType(columnHiveIndex);
+ else if (column.getHiveColumnIndex() < messageType.getFieldCount()) {
+     type = messageType.getType(column.getHiveColumnIndex());
}

if (type == null) {
@@ -304,8 +305,8 @@ else if (columnHiveIndex < messageType.getFieldCount()) {
parquetTypeName = builder.toString();
}
throw new PrestoException(HIVE_PARTITION_SCHEMA_MISMATCH, format("The column %s is declared as type %s, but the Parquet file declares the column as type %s",
- columnName,
- hiveType,
+ column.getName(),
+ column.getHiveType(),
parquetTypeName));
}
return Optional.of(type);
@@ -390,4 +391,22 @@ private static boolean checkSchemaMatch(org.apache.parquet.schema.Type parquetTy
throw new IllegalArgumentException("Unexpected parquet type name: " + parquetTypeName);
}
}

+ public static Optional<org.apache.parquet.schema.Type> getColumnType(Type prestoType, MessageType messageType, boolean useParquetColumnNames, HiveColumnHandle column)
+ {
+     if (useParquetColumnNames && !column.getRequiredSubfields().isEmpty()) {
+         MessageType result = null;
+         for (Subfield subfield : column.getRequiredSubfields()) {
+             MessageType type = getSubfieldType(messageType, subfield);
+             if (result == null) {
+                 result = type;
+             }
+             else {
+                 result = result.union(type);
+             }
+         }
+         return Optional.of(result);
+     }
+     return getParquetType(prestoType, messageType, useParquetColumnNames, column);
+ }
}
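
Review note: getColumnType prunes only when Parquet column-name lookup is in use and the engine pushed required subfields into the column handle; otherwise it falls back to getParquetType and the whole column is read. A hedged sketch of the union loop for two required subfields of one nested column, using getSubfieldType from ParquetTypeUtils (changed later in this diff) and assuming Subfield's path-parsing String constructor:

import com.facebook.presto.spi.Subfield;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static com.facebook.presto.parquet.ParquetTypeUtils.getSubfieldType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

public class SubfieldUnionSketch
{
    public static void main(String[] args)
    {
        // File schema with a nested "info" column, shaped like the test table below
        MessageType fileSchema = new MessageType("hive_schema",
                Types.optionalGroup()
                        .addField(Types.optional(INT64).named("orderkey"))
                        .addField(Types.optionalGroup()
                                .addField(Types.optional(INT32).named("ship_month"))
                                .addField(Types.optional(INT32).named("ship_year"))
                                .named("shipdate"))
                        .named("info"));

        // Two required subfields rooted at the same column ...
        MessageType first = getSubfieldType(fileSchema, new Subfield("info.orderkey"));
        MessageType second = getSubfieldType(fileSchema, new Subfield("info.shipdate.ship_month"));

        // ... merge exactly as in getColumnType's loop: result = result.union(type).
        // The merged schema keeps info.orderkey and info.shipdate.ship_month; ship_year is dropped.
        MessageType merged = first.union(second);
        System.out.println(merged);
    }
}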

HiveQueryRunner.java
@@ -128,12 +128,19 @@ public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> ta
.put("hive.collect-column-statistics-on-write", "true")
.put("hive.temporary-table-schema", TEMPORARY_TABLE_SCHEMA)
.build();

+ Map<String, String> storageProperties = extraHiveProperties.containsKey("hive.storage-format") ?
+         ImmutableMap.copyOf(hiveProperties) :
+         ImmutableMap.<String, String>builder()
+                 .putAll(hiveProperties)
+                 .put("hive.storage-format", "TEXTFILE")
+                 .put("hive.compression-codec", "NONE")
+                 .build();

Map<String, String> hiveBucketedProperties = ImmutableMap.<String, String>builder()
- .putAll(hiveProperties)
+ .putAll(storageProperties)
.put("hive.max-initial-split-size", "10kB") // so that each bucket has multiple splits
.put("hive.max-split-size", "10kB") // so that each bucket has multiple splits
.put("hive.storage-format", "TEXTFILE") // so that there's no minimum split size for the file
.put("hive.compression-codec", "NONE") // so that the file is splittable
.build();
queryRunner.createCatalog(HIVE_CATALOG, HIVE_CATALOG, hiveProperties);
queryRunner.createCatalog(HIVE_BUCKETED_CATALOG, HIVE_CATALOG, hiveBucketedProperties);

TestParquetDistributedQueries.java (new file)
@@ -0,0 +1,98 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestDistributedQueries;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.hive.HiveQueryRunner.createQueryRunner;
import static com.facebook.presto.sql.tree.ExplainType.Type.LOGICAL;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.tpch.TpchTable.getTables;
import static org.testng.Assert.assertEquals;

public class TestParquetDistributedQueries
extends AbstractTestDistributedQueries
{
protected TestParquetDistributedQueries()
{
super(TestParquetDistributedQueries::createQueryRunner);
}

private static QueryRunner createQueryRunner()
throws Exception
{
Map<String, String> parquetProperties = ImmutableMap.<String, String>builder()
.put("hive.storage-format", "PARQUET")
.put("hive.parquet.use-column-names", "true")
.put("hive.compression-codec", "GZIP")
.build();
return HiveQueryRunner.createQueryRunner(getTables(),
ImmutableMap.of("experimental.pushdown-subfields-enabled", "true"),
"sql-standard",
parquetProperties,
Optional.empty());
}

@Test
public void testSubfieldPruning()
{
getQueryRunner().execute("CREATE TABLE test_subfield_pruning AS " +
"SELECT orderkey, linenumber, shipdate, " +
" CAST(ROW(orderkey, linenumber, ROW(day(shipdate), month(shipdate), year(shipdate))) " +
" AS ROW(orderkey BIGINT, linenumber INTEGER, shipdate ROW(ship_day TINYINT, ship_month TINYINT, ship_year INTEGER))) AS info " +
"FROM lineitem");

try {
assertQuery("SELECT info.orderkey, info.shipdate.ship_month FROM test_subfield_pruning", "SELECT orderkey, month(shipdate) FROM lineitem");

assertQuery("SELECT orderkey FROM test_subfield_pruning WHERE info.shipdate.ship_month % 2 = 0", "SELECT orderkey FROM lineitem WHERE month(shipdate) % 2 = 0");
}
finally {
getQueryRunner().execute("DROP TABLE test_subfield_pruning");
}
}

@Override
protected boolean supportsNotNullColumns()
{
return false;
}

@Override
public void testDelete()
{
// Hive connector currently does not support row-by-row delete
}

@Override
public void testRenameColumn()
{
// Parquet field lookup by column name does not support renaming columns
}

@Test
public void testExplainOfCreateTableAs()
{
String query = "CREATE TABLE copy_orders AS SELECT * FROM orders";
MaterializedResult result = computeActual("EXPLAIN " + query);
assertEquals(getOnlyElement(result.getOnlyColumnAsSet()), getExplainPlan(query, LOGICAL));
}
}

ParquetTypeUtils.java
@@ -13,8 +13,11 @@
*/
package com.facebook.presto.parquet;

+ import com.facebook.presto.spi.Subfield;
+ import com.facebook.presto.spi.Subfield.PathElement;
import com.facebook.presto.spi.type.DecimalType;
import com.facebook.presto.spi.type.Type;
+ import com.google.common.collect.ImmutableList;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.ColumnIOFactory;
@@ -24,6 +27,7 @@
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.DecimalMetadata;
+ import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;

import java.util.Arrays;
@@ -33,6 +37,7 @@
import java.util.Map;
import java.util.Optional;

+ import static com.facebook.presto.spi.Subfield.NestedField;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
@@ -184,7 +189,7 @@ public static ParquetEncoding getParquetEncoding(Encoding encoding)
}
}

- public static org.apache.parquet.schema.Type getParquetTypeByName(String columnName, MessageType messageType)
+ public static org.apache.parquet.schema.Type getParquetTypeByName(String columnName, GroupType messageType)
{
if (messageType.containsField(columnName)) {
return messageType.getType(columnName);
@@ -261,4 +266,38 @@ public static long getShortDecimalValue(byte[] bytes)

return value;
}

+ public static MessageType getSubfieldType(GroupType baseType, Subfield subfield)
+ {
+     checkArgument(subfield.getPath().size() >= 1, "subfield size is less than 1");
+
+     ImmutableList.Builder<org.apache.parquet.schema.Type> typeBuilder = ImmutableList.builder();
+     org.apache.parquet.schema.Type parentType = getParquetTypeByName(subfield.getRootName(), baseType);
+
+     for (PathElement field : subfield.getPath()) {
+         if (field instanceof NestedField) {
+             NestedField nestedField = (NestedField) field;
+             org.apache.parquet.schema.Type childType = getParquetTypeByName(nestedField.getName(), parentType.asGroupType());
+             if (childType != null) {
+                 typeBuilder.add(childType);
+                 parentType = childType;
+             }
+         }
+         else {
+             typeBuilder.add(parentType.asGroupType().getFields().get(0));
+             break;
+         }
+     }
+
+     List<org.apache.parquet.schema.Type> subfieldTypes = typeBuilder.build();
+     if (subfieldTypes.isEmpty()) {
+         return new MessageType(subfield.getRootName(), ImmutableList.of());
+     }
+     org.apache.parquet.schema.Type type = subfieldTypes.get(subfieldTypes.size() - 1);
+     for (int i = subfieldTypes.size() - 2; i >= 0; --i) {
+         GroupType groupType = subfieldTypes.get(i).asGroupType();
+         type = new MessageType(groupType.getName(), ImmutableList.of(type));
+     }
+     return new MessageType(subfield.getRootName(), ImmutableList.of(type));
+ }
}
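
Review note: a small sketch of what getSubfieldType returns for a single nested path, again assuming Subfield's path-parsing constructor. Only the chain of groups along the path survives; sibling fields are pruned, which is what shrinks the requested schema passed to the Parquet reader:

import com.facebook.presto.spi.Subfield;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static com.facebook.presto.parquet.ParquetTypeUtils.getSubfieldType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

public class GetSubfieldTypeSketch
{
    public static void main(String[] args)
    {
        // message hive_schema { optional group info { optional group shipdate { ship_day, ship_month, ship_year } } }
        MessageType fileSchema = new MessageType("hive_schema",
                Types.optionalGroup()
                        .addField(Types.optionalGroup()
                                .addField(Types.optional(INT32).named("ship_day"))
                                .addField(Types.optional(INT32).named("ship_month"))
                                .addField(Types.optional(INT32).named("ship_year"))
                                .named("shipdate"))
                        .named("info"));

        // Walks info -> shipdate -> ship_month, then rebuilds the chain from the leaf up
        MessageType pruned = getSubfieldType(fileSchema, new Subfield("info.shipdate.ship_month"));

        // Result is a message rooted at "info" containing only shipdate { ship_month };
        // ship_day and ship_year are never materialized by the reader
        System.out.println(pruned);
    }
}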