Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manifests and Files tables for Iceberg Connector #2223

Merged
merged 3 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,9 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.iceberg.groupId>com.github.apache.incubator-iceberg</dep.iceberg.groupId>
<dep.iceberg.version>77a456a</dep.iceberg.version>
<dep.iceberg.version>0.7.0-incubating</dep.iceberg.version>
</properties>

<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>io.prestosql.hadoop</groupId>
Expand Down Expand Up @@ -146,13 +138,13 @@

<!-- Iceberg -->
<dependency>
<groupId>${dep.iceberg.groupId}</groupId>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>${dep.iceberg.groupId}</groupId>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${dep.iceberg.version}</version>
<exclusions>
Expand All @@ -164,13 +156,13 @@
</dependency>

<dependency>
<groupId>${dep.iceberg.groupId}</groupId>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-hive</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>${dep.iceberg.groupId}</groupId>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
<version>${dep.iceberg.version}</version>
<exclusions>
Expand All @@ -182,7 +174,7 @@
</dependency>

<dependency>
<groupId>${dep.iceberg.groupId}</groupId>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${dep.iceberg.version}</version>
<exclusions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slices;
import io.prestosql.plugin.iceberg.util.PageListBuilder;
import io.prestosql.spi.Page;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedPageSource;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SystemTable;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.TypeManager;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.TypeSignature.mapType;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class FilesTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;
private final Optional<Long> snapshotId;

public FilesTable(SchemaTableName tableName, Table icebergTable, Optional<Long> snapshotId, TypeManager typeManager)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("file_path", VARCHAR))
.add(new ColumnMetadata("file_format", VARCHAR))
.add(new ColumnMetadata("record_count", BIGINT))
.add(new ColumnMetadata("file_size_in_bytes", BIGINT))
.add(new ColumnMetadata("column_sizes", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("null_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("lower_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata("upper_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata("key_metadata", VARBINARY))
.add(new ColumnMetadata("split_offsets", new ArrayType(BIGINT)))
.build());
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, session, icebergTable, snapshotId));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable, Optional<Long> snapshotId)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
TableScan tableScan = getTableScan(session, TupleDomain.all(), snapshotId, icebergTable).includeColumnStats();
Map<Integer, Type> idToTypeMapping = icebergTable.schema().columns().stream()
.collect(toImmutableMap(Types.NestedField::fieldId, column -> column.type().asPrimitiveType()));

tableScan.planFiles().forEach(fileScanTask -> {
DataFile dataFile = fileScanTask.file();

pagesBuilder.beginRow();
pagesBuilder.appendVarchar(dataFile.path().toString());
pagesBuilder.appendVarchar(dataFile.format().name());
pagesBuilder.appendBigint(dataFile.recordCount());
pagesBuilder.appendBigint(dataFile.fileSizeInBytes());
if (checkNonNull(dataFile.columnSizes(), pagesBuilder)) {
pagesBuilder.appendIntegerBigintMap(dataFile.columnSizes());
}
if (checkNonNull(dataFile.valueCounts(), pagesBuilder)) {
pagesBuilder.appendIntegerBigintMap(dataFile.valueCounts());
}
if (checkNonNull(dataFile.nullValueCounts(), pagesBuilder)) {
pagesBuilder.appendIntegerBigintMap(dataFile.nullValueCounts());
}
if (checkNonNull(dataFile.lowerBounds(), pagesBuilder)) {
pagesBuilder.appendIntegerVarcharMap(dataFile.lowerBounds().entrySet().stream()
.collect(toImmutableMap(
Map.Entry<Integer, ByteBuffer>::getKey,
entry -> Transforms.identity(idToTypeMapping.get(entry.getKey())).toHumanString(
Conversions.fromByteBuffer(idToTypeMapping.get(entry.getKey()), entry.getValue())))));
}
if (checkNonNull(dataFile.upperBounds(), pagesBuilder)) {
pagesBuilder.appendIntegerVarcharMap(dataFile.upperBounds().entrySet().stream()
.collect(toImmutableMap(
Map.Entry<Integer, ByteBuffer>::getKey,
entry -> Transforms.identity(idToTypeMapping.get(entry.getKey())).toHumanString(
Conversions.fromByteBuffer(idToTypeMapping.get(entry.getKey()), entry.getValue())))));
}
if (checkNonNull(dataFile.keyMetadata(), pagesBuilder)) {
pagesBuilder.appendVarbinary(Slices.wrappedBuffer(dataFile.keyMetadata()));
}
if (checkNonNull(dataFile.splitOffsets(), pagesBuilder)) {
pagesBuilder.appendBigintArray(dataFile.splitOffsets());
}
pagesBuilder.endRow();
});

