Skip to content

Commit

Permalink
Add hidden $partition column to Hive connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jun 27, 2020
1 parent 8444bd0 commit 5b6eaa9
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 5 deletions.
3 changes: 3 additions & 0 deletions presto-docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,9 @@ directly or used in conditional statements.
* ``$file_size``
Size of the file for this row.

* ``$partition``
Partition name for this row.

Special Tables
----------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public class HiveColumnHandle
public static final HiveType FILE_MODIFIED_TIME_TYPE = HiveType.HIVE_TIMESTAMP;
public static final Type FILE_MODIFIED_TIME_TYPE_SIGNATURE = TIMESTAMP_WITH_TIME_ZONE;

public static final int PARTITION_COLUMN_INDEX = -15;
public static final String PARTITION_COLUMN_NAME = "$partition";
public static final HiveType PARTITION_HIVE_TYPE = HIVE_STRING;
public static final Type PARTITION_TYPE_SIGNATURE = VARCHAR;

private static final String UPDATE_ROW_ID_COLUMN_NAME = "$shard_row_id";

public enum ColumnType
Expand Down Expand Up @@ -274,6 +279,11 @@ public static HiveColumnHandle fileModifiedTimeColumnHandle()
return createBaseColumn(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_INDEX, FILE_MODIFIED_TIME_TYPE, FILE_MODIFIED_TIME_TYPE_SIGNATURE, SYNTHESIZED, Optional.empty());
}

/**
 * Creates the column handle for the {@code $partition} column.
 *
 * <p>The handle is produced by {@code createBaseColumn} from the {@code PARTITION_*}
 * constants declared on this class, with the {@code SYNTHESIZED} column type and an
 * empty {@link Optional} as the last argument.
 *
 * @return a handle describing the {@code $partition} column
 */
public static HiveColumnHandle partitionColumnHandle()
{
    return createBaseColumn(
            PARTITION_COLUMN_NAME,
            PARTITION_COLUMN_INDEX,
            PARTITION_HIVE_TYPE,
            PARTITION_TYPE_SIGNATURE,
            SYNTHESIZED,
            Optional.empty());
}

