From 7caeba9c8cb723db3a3d24a1ebe679691074437c Mon Sep 17 00:00:00 2001 From: Xingyuan Lin Date: Tue, 21 Jan 2020 22:34:40 -0800 Subject: [PATCH 1/3] Update to Iceberg 0.7.0 --- presto-iceberg/pom.xml | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index a95615925dfc..a91043d23e53 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -14,17 +14,9 @@ ${project.parent.basedir} - com.github.apache.incubator-iceberg - 77a456a + 0.7.0-incubating - - - jitpack.io - https://jitpack.io - - - io.prestosql.hadoop @@ -146,13 +138,13 @@ - ${dep.iceberg.groupId} + org.apache.iceberg iceberg-api ${dep.iceberg.version} - ${dep.iceberg.groupId} + org.apache.iceberg iceberg-core ${dep.iceberg.version} @@ -164,13 +156,13 @@ - ${dep.iceberg.groupId} + org.apache.iceberg iceberg-hive ${dep.iceberg.version} - ${dep.iceberg.groupId} + org.apache.iceberg iceberg-orc ${dep.iceberg.version} @@ -182,7 +174,7 @@ - ${dep.iceberg.groupId} + org.apache.iceberg iceberg-parquet ${dep.iceberg.version} From 26d47326bf1690764cd7e9c20d1012086c6b53ba Mon Sep 17 00:00:00 2001 From: Xingyuan Lin Date: Wed, 4 Dec 2019 15:55:30 -0800 Subject: [PATCH 2/3] Add support for Iceberg manifests tables --- .../plugin/iceberg/IcebergErrorCode.java | 1 + .../plugin/iceberg/IcebergMetadata.java | 2 + .../plugin/iceberg/ManifestsTable.java | 144 ++++++++++++++++++ .../prestosql/plugin/iceberg/TableType.java | 2 +- .../plugin/iceberg/util/PageListBuilder.java | 12 +- .../iceberg/TestIcebergSystemTables.java | 15 ++ 6 files changed, 172 insertions(+), 4 deletions(-) create mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java index 7428ec034efb..a639a8b2c64d 100644 --- 
a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java
@@ -35,6 +35,7 @@ public enum IcebergErrorCode
     ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),
     ICEBERG_CURSOR_ERROR(9, EXTERNAL),
     ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
+    ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
     /**/;
 
     private final ErrorCode errorCode;
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
index 2a7432dafa61..92cb1920bcc0 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
@@ -174,6 +174,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Iceber
                 return Optional.of(new HistoryTable(table.getSchemaTableNameWithType(), icebergTable));
             case SNAPSHOTS:
                 return Optional.of(new SnapshotsTable(table.getSchemaTableNameWithType(), typeManager, icebergTable));