return pagesBuilder.build();
}

private static boolean checkNonNull(Object object, PageListBuilder pagesBuilder)
{
if (object == null) {
pagesBuilder.appendNull();
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum IcebergErrorCode
ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),
ICEBERG_CURSOR_ERROR(9, EXTERNAL),
ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Iceber
return Optional.of(new HistoryTable(table.getSchemaTableNameWithType(), icebergTable));
case SNAPSHOTS:
return Optional.of(new SnapshotsTable(table.getSchemaTableNameWithType(), typeManager, icebergTable));
case MANIFESTS:
return Optional.of(new ManifestsTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId()));
case FILES:
return Optional.of(new FilesTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId(), typeManager));
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static IcebergTableHandle from(SchemaTableName name)
}

Optional<Long> version = Optional.empty();
if (type == TableType.DATA || type == TableType.PARTITIONS || type == TableType.MANIFESTS) {
if (type == TableType.DATA || type == TableType.PARTITIONS || type == TableType.MANIFESTS || type == TableType.FILES) {
if (ver1 != null && ver2 != null) {
throw new PrestoException(NOT_SUPPORTED, "Invalid Iceberg table name (cannot specify two @ versions): " + name);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.prestosql.plugin.iceberg.util.PageListBuilder;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedPageSource;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SystemTable;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.RowType;
import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class ManifestsTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;
private final Optional<Long> snapshotId;

public ManifestsTable(SchemaTableName tableName, Table icebergTable, Optional<Long> snapshotId)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

tableMetadata = new ConnectorTableMetadata(
tableName,
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("path", VARCHAR))
.add(new ColumnMetadata("length", BIGINT))
.add(new ColumnMetadata("partition_spec_id", INTEGER))
.add(new ColumnMetadata("added_snapshot_id", BIGINT))
.add(new ColumnMetadata("added_data_files_count", INTEGER))
.add(new ColumnMetadata("existing_data_files_count", INTEGER))
.add(new ColumnMetadata("deleted_data_files_count", INTEGER))
.add(new ColumnMetadata("partitions", RowType.rowType(
RowType.field("contains_null", BOOLEAN),
RowType.field("lower_bound", VARCHAR),
RowType.field("upper_bound", VARCHAR))))
.build());
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

Snapshot snapshot = snapshotId.map(icebergTable::snapshot)
.orElseGet(icebergTable::currentSnapshot);
if (snapshot == null) {
if (snapshotId.isPresent()) {
throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Invalid snapshot ID: " + snapshotId.get());
}
throw new PrestoException(ICEBERG_INVALID_METADATA, "There's no snapshot associated with table " + tableMetadata.getTable().toString());
}
Map<Integer, PartitionSpec> partitionSpecsById = icebergTable.specs();

snapshot.manifests().forEach(file -> {
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(file.path());
pagesBuilder.appendBigint(file.length());
pagesBuilder.appendInteger(file.partitionSpecId());
pagesBuilder.appendBigint(file.snapshotId());
pagesBuilder.appendInteger(file.addedFilesCount());
pagesBuilder.appendInteger(file.existingFilesCount());
pagesBuilder.appendInteger(file.deletedFilesCount());
writePartitionSummaries(pagesBuilder.nextColumn(), file.partitions(), partitionSpecsById.get(file.partitionSpecId()));
pagesBuilder.endRow();
});

return pagesBuilder.build();
}

private static void writePartitionSummaries(BlockBuilder blockBuilder, List<PartitionFieldSummary> summaries, PartitionSpec partitionSpec)
{
for (int i = 0; i < summaries.size(); i++) {
PartitionFieldSummary summary = summaries.get(i);
PartitionField field = partitionSpec.fields().get(i);
Type nestedType = partitionSpec.partitionType().fields().get(i).type();

BlockBuilder rowBuilder = blockBuilder.beginBlockEntry();
BOOLEAN.writeBoolean(rowBuilder, summary.containsNull());
VARCHAR.writeString(rowBuilder, field.transform().toHumanString(
Conversions.fromByteBuffer(nestedType, summary.lowerBound())));
VARCHAR.writeString(rowBuilder, field.transform().toHumanString(
Conversions.fromByteBuffer(nestedType, summary.upperBound())));
blockBuilder.closeEntry();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum TableType
DATA,
HISTORY,
SNAPSHOTS,
MANIFESTS, // TODO: to be implemented
MANIFESTS,
PARTITIONS,
FILES,
}
Loading