public static boolean isPathColumnHandle(HiveColumnHandle column)
{
return column.getBaseHiveColumnIndex() == PATH_COLUMN_INDEX;
Expand All @@ -293,4 +303,9 @@ public static boolean isFileModifiedTimeColumnHandle(HiveColumnHandle column)
{
return column.getBaseHiveColumnIndex() == FILE_MODIFIED_TIME_COLUMN_INDEX;
}

/**
 * Tells whether the given handle refers to the {@code $partition} column.
 *
 * <p>The check is based solely on the base Hive column index of the handle, which is
 * compared against {@code PARTITION_COLUMN_INDEX}.
 *
 * @param column the column handle to inspect
 * @return {@code true} if the handle's base Hive column index is {@code PARTITION_COLUMN_INDEX}
 */
public static boolean isPartitionColumnHandle(HiveColumnHandle column)
{
    return PARTITION_COLUMN_INDEX == column.getBaseHiveColumnIndex();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.SYNTHESIZED;
import static io.prestosql.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.PARTITION_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.prestosql.plugin.hive.HiveColumnHandle.updateRowIdHandle;
Expand Down Expand Up @@ -2595,6 +2596,9 @@ private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(T
}
builder.put(FILE_SIZE_COLUMN_NAME, Optional.empty());
builder.put(FILE_MODIFIED_TIME_COLUMN_NAME, Optional.empty());
if (!table.getPartitionColumns().isEmpty()) {
builder.put(PARTITION_COLUMN_NAME, Optional.empty());
}

Map<String, Optional<String>> columnComment = builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
hiveSplit.getSchema(),
hiveTable.getCompactEffectivePredicate().intersect(dynamicFilter.transform(HiveColumnHandle.class::cast).simplify()),
hiveColumns,
hiveSplit.getPartitionName(),
hiveSplit.getPartitionKeys(),
hiveStorageTimeZone,
typeManager,
Expand Down Expand Up @@ -143,6 +144,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
Properties schema,
TupleDomain<HiveColumnHandle> effectivePredicate,
List<HiveColumnHandle> columns,
String partitionName,
List<HivePartitionKey> partitionKeys,
DateTimeZone hiveStorageTimeZone,
TypeManager typeManager,
Expand All @@ -156,6 +158,7 @@ public static Optional<ConnectorPageSource> createHivePageSource(
}

List<ColumnMapping> columnMappings = ColumnMapping.buildColumnMappings(
partitionName,
partitionKeys,
columns,
bucketConversion.map(BucketConversion::getBucketColumnHandles).orElse(ImmutableList.of()),
Expand Down Expand Up @@ -344,6 +347,7 @@ public Optional<HiveType> getBaseTypeCoercionFrom()
}

public static List<ColumnMapping> buildColumnMappings(
String partitionName,
List<HivePartitionKey> partitionKeys,
List<HiveColumnHandle> columns,
List<HiveColumnHandle> requiredInterimColumns,
Expand Down Expand Up @@ -388,7 +392,7 @@ public static List<ColumnMapping> buildColumnMappings(
else {
columnMappings.add(prefilled(
column,
getPrefilledColumnValue(column, partitionKeysByName.get(column.getName()), path, bucketNumber, fileSize, fileModifiedTime, hiveStorageTimeZone),
getPrefilledColumnValue(column, partitionKeysByName.get(column.getName()), path, bucketNumber, fileSize, fileModifiedTime, hiveStorageTimeZone, partitionName),
baseTypeCoercionFrom));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.isBucketColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.isFileModifiedTimeColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.isFileSizeColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.isPartitionColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.isPathColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.partitionColumnHandle;
import static io.prestosql.plugin.hive.HiveColumnHandle.pathColumnHandle;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
Expand Down Expand Up @@ -900,6 +902,9 @@ public static List<HiveColumnHandle> hiveColumnHandles(Table table, TypeManager
}
columns.add(fileSizeColumnHandle());
columns.add(fileModifiedTimeColumnHandle());
if (!table.getPartitionColumns().isEmpty()) {
columns.add(partitionColumnHandle());
}

return columns.build();
}
Expand Down Expand Up @@ -980,7 +985,8 @@ public static String getPrefilledColumnValue(
OptionalInt bucketNumber,
long fileSize,
long fileModifiedTime,
DateTimeZone hiveStorageTimeZone)
DateTimeZone hiveStorageTimeZone,
String partitionName)
{
if (partitionKey != null) {
return partitionKey.getValue();
Expand All @@ -997,6 +1003,9 @@ public static String getPrefilledColumnValue(
if (isFileModifiedTimeColumnHandle(columnHandle)) {
return HIVE_TIMESTAMP_PARSER.withZone(hiveStorageTimeZone).print(fileModifiedTime);
}
if (isPartitionColumnHandle(columnHandle)) {
return partitionName;
}
throw new PrestoException(NOT_SUPPORTED, "unsupported hidden column: " + columnHandle);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static io.prestosql.plugin.hive.HiveTestUtils.getHiveSession;
import static io.prestosql.plugin.hive.HiveTestUtils.getTypes;
import static io.prestosql.testing.StructuralTestUtil.rowBlockOf;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT;
Expand Down Expand Up @@ -914,6 +915,10 @@ private ConnectorPageSource createPageSourceFromCursorProvider(
.map(input -> new HivePartitionKey(input.getName(), (String) input.getWriteValue()))
.collect(toList());

String partitionName = String.join("/", partitionKeys.stream()
.map(partitionKey -> format("%s=%s", partitionKey.getName(), partitionKey.getValue()))
.collect(toImmutableList()));

Configuration configuration = new Configuration(false);
configuration.set("io.compression.codecs", LzoCodec.class.getName() + "," + LzopCodec.class.getName());
Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
Expand All @@ -930,6 +935,7 @@ private ConnectorPageSource createPageSourceFromCursorProvider(
splitProperties,
TupleDomain.all(),
getColumnHandles(testReadColumns),
partitionName,
partitionKeys,
DateTimeZone.getDefault(),
TYPE_MANAGER,
Expand Down Expand Up @@ -976,6 +982,10 @@ private void testPageSourceFactory(
.map(input -> new HivePartitionKey(input.getName(), (String) input.getWriteValue()))
.collect(toList());

String partitionName = String.join("/", partitionKeys.stream()
.map(partitionKey -> format("%s=%s", partitionKey.getName(), partitionKey.getValue()))
.collect(toImmutableList()));

List<HiveColumnHandle> columnHandles = getColumnHandles(testReadColumns);

Optional<ConnectorPageSource> pageSource = HivePageSourceProvider.createHivePageSource(
Expand All @@ -992,6 +1002,7 @@ private void testPageSourceFactory(
splitProperties,
TupleDomain.all(),
columnHandles,
partitionName,
partitionKeys,
DateTimeZone.getDefault(),
TYPE_MANAGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.FILE_MODIFIED_TIME_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.FILE_SIZE_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.PARTITION_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveColumnHandle.PATH_COLUMN_NAME;
import static io.prestosql.plugin.hive.HiveQueryRunner.HIVE_CATALOG;
import static io.prestosql.plugin.hive.HiveQueryRunner.TPCH_SCHEMA;
Expand Down Expand Up @@ -3611,7 +3612,7 @@ private void testPathHiddenColumn(Session session, HiveStorageFormat storageForm
TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, "test_path");
assertEquals(tableMetadata.getMetadata().getProperties().get(STORAGE_FORMAT_PROPERTY), storageFormat);

List<String> columnNames = ImmutableList.of("col0", "col1", PATH_COLUMN_NAME, FILE_SIZE_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_NAME);
List<String> columnNames = ImmutableList.of("col0", "col1", PATH_COLUMN_NAME, FILE_SIZE_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_NAME, PARTITION_COLUMN_NAME);
List<ColumnMetadata> columnMetadatas = tableMetadata.getColumns();
assertEquals(columnMetadatas.size(), columnNames.size());
for (int i = 0; i < columnMetadatas.size(); i++) {
Expand Down Expand Up @@ -3719,7 +3720,7 @@ public void testFileSizeHiddenColumn()

TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, "test_file_size");

List<String> columnNames = ImmutableList.of("col0", "col1", PATH_COLUMN_NAME, FILE_SIZE_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_NAME);
List<String> columnNames = ImmutableList.of("col0", "col1", PATH_COLUMN_NAME, FILE_SIZE_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_NAME, PARTITION_COLUMN_NAME);
List<ColumnMetadata> columnMetadatas = tableMetadata.getColumns();
assertEquals(columnMetadatas.size(), columnNames.size());
for (int i = 0; i < columnMetadatas.size(); i++) {
Expand Down Expand Up @@ -3772,7 +3773,7 @@ public void testFileModifiedTimeHiddenColumn()

TableMetadata tableMetadata = getTableMetadata(catalog, TPCH_SCHEMA, "test_file_modified_time");

List<String> columnNames = ImmutableList.of("col0", "col1", PATH_COLUMN_NAME, FILE_SIZE_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_NAME);
List<String> columnNames = ImmutableList.of("col0", "col1", PATH_COLUMN_NAME, FILE_SIZE_COLUMN_NAME, FILE_MODIFIED_TIME_COLUMN_NAME, PARTITION_COLUMN_NAME);
List<ColumnMetadata> columnMetadatas = tableMetadata.getColumns();
assertEquals(columnMetadatas.size(), columnNames.size());
for (int i = 0; i < columnMetadatas.size(); i++) {
Expand Down Expand Up @@ -3806,6 +3807,47 @@ public void testFileModifiedTimeHiddenColumn()
assertUpdate("DROP TABLE test_file_modified_time");
}

/**
 * Checks the {@code $partition} column on a table partitioned by {@code col1} and {@code col2}.
 *
 * <p>The test creates a nine-row table in which every row lands in its own partition, then
 * verifies that:
 * <ul>
 * <li>the table metadata lists the data columns followed by the path, file size, file
 * modified time and partition columns, with the partition column flagged as hidden;</li>
 * <li>nine partitions exist;</li>
 * <li>selecting the partition column yields {@code col1=<value>/col2=<value>} for each row.</li>
 * </ul>
 */
@Test
public void testPartitionHiddenColumn()
{
    @Language("SQL") String createTableSql = "CREATE TABLE test_partition_hidden_column " +
            "WITH (" +
            "partitioned_by = ARRAY['col1', 'col2']" +
            ") AS " +
            "SELECT * FROM (VALUES " +
            "(0, 11, 21), (1, 12, 22), (2, 13, 23), " +
            "(3, 14, 24), (4, 15, 25), (5, 16, 26), " +
            "(6, 17, 27), (7, 18, 28), (8, 19, 29)" +
            " ) t (col0, col1, col2) ";
    assertUpdate(createTableSql, 9);
    assertTrue(getQueryRunner().tableExists(getSession(), "test_partition_hidden_column"));

    TableMetadata metadata = getTableMetadata(catalog, TPCH_SCHEMA, "test_partition_hidden_column");
    Object partitionedBy = metadata.getMetadata().getProperties().get(PARTITIONED_BY_PROPERTY);
    assertEquals(partitionedBy, ImmutableList.of("col1", "col2"));

    // Data columns come first, followed by the file-level columns and finally $partition.
    List<String> expectedNames = ImmutableList.of(
            "col0",
            "col1",
            "col2",
            PATH_COLUMN_NAME,
            FILE_SIZE_COLUMN_NAME,
            FILE_MODIFIED_TIME_COLUMN_NAME,
            PARTITION_COLUMN_NAME);
    List<ColumnMetadata> actualColumns = metadata.getColumns();
    assertEquals(actualColumns.size(), expectedNames.size());
    for (int position = 0; position < actualColumns.size(); position++) {
        ColumnMetadata actualColumn = actualColumns.get(position);
        assertEquals(actualColumn.getName(), expectedNames.get(position));
        boolean isPartitionColumn = actualColumn.getName().equals(PARTITION_COLUMN_NAME);
        if (isPartitionColumn) {
            assertTrue(actualColumn.isHidden());
        }
    }
    assertEquals(getPartitions("test_partition_hidden_column").size(), 9);

    String selectSql = format("SELECT *, \"%s\" FROM test_partition_hidden_column", PARTITION_COLUMN_NAME);
    MaterializedResult queryResult = computeActual(selectSql);
    for (MaterializedRow materializedRow : queryResult.getMaterializedRows()) {
        // Fields 0-2 are col0, col1, col2 from "*"; field 3 is the explicitly selected $partition.
        Object col1Value = materializedRow.getField(1);
        Object col2Value = materializedRow.getField(2);
        String partitionValue = (String) materializedRow.getField(3);
        assertEquals(partitionValue, format("col1=%s/col2=%s", col1Value, col2Value));
    }
    assertEquals(queryResult.getRowCount(), 9);

    assertUpdate("DROP TABLE test_partition_hidden_column");
}

@Test
public void testDeleteAndInsert()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import static io.prestosql.testing.TestingHandles.TEST_TABLE_HANDLE;
import static io.prestosql.testing.TestingSession.testSessionBuilder;
import static io.prestosql.testing.TestingTaskContext.createTaskContext;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
Expand Down Expand Up @@ -412,6 +413,7 @@ private class TestPreparer
private final Properties schema;
private final List<HiveColumnHandle> columns;
private final List<Type> types;
private final String partitonName;
private final List<HivePartitionKey> partitionKeys;
private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("test-executor-%s"));
private final ScheduledExecutorService scheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed("test-scheduledExecutor-%s"));
Expand Down Expand Up @@ -443,6 +445,10 @@ public TestPreparer(String tempFilePath, List<TestColumn> testColumns, int numRo
.map(input -> new HivePartitionKey(input.getName(), (String) input.getWriteValue()))
.collect(toList());

partitonName = String.join("/", partitionKeys.stream()
.map(partitionKey -> format("%s=%s", partitionKey.getName(), partitionKey.getValue()))
.collect(toImmutableList()));

ImmutableList.Builder<HiveColumnHandle> columnsBuilder = ImmutableList.builder();
ImmutableList.Builder<Type> typesBuilder = ImmutableList.builder();
int nextHiveColumnIndex = 0;
Expand Down Expand Up @@ -490,6 +496,7 @@ public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, Connec
schema,
TupleDomain.all(),
columns,
partitonName,
partitionKeys,
DateTimeZone.UTC,
TYPE_MANAGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.hive.HiveStorageFormat.ORC;
import static io.prestosql.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.prestosql.plugin.hive.HiveTestUtils.TYPE_MANAGER;
import static io.prestosql.plugin.hive.HiveTestUtils.getHiveSession;
import static io.prestosql.plugin.hive.parquet.ParquetTester.HIVE_STORAGE_TIME_ZONE;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.testing.StructuralTestUtil.rowBlockOf;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
Expand Down Expand Up @@ -187,6 +189,10 @@ private ConnectorPageSource createPageSource(
.map(input -> new HivePartitionKey(input.getName(), (String) input.getWriteValue()))
.collect(toList());

String partitionName = String.join("/", partitionKeys.stream()
.map(partitionKey -> format("%s=%s", partitionKey.getName(), partitionKey.getValue()))
.collect(toImmutableList()));

List<HiveColumnHandle> columnHandles = getColumnHandles(columnsToRead);

TupleDomain<HiveColumnHandle> predicate = effectivePredicate.transform(testColumn -> {
Expand All @@ -212,6 +218,7 @@ private ConnectorPageSource createPageSource(
splitProperties,
predicate,
columnHandles,
partitionName,
partitionKeys,
DateTimeZone.getDefault(),
TYPE_MANAGER,
Expand Down

0 comments on commit 5b6eaa9

Please sign in to comment.