Skip to content

Commit

Permalink
Add support for Iceberg manifests tables
Browse files Browse the repository at this point in the history
  • Loading branch information
lxynov authored and electrum committed Feb 26, 2020
1 parent 39043a5 commit 83e8b1e
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum IcebergErrorCode
ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),
ICEBERG_CURSOR_ERROR(9, EXTERNAL),
ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Iceber
return Optional.of(new HistoryTable(table.getSchemaTableNameWithType(), icebergTable));
case SNAPSHOTS:
return Optional.of(new SnapshotsTable(table.getSchemaTableNameWithType(), typeManager, icebergTable));
case MANIFESTS:
return Optional.of(new ManifestsTable(table.getSchemaTableNameWithType(), icebergTable, table.getSnapshotId()));
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import io.prestosql.plugin.iceberg.util.PageListBuilder;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedPageSource;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SystemTable;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.RowType;
import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class ManifestsTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final Table icebergTable;
private final Optional<Long> snapshotId;

public ManifestsTable(SchemaTableName tableName, Table icebergTable, Optional<Long> snapshotId)
{
this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");

tableMetadata = new ConnectorTableMetadata(
tableName,
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("path", VARCHAR))
.add(new ColumnMetadata("length", BIGINT))
.add(new ColumnMetadata("partition_spec_id", INTEGER))
.add(new ColumnMetadata("added_snapshot_id", BIGINT))
.add(new ColumnMetadata("added_data_files_count", INTEGER))
.add(new ColumnMetadata("existing_data_files_count", INTEGER))
.add(new ColumnMetadata("deleted_data_files_count", INTEGER))
.add(new ColumnMetadata("partitions", RowType.rowType(
RowType.field("contains_null", BOOLEAN),
RowType.field("lower_bound", VARCHAR),
RowType.field("upper_bound", VARCHAR))))
.build());
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId));
}

private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

Snapshot snapshot = snapshotId.map(icebergTable::snapshot)
.orElseGet(icebergTable::currentSnapshot);
if (snapshot == null) {
if (snapshotId.isPresent()) {
throw new PrestoException(ICEBERG_INVALID_SNAPSHOT_ID, "Invalid snapshot ID: " + snapshotId.get());
}
throw new PrestoException(ICEBERG_INVALID_METADATA, "There's no snapshot associated with table " + tableMetadata.getTable().toString());
}
Map<Integer, PartitionSpec> partitionSpecsById = icebergTable.specs();

snapshot.manifests().forEach(file -> {
pagesBuilder.beginRow();
pagesBuilder.appendVarchar(file.path());
pagesBuilder.appendBigint(file.length());
pagesBuilder.appendInteger(file.partitionSpecId());
pagesBuilder.appendBigint(file.snapshotId());
pagesBuilder.appendInteger(file.addedFilesCount());
pagesBuilder.appendInteger(file.existingFilesCount());
pagesBuilder.appendInteger(file.deletedFilesCount());
writePartitionSummaries(pagesBuilder.nextColumn(), file.partitions(), partitionSpecsById.get(file.partitionSpecId()));
pagesBuilder.endRow();
});

return pagesBuilder.build();
}

private static void writePartitionSummaries(BlockBuilder blockBuilder, List<PartitionFieldSummary> summaries, PartitionSpec partitionSpec)
{
for (int i = 0; i < summaries.size(); i++) {
PartitionFieldSummary summary = summaries.get(i);
PartitionField field = partitionSpec.fields().get(i);
Type nestedType = partitionSpec.partitionType().fields().get(i).type();

BlockBuilder rowBuilder = blockBuilder.beginBlockEntry();
BOOLEAN.writeBoolean(rowBuilder, summary.containsNull());
VARCHAR.writeString(rowBuilder, field.transform().toHumanString(
Conversions.fromByteBuffer(nestedType, summary.lowerBound())));
VARCHAR.writeString(rowBuilder, field.transform().toHumanString(
Conversions.fromByteBuffer(nestedType, summary.upperBound())));
blockBuilder.closeEntry();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public enum TableType
DATA,
HISTORY,
SNAPSHOTS,
MANIFESTS, // TODO: to be implemented
MANIFESTS,
PARTITIONS,
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.Type;

import java.util.List;
import java.util.Map;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.VarcharType.VARCHAR;

Expand Down Expand Up @@ -84,9 +85,14 @@ public void appendNull()
nextColumn().appendNull();
}

public void appendInteger(int value)
{
INTEGER.writeLong(nextColumn(), value);
}

public void appendBigint(long value)
{
BigintType.BIGINT.writeLong(nextColumn(), value);
BIGINT.writeLong(nextColumn(), value);
}

public void appendTimestamp(long value)
Expand Down Expand Up @@ -120,7 +126,7 @@ public void appendVarcharVarcharMap(Map<String, String> values)
column.closeEntry();
}

private BlockBuilder nextColumn()
public BlockBuilder nextColumn()
{
int currentChannel = channel;
channel++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ public void testSnapshotsTable()
assertQuery("SELECT summary['total-records'] FROM test_schema.\"test_table$snapshots\"", "VALUES '0', '3', '6'");
}

@Test
public void testManifestsTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$manifests\"",
"VALUES ('path', 'varchar', '', '')," +
"('length', 'bigint', '', '')," +
"('partition_spec_id', 'integer', '', '')," +
"('added_snapshot_id', 'bigint', '', '')," +
"('added_data_files_count', 'integer', '', '')," +
"('existing_data_files_count', 'integer', '', '')," +
"('deleted_data_files_count', 'integer', '', '')," +
"('partitions', 'row(contains_null boolean, lower_bound varchar, upper_bound varchar)', '', '')");
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$manifests\"");
}

@AfterClass(alwaysRun = true)
public void tearDown()
{
Expand Down

0 comments on commit 83e8b1e

Please sign in to comment.