Add support for Iceberg files tables
lxynov committed Jan 28, 2020
1 parent 26d4732 commit d512db7
Showing 6 changed files with 218 additions and 1 deletion.
FilesTable.java (new file)
@@ -0,0 +1,157 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.prestosql.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slices;
import io.prestosql.plugin.iceberg.util.PageListBuilder;
import io.prestosql.spi.Page;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedPageSource;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SystemTable;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.TypeManager;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.TypeSignature.mapType;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class FilesTable
        implements SystemTable
{
    private final ConnectorTableMetadata tableMetadata;
    private final Table icebergTable;
    private final Optional<Long> snapshotId;

    public FilesTable(SchemaTableName tableName, Table icebergTable, Optional<Long> snapshotId, TypeManager typeManager)
    {
        this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

        tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
                ImmutableList.<ColumnMetadata>builder()
                        .add(new ColumnMetadata("file_path", VARCHAR))
                        .add(new ColumnMetadata("file_format", VARCHAR))
                        .add(new ColumnMetadata("record_count", BIGINT))
                        .add(new ColumnMetadata("file_size_in_bytes", BIGINT))
                        .add(new ColumnMetadata("column_sizes", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
                        .add(new ColumnMetadata("value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
                        .add(new ColumnMetadata("null_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
                        .add(new ColumnMetadata("lower_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
                        .add(new ColumnMetadata("upper_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
                        .add(new ColumnMetadata("key_metadata", VARBINARY))
                        .add(new ColumnMetadata("split_offsets", new ArrayType(BIGINT)))
                        .build());
        this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
    }

    @Override
    public Distribution getDistribution()
    {
        return Distribution.SINGLE_COORDINATOR;
    }

    @Override
    public ConnectorTableMetadata getTableMetadata()
    {
        return tableMetadata;
    }

    @Override
    public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
    {
        return new FixedPageSource(buildPages(tableMetadata, session, icebergTable, snapshotId));
    }

    private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable, Optional<Long> snapshotId)
    {
        PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
        TableScan tableScan = getTableScan(session, TupleDomain.all(), snapshotId, icebergTable).includeColumnStats();
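        // Note: asPrimitiveType() throws for nested (struct, list, or map) types,
        // so the mapping below assumes the schema's top-level columns are primitive.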
        Map<Integer, Type> idToTypeMapping = icebergTable.schema().columns().stream()
                .collect(toImmutableMap(Types.NestedField::fieldId, column -> column.type().asPrimitiveType()));

        tableScan.planFiles().forEach(fileScanTask -> {
            DataFile dataFile = fileScanTask.file();

            pagesBuilder.beginRow();
            pagesBuilder.appendVarchar(dataFile.path().toString());
            pagesBuilder.appendVarchar(dataFile.format().name());
            pagesBuilder.appendBigint(dataFile.recordCount());
            pagesBuilder.appendBigint(dataFile.fileSizeInBytes());
            if (checkNonNull(dataFile.columnSizes(), pagesBuilder)) {
                pagesBuilder.appendIntegerBigintMap(dataFile.columnSizes());
            }
            if (checkNonNull(dataFile.valueCounts(), pagesBuilder)) {
                pagesBuilder.appendIntegerBigintMap(dataFile.valueCounts());
            }
            if (checkNonNull(dataFile.nullValueCounts(), pagesBuilder)) {
                pagesBuilder.appendIntegerBigintMap(dataFile.nullValueCounts());
            }
            if (checkNonNull(dataFile.lowerBounds(), pagesBuilder)) {
                pagesBuilder.appendIntegerVarcharMap(dataFile.lowerBounds().entrySet().stream()
                        .collect(toImmutableMap(
                                Map.Entry<Integer, ByteBuffer>::getKey,
                                entry -> Transforms.identity(idToTypeMapping.get(entry.getKey())).toHumanString(
                                        Conversions.fromByteBuffer(idToTypeMapping.get(entry.getKey()), entry.getValue())))));
            }
            if (checkNonNull(dataFile.upperBounds(), pagesBuilder)) {
                pagesBuilder.appendIntegerVarcharMap(dataFile.upperBounds().entrySet().stream()
                        .collect(toImmutableMap(
                                Map.Entry<Integer, ByteBuffer>::getKey,
                                entry -> Transforms.identity(idToTypeMapping.get(entry.getKey())).toHumanString(
                                        Conversions.fromByteBuffer(idToTypeMapping.get(entry.getKey()), entry.getValue())))));
            }
            if (checkNonNull(dataFile.keyMetadata(), pagesBuilder)) {
                pagesBuilder.appendVarbinary(Slices.wrappedBuffer(dataFile.keyMetadata()));
            }
            if (checkNonNull(dataFile.splitOffsets(), pagesBuilder)) {
                pagesBuilder.appendBigintArray(dataFile.splitOffsets());
            }
            pagesBuilder.endRow();
        });

        return pagesBuilder.build();
    }

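    // Appends a SQL NULL for the current column when the value is absent,
    // and returns false so the caller skips writing a real value.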
    private static boolean checkNonNull(Object object, PageListBuilder pagesBuilder)
    {
        if (object == null) {
            pagesBuilder.appendNull();
            return false;
        }
        return true;
    }
}
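
The lower_bounds and upper_bounds handling above is the subtle part: Iceberg stores each per-column bound as a serialized single-value ByteBuffer, and the identity transform is used purely for its human-readable rendering. A minimal standalone sketch of that round trip (Conversions.toByteBuffer is not part of this diff and is assumed here only to fabricate an input buffer; the read side mirrors buildPages() above):

import java.nio.ByteBuffer;

import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class BoundsRoundTrip
{
    public static void main(String[] args)
    {
        Type type = Types.IntegerType.get();
        // Serialize a value the way Iceberg stores a per-column bound.
        ByteBuffer bound = Conversions.toByteBuffer(type, 42);
        // Deserialize and render it, as buildPages() does for each field id.
        String human = Transforms.identity(type).toHumanString(
                Conversions.fromByteBuffer(type, bound));
        System.out.println(human); // prints 42
    }
}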
IcebergMetadata.java
@@ -176,6 +176,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Iceber
                return Optional.of(new SnapshotsTable(table.getSchemaTableNameWithType(), typeManager, icebergTable));
            case MANIFESTS:
                return Optional.of(new ManifestsTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId()));
            case FILES:
                return Optional.of(new FilesTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId(), typeManager));
        }
        return Optional.empty();
    }
IcebergTableHandle.java
@@ -128,7 +128,7 @@ public static IcebergTableHandle from(SchemaTableName name)
        }

        Optional<Long> version = Optional.empty();
-       if (type == TableType.DATA || type == TableType.PARTITIONS || type == TableType.MANIFESTS) {
+       if (type == TableType.DATA || type == TableType.PARTITIONS || type == TableType.MANIFESTS || type == TableType.FILES) {
            if (ver1 != null && ver2 != null) {
                throw new PrestoException(NOT_SUPPORTED, "Invalid Iceberg table name (cannot specify two @ versions): " + name);
            }
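
For context, extending this check means a snapshot-pinned files listing now parses as well. A hypothetical fragment illustrating the effect (the snapshot id 12345 is a placeholder, not taken from this diff):

IcebergTableHandle handle = IcebergTableHandle.from(
        new SchemaTableName("test_schema", "test_table$files@12345"));
// The handle now identifies the $files table pinned at snapshot 12345;
// getRawSystemTable() above passes that snapshot id into the new FilesTable.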
TableType.java
@@ -20,4 +20,5 @@ public enum TableType
    SNAPSHOTS,
    MANIFESTS,
    PARTITIONS,
    FILES,
}
PageListBuilder.java
@@ -14,6 +14,7 @@
package io.prestosql.plugin.iceberg.util;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.prestosql.spi.Page;
import io.prestosql.spi.PageBuilder;
import io.prestosql.spi.block.BlockBuilder;
@@ -29,6 +30,7 @@
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.VarcharType.VARCHAR;

public final class PageListBuilder
@@ -105,6 +107,21 @@ public void appendVarchar(String value)
        VARCHAR.writeString(nextColumn(), value);
    }

    public void appendVarbinary(Slice value)
    {
        VARBINARY.writeSlice(nextColumn(), value);
    }

    public void appendBigintArray(Iterable<Long> values)
    {
        BlockBuilder column = nextColumn();
        BlockBuilder array = column.beginBlockEntry();
        for (Long value : values) {
            BIGINT.writeLong(array, value);
        }
        column.closeEntry();
    }

    public void appendVarcharArray(Iterable<String> values)
    {
        BlockBuilder column = nextColumn();
@@ -126,6 +143,28 @@ public void appendVarcharVarcharMap(Map<String, String> values)
        column.closeEntry();
    }

    public void appendIntegerBigintMap(Map<Integer, Long> values)
    {
        BlockBuilder column = nextColumn();
        BlockBuilder map = column.beginBlockEntry();
        values.forEach((key, value) -> {
            INTEGER.writeLong(map, key);
            BIGINT.writeLong(map, value);
        });
        column.closeEntry();
    }

    public void appendIntegerVarcharMap(Map<Integer, String> values)
    {
        BlockBuilder column = nextColumn();
        BlockBuilder map = column.beginBlockEntry();
        values.forEach((key, value) -> {
            INTEGER.writeLong(map, key);
            VARCHAR.writeString(map, value);
        });
        column.closeEntry();
    }

    public BlockBuilder nextColumn()
    {
        int currentChannel = channel;
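
Taken together, the new helpers slot into the builder's existing begin/append/end row protocol. A hedged usage sketch, assuming a ConnectorTableMetadata that declares varchar, map(integer, bigint), and array(bigint) columns in that order:

PageListBuilder builder = PageListBuilder.forTable(tableMetadata);
builder.beginRow();
builder.appendVarchar("part-00000.parquet");                       // e.g. file_path
builder.appendIntegerBigintMap(ImmutableMap.of(1, 100L, 2, 250L)); // e.g. column_sizes
builder.appendBigintArray(ImmutableList.of(4L, 8L));               // e.g. split_offsets
builder.endRow();
List<Page> pages = builder.build();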
TestIcebergSystemTables.java
@@ -149,6 +149,24 @@ public void testManifestsTable()
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$manifests\"");
}

@Test
public void testFilesTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$files\"",
"VALUES ('file_path', 'varchar', '', '')," +
"('file_format', 'varchar', '', '')," +
"('record_count', 'bigint', '', '')," +
"('file_size_in_bytes', 'bigint', '', '')," +
"('column_sizes', 'map(integer, bigint)', '', '')," +
"('value_counts', 'map(integer, bigint)', '', '')," +
"('null_value_counts', 'map(integer, bigint)', '', '')," +
"('lower_bounds', 'map(integer, varchar)', '', '')," +
"('upper_bounds', 'map(integer, varchar)', '', '')," +
"('key_metadata', 'varbinary', '', '')," +
"('split_offsets', 'array(bigint)', '', '')");
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\"");
}
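
As a usage note, the map-typed columns combine with ordinary map functions; a hypothetical extra assertion in the same idiom (field id 1 is a placeholder, and element_at tolerates absent keys and NULL maps):

    assertQuerySucceeds("SELECT file_path, element_at(lower_bounds, 1) FROM test_schema.\"test_table$files\"");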

    @AfterClass(alwaysRun = true)
    public void tearDown()
    {