+            case MANIFESTS:
+                return Optional.of(new ManifestsTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId()));
         }
         return Optional.empty();
     }
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java
new file mode 100644
index 000000000000..91e2019e1fd2
--- /dev/null
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ManifestsTable.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.iceberg.util.PageListBuilder;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.BlockBuilder;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.connector.ConnectorPageSource;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableMetadata;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.connector.FixedPageSource;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.connector.SystemTable;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.RowType;
+import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
+import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
+import static io.prestosql.spi.type.BigintType.BIGINT;
+import static io.prestosql.spi.type.BooleanType.BOOLEAN;
+import static io.prestosql.spi.type.IntegerType.INTEGER;
+import static io.prestosql.spi.type.VarcharType.VARCHAR;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * System table listing the manifest files of one snapshot of an Iceberg table.
+ * <p>
+ * The snapshot is the one named by {@code snapshotId}, or the table's current
+ * snapshot when no snapshot ID is given.
+ */
+public class ManifestsTable
+        implements SystemTable
+{
+    private final ConnectorTableMetadata tableMetadata;
+    private final Table icebergTable;
+    private final Optional<Long> snapshotId;
+
+    public ManifestsTable(SchemaTableName tableName, Table icebergTable, Optional<Long> snapshotId)
+    {
+        this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
+        this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
+
+        tableMetadata = new ConnectorTableMetadata(
+                requireNonNull(tableName, "tableName is null"),
+                ImmutableList.<ColumnMetadata>builder()
+                        .add(new ColumnMetadata("path", VARCHAR))
+                        .add(new ColumnMetadata("length", BIGINT))
+                        .add(new ColumnMetadata("partition_spec_id", INTEGER))
+                        .add(new ColumnMetadata("added_snapshot_id", BIGINT))
+                        .add(new ColumnMetadata("added_data_files_count", INTEGER))
+                        .add(new ColumnMetadata("existing_data_files_count", INTEGER))
+                        .add(new ColumnMetadata("deleted_data_files_count", INTEGER))
+                        // A manifest carries one summary per partition field, hence an array of rows
+                        .add(new ColumnMetadata("partitions", new ArrayType(RowType.rowType(
+                                RowType.field("contains_null", BOOLEAN),
+                                RowType.field("lower_bound", VARCHAR),
+                                RowType.field("upper_bound", VARCHAR)))))
+                        .build());
+    }
+
+    @Override
+    public Distribution getDistribution()
+    {
+        return Distribution.SINGLE_COORDINATOR;
+    }
+
+    @Override
+    public ConnectorTableMetadata getTableMetadata()
+    {
+        return tableMetadata;
+    }
+
+    @Override
+    public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
+    {
+        return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId));
+    }
+
+    /**
+     * Builds one row per manifest file of the selected snapshot.
+     */
+    private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
+    {
+        PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
+        Snapshot snapshot = resolveSnapshot(tableMetadata, icebergTable, snapshotId);
+        Map<Integer, PartitionSpec> partitionSpecsById = icebergTable.specs();
+
+        snapshot.manifests().forEach(file -> {
+            pagesBuilder.beginRow();
+            pagesBuilder.appendVarchar(file.path());
+            pagesBuilder.appendBigint(file.length());
+            pagesBuilder.appendInteger(file.partitionSpecId());
+            // The snapshot ID and the file counts are boxed in the Iceberg API, so guard against null
+            appendNullableBigint(pagesBuilder, file.snapshotId());
+            appendNullableInteger(pagesBuilder, file.addedFilesCount());
+            appendNullableInteger(pagesBuilder, file.existingFilesCount());
+            appendNullableInteger(pagesBuilder, file.deletedFilesCount());
+            writePartitionSummaries(pagesBuilder.nextColumn(), file.partitions(), partitionSpecsById.get(file.partitionSpecId()));
+            pagesBuilder.endRow();
+        });
+
+        return pagesBuilder.build();
+    }
+
+    /**
+     * Returns the requested snapshot, or the current snapshot when no ID was requested.
+     * <p>
+     * The lookup is deliberately not written as {@code snapshotId.map(...).orElseGet(...)}:
+     * {@link Optional#map} turns a {@code null} lookup result into an empty optional, so an
+     * unknown snapshot ID would silently fall back to the current snapshot.
+     *
+     * @throws PrestoException if the requested snapshot does not exist or the table has no snapshot
+     */
+    private static Snapshot resolveSnapshot(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
+    {
+        if (snapshotId.isPresent()) {
+            Snapshot requested = icebergTable.snapshot(snapshotId.get());
+            if (requested == null) {
+                throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Invalid snapshot ID: " + snapshotId.get());
+            }
+            return requested;
+        }
+
+        Snapshot current = icebergTable.currentSnapshot();
+        if (current == null) {
+            throw new PrestoException(ICEBERG_INVALID_METADATA, "There's no snapshot associated with table " + tableMetadata.getTable().toString());
+        }
+        return current;
+    }
+
+    private static void appendNullableInteger(PageListBuilder pagesBuilder, Integer value)
+    {
+        if (value == null) {
+            pagesBuilder.appendNull();
+            return;
+        }
+        pagesBuilder.appendInteger(value);
+    }
+
+    private static void appendNullableBigint(PageListBuilder pagesBuilder, Long value)
+    {
+        if (value == null) {
+            pagesBuilder.appendNull();
+            return;
+        }
+        pagesBuilder.appendBigint(value);
+    }
+
+    /**
+     * Writes the partition field summaries of a single manifest as one array entry.
+     * <p>
+     * Exactly one entry (or NULL) is written per call, which keeps this column aligned with
+     * the other columns of the row however many partition fields the spec has.
+     */
+    private static void writePartitionSummaries(BlockBuilder arrayBlockBuilder, List<PartitionFieldSummary> summaries, PartitionSpec partitionSpec)
+    {
+        if (summaries == null) {
+            arrayBlockBuilder.appendNull();
+            return;
+        }
+
+        BlockBuilder singleArrayWriter = arrayBlockBuilder.beginBlockEntry();
+        for (int i = 0; i < summaries.size(); i++) {
+            PartitionFieldSummary summary = summaries.get(i);
+            PartitionField field = partitionSpec.fields().get(i);
+            Type nestedType = partitionSpec.partitionType().fields().get(i).type();
+
+            BlockBuilder rowBuilder = singleArrayWriter.beginBlockEntry();
+            BOOLEAN.writeBoolean(rowBuilder, summary.containsNull());
+            VARCHAR.writeString(rowBuilder, field.transform().toHumanString(
+                    Conversions.fromByteBuffer(nestedType, summary.lowerBound())));
+            VARCHAR.writeString(rowBuilder, field.transform().toHumanString(
+                    Conversions.fromByteBuffer(nestedType, summary.upperBound())));
+            singleArrayWriter.closeEntry();
+        }
+        arrayBlockBuilder.closeEntry();
+    }
+}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java
index cb7671230468..bb8ab8bffc98 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java
@@ -18,6 +18,6 @@ public enum TableType
     DATA,
     HISTORY,
     SNAPSHOTS,
-    MANIFESTS, // TODO: to be implemented
+    MANIFESTS,
     PARTITIONS,
 }
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java
index be39be0409f3..0c3967d790f1 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java
@@ -19,7 +19,6 @@
 import io.prestosql.spi.block.BlockBuilder;
 import io.prestosql.spi.connector.ColumnMetadata;
 import io.prestosql.spi.connector.ConnectorTableMetadata;
-import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.Type;
 
 import java.util.List;
@@ -27,6 +26,8 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.spi.type.BigintType.BIGINT;
+import static io.prestosql.spi.type.IntegerType.INTEGER;
 import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
 import static io.prestosql.spi.type.VarcharType.VARCHAR;
 
@@ -84,9 +85,14 @@ public void appendNull()
         nextColumn().appendNull();
     }
 
+    public void appendInteger(int value)
+    {
+        INTEGER.writeLong(nextColumn(), value);
+    }
+
     public void appendBigint(long value)
     {
-        BigintType.BIGINT.writeLong(nextColumn(), value);
+        BIGINT.writeLong(nextColumn(), value);
     }
 
     public void appendTimestamp(long value)
@@ -120,7 +126,7 @@ public void appendVarcharVarcharMap(Map<String, String> values)
         column.closeEntry();
     }
 
-    private BlockBuilder nextColumn()
+    public BlockBuilder nextColumn()
     {
         int currentChannel = channel;
         channel++;
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
index 9d31056d4b43..fbd1b5459a9f 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
@@ -134,6 +134,21 @@ public void testSnapshotsTable()
         assertQuery("SELECT summary['total-records'] FROM test_schema.\"test_table$snapshots\"", "VALUES '0', '3', '6'");
     }
 
+    @Test
+    public void testManifestsTable()
+    {
+        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$manifests\"",
+                "VALUES ('path', 'varchar', '', '')," +
+                        "('length', 'bigint', '', '')," +
+                        "('partition_spec_id', 'integer', '', '')," +
+                        "('added_snapshot_id', 'bigint', '', '')," +
+                        "('added_data_files_count', 'integer', '', '')," +
+                        "('existing_data_files_count', 'integer', '', '')," +
+                        "('deleted_data_files_count', 'integer', '', '')," +
+                        "('partitions', 'array(row(contains_null boolean, lower_bound varchar, upper_bound varchar))', '', '')");
+        assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$manifests\"");
+    }
+
     @AfterClass(alwaysRun = true)
     public void tearDown()
     {

From d512db76ef75973517b2b2bc3417830dd78fedb8 Mon Sep 17 00:00:00 2001
From: Xingyuan Lin
Date: Thu, 5 Dec 2019 18:45:58 -0800
Subject: [PATCH 3/3] Add support for Iceberg files tables

---
 .../prestosql/plugin/iceberg/FilesTable.java  | 157 ++++++++++++++++++
 .../plugin/iceberg/IcebergMetadata.java       |   2 +
 .../plugin/iceberg/IcebergTableHandle.java    |   2 +-
 .../prestosql/plugin/iceberg/TableType.java   |   1 +
 .../plugin/iceberg/util/PageListBuilder.java  |  39 +++++
 .../iceberg/TestIcebergSystemTables.java      |  18 ++
 6 files changed, 218 insertions(+), 1 deletion(-)
 create mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java

diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java
new file mode 100644
index 000000000000..33756148cb8b
--- /dev/null
+++
b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/FilesTable.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slices;
+import io.prestosql.plugin.iceberg.util.PageListBuilder;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.connector.ConnectorPageSource;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableMetadata;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.connector.FixedPageSource;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.connector.SystemTable;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
+import static io.prestosql.spi.type.BigintType.BIGINT;
+import static io.prestosql.spi.type.IntegerType.INTEGER;
+import static io.prestosql.spi.type.TypeSignature.mapType;
+import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
+import static io.prestosql.spi.type.VarcharType.VARCHAR;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * System table listing the data files of one snapshot of an Iceberg table,
+ * together with the per-column metrics recorded for each file.
+ */
+public class FilesTable
+        implements SystemTable
+{
+    private final ConnectorTableMetadata tableMetadata;
+    private final Table icebergTable;
+    private final Optional<Long> snapshotId;
+
+    public FilesTable(SchemaTableName tableName, Table icebergTable, Optional<Long> snapshotId, TypeManager typeManager)
+    {
+        this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
+        this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
+
+        tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
+                ImmutableList.<ColumnMetadata>builder()
+                        .add(new ColumnMetadata("file_path", VARCHAR))
+                        .add(new ColumnMetadata("file_format", VARCHAR))
+                        .add(new ColumnMetadata("record_count", BIGINT))
+                        .add(new ColumnMetadata("file_size_in_bytes", BIGINT))
+                        .add(new ColumnMetadata("column_sizes", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
+                        .add(new ColumnMetadata("value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
+                        .add(new ColumnMetadata("null_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
+                        .add(new ColumnMetadata("lower_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
+                        .add(new ColumnMetadata("upper_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
+                        .add(new ColumnMetadata("key_metadata", VARBINARY))
+                        .add(new ColumnMetadata("split_offsets", new ArrayType(BIGINT)))
+                        .build());
+    }
+
+    @Override
+    public Distribution getDistribution()
+    {
+        return Distribution.SINGLE_COORDINATOR;
+    }
+
+    @Override
+    public ConnectorTableMetadata getTableMetadata()
+    {
+        return tableMetadata;
+    }
+
+    @Override
+    public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
+    {
+        return new FixedPageSource(buildPages(tableMetadata, session, icebergTable, snapshotId));
+    }
+
+    /**
+     * Builds one row per data file planned by an unfiltered scan of the selected snapshot.
+     */
+    private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable, Optional<Long> snapshotId)
+    {
+        PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
+        Schema schema = icebergTable.schema();
+        TableScan tableScan = getTableScan(session, TupleDomain.all(), snapshotId, icebergTable).includeColumnStats();
+
+        // planFiles() hands back a closeable iterable; release it once all rows are built
+        try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+            for (FileScanTask fileScanTask : fileScanTasks) {
+                appendDataFile(pagesBuilder, schema, fileScanTask.file());
+            }
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return pagesBuilder.build();
+    }
+
+    /**
+     * Appends the row describing a single data file; metrics that are absent become NULL.
+     */
+    private static void appendDataFile(PageListBuilder pagesBuilder, Schema schema, DataFile dataFile)
+    {
+        pagesBuilder.beginRow();
+        pagesBuilder.appendVarchar(dataFile.path().toString());
+        pagesBuilder.appendVarchar(dataFile.format().name());
+        pagesBuilder.appendBigint(dataFile.recordCount());
+        pagesBuilder.appendBigint(dataFile.fileSizeInBytes());
+        if (checkNonNull(dataFile.columnSizes(), pagesBuilder)) {
+            pagesBuilder.appendIntegerBigintMap(dataFile.columnSizes());
+        }
+        if (checkNonNull(dataFile.valueCounts(), pagesBuilder)) {
+            pagesBuilder.appendIntegerBigintMap(dataFile.valueCounts());
+        }
+        if (checkNonNull(dataFile.nullValueCounts(), pagesBuilder)) {
+            pagesBuilder.appendIntegerBigintMap(dataFile.nullValueCounts());
+        }
+        if (checkNonNull(dataFile.lowerBounds(), pagesBuilder)) {
+            pagesBuilder.appendIntegerVarcharMap(toHumanReadableBounds(schema, dataFile.lowerBounds()));
+        }
+        if (checkNonNull(dataFile.upperBounds(), pagesBuilder)) {
+            pagesBuilder.appendIntegerVarcharMap(toHumanReadableBounds(schema, dataFile.upperBounds()));
+        }
+        if (checkNonNull(dataFile.keyMetadata(), pagesBuilder)) {
+            pagesBuilder.appendVarbinary(Slices.wrappedBuffer(dataFile.keyMetadata()));
+        }
+        if (checkNonNull(dataFile.splitOffsets(), pagesBuilder)) {
+            pagesBuilder.appendBigintArray(dataFile.splitOffsets());
+        }
+        pagesBuilder.endRow();
+    }
+
+    /**
+     * Renders each bound as a human readable string, keyed by Iceberg field ID.
+     * <p>
+     * Types are resolved by field ID through the schema instead of from the top-level columns
+     * only, so a table with struct, list or map columns no longer fails on a primitive-type
+     * conversion. Bounds whose field ID is not in the schema, or whose type is not primitive,
+     * cannot be decoded and are left out of the result.
+     */
+    private static Map<Integer, String> toHumanReadableBounds(Schema schema, Map<Integer, ByteBuffer> bounds)
+    {
+        return bounds.entrySet().stream()
+                .filter(entry -> isDecodable(schema.findType(entry.getKey())))
+                .collect(toImmutableMap(
+                        Map.Entry::getKey,
+                        entry -> toHumanString(schema.findType(entry.getKey()), entry.getValue())));
+    }
+
+    private static boolean isDecodable(Type type)
+    {
+        return type != null && type.isPrimitiveType();
+    }
+
+    private static String toHumanString(Type type, ByteBuffer value)
+    {
+        return Transforms.identity(type).toHumanString(Conversions.fromByteBuffer(type, value));
+    }
+
+    private static boolean checkNonNull(Object object, PageListBuilder pagesBuilder)
+    {
+        if (object == null) {
+            pagesBuilder.appendNull();
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
index 92cb1920bcc0..2996ca3910bd 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
@@ -176,6 +176,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Iceber
                 return Optional.of(new SnapshotsTable(table.getSchemaTableNameWithType(), typeManager, icebergTable));
             case MANIFESTS:
                 return Optional.of(new ManifestsTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId()));
+            case FILES:
+                return Optional.of(new FilesTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId(), typeManager));
         }
         return Optional.empty();
     }
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java
index bd1ccfbe082b..4b3320b94009 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java
+++
b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java
@@ -128,7 +128,7 @@ public static IcebergTableHandle from(SchemaTableName name)
         }
 
         Optional<Long> version = Optional.empty();
-        if (type == TableType.DATA || type == TableType.PARTITIONS || type == TableType.MANIFESTS) {
+        if (type == TableType.DATA || type == TableType.PARTITIONS || type == TableType.MANIFESTS || type == TableType.FILES) {
             if (ver1 != null && ver2 != null) {
                 throw new PrestoException(NOT_SUPPORTED, "Invalid Iceberg table name (cannot specify two @ versions): " + name);
             }
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java
index bb8ab8bffc98..e637a0b24e6e 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TableType.java
@@ -20,4 +20,5 @@ public enum TableType
     SNAPSHOTS,
     MANIFESTS,
     PARTITIONS,
+    FILES,
 }
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java
index 0c3967d790f1..0ca240a633b5 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PageListBuilder.java
@@ -14,6 +14,7 @@
 package io.prestosql.plugin.iceberg.util;
 
 import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
 import io.prestosql.spi.Page;
 import io.prestosql.spi.PageBuilder;
 import io.prestosql.spi.block.BlockBuilder;
@@ -29,6 +30,7 @@
 import static io.prestosql.spi.type.BigintType.BIGINT;
 import static io.prestosql.spi.type.IntegerType.INTEGER;
 import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
+import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
 import static io.prestosql.spi.type.VarcharType.VARCHAR;
 
 public final class PageListBuilder
@@ -105,6 +107,26 @@ public void appendVarchar(String value)
         VARCHAR.writeString(nextColumn(), value);
     }
 
+    /**
+     * Appends a VARBINARY value to the next column.
+     */
+    public void appendVarbinary(Slice value)
+    {
+        BlockBuilder columnBuilder = nextColumn();
+        VARBINARY.writeSlice(columnBuilder, value);
+    }
+
+    /**
+     * Appends the given values to the next column as a single array of BIGINT.
+     */
+    public void appendBigintArray(Iterable<Long> values)
+    {
+        BlockBuilder columnBuilder = nextColumn();
+        BlockBuilder elements = columnBuilder.beginBlockEntry();
+        values.forEach(element -> BIGINT.writeLong(elements, element));
+        columnBuilder.closeEntry();
+    }
+
     public void appendVarcharArray(Iterable<String> values)
     {
         BlockBuilder column = nextColumn();
@@ -126,6 +148,34 @@ public void appendVarcharVarcharMap(Map<String, String> values)
         column.closeEntry();
     }
 
+    /**
+     * Appends the given entries to the next column as a single map of INTEGER to BIGINT.
+     */
+    public void appendIntegerBigintMap(Map<Integer, Long> values)
+    {
+        BlockBuilder columnBuilder = nextColumn();
+        BlockBuilder entries = columnBuilder.beginBlockEntry();
+        for (Map.Entry<Integer, Long> entry : values.entrySet()) {
+            INTEGER.writeLong(entries, entry.getKey());
+            BIGINT.writeLong(entries, entry.getValue());
+        }
+        columnBuilder.closeEntry();
+    }
+
+    /**
+     * Appends the given entries to the next column as a single map of INTEGER to VARCHAR.
+     */
+    public void appendIntegerVarcharMap(Map<Integer, String> values)
+    {
+        BlockBuilder columnBuilder = nextColumn();
+        BlockBuilder entries = columnBuilder.beginBlockEntry();
+        for (Map.Entry<Integer, String> entry : values.entrySet()) {
+            INTEGER.writeLong(entries, entry.getKey());
+            VARCHAR.writeString(entries, entry.getValue());
+        }
+        columnBuilder.closeEntry();
+    }
+
     public BlockBuilder nextColumn()
     {
         int currentChannel = channel;
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
index fbd1b5459a9f..11a7b9d715a6 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
@@ -149,6 +149,24 @@ public void testManifestsTable()
         assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$manifests\"");
     }
 
+    @Test
+    public void testFilesTable()
+    {
+        assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$files\"",
+                "VALUES ('file_path', 'varchar', '', '')," +
+                        "('file_format', 'varchar', '', '')," +
+                        "('record_count', 'bigint', '', '')," +
+                        "('file_size_in_bytes', 'bigint', '', '')," +
+                        "('column_sizes', 'map(integer, bigint)', '', '')," +
+                        "('value_counts', 'map(integer, bigint)', '', '')," +
+                        "('null_value_counts', 'map(integer, bigint)', '', '')," +
+                        "('lower_bounds', 'map(integer, varchar)', '', '')," +
+                        "('upper_bounds', 'map(integer, varchar)', '', '')," +
+                        "('key_metadata', 'varbinary', '', '')," +
+                        "('split_offsets', 'array(bigint)', '', '')");
+        assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\"");
+    }
+
     @AfterClass(alwaysRun = true)
     public void tearDown()
     {