Introduce IcebergPageSource and IcebergColumnHandle
This commit simplifies logic and fixes bugs in the case of Iceberg
partition spec evolution.
lxynov committed Oct 22, 2019
1 parent 06aca1e commit 2ea9bce
Showing 23 changed files with 649 additions and 261 deletions.
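
Background for the fix: once a table's partition spec evolves, different data files may have been written under different specs, so identity-partition values have to be resolved per file rather than from the table's current spec. The sketch below, written against the Iceberg API, shows the idea; the class and helper names are illustrative, not code from this commit.

    // Illustrative sketch (not from this commit): resolve identity-partition
    // values under the spec each data file was actually written with.
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.Table;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public final class PartitionSpecEvolutionSketch
    {
        private PartitionSpecEvolutionSketch() {}

        public static Map<String, Object> identityPartitionValues(Table table, DataFile file)
        {
            // Table.specs() keeps every spec the table has ever had, keyed by id;
            // reading file.partition() with the current spec misreads old files.
            PartitionSpec fileSpec = table.specs().get(file.specId());
            StructLike partition = file.partition();
            Map<String, Object> values = new LinkedHashMap<>();
            for (int i = 0; i < fileSpec.fields().size(); i++) {
                PartitionField field = fileSpec.fields().get(i);
                if (field.transform().isIdentity()) {
                    values.put(field.name(), partition.get(i, Object.class));
                }
            }
            return values;
        }
    }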
presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java
@@ -186,7 +186,7 @@ public static List<HiveType> toHiveTypes(String hiveTypes)
.collect(toList()));
}

-private static HiveType toHiveType(TypeInfo typeInfo)
+public static HiveType toHiveType(TypeInfo typeInfo)
{
requireNonNull(typeInfo, "typeInfo is null");
return new HiveType(typeInfo);
presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTypeTranslator.java
@@ -49,7 +49,6 @@
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.SmallintType.SMALLINT;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
-import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static java.lang.String.format;
@@ -115,11 +114,6 @@ public TypeInfo translate(Type type)
if (TIMESTAMP.equals(type)) {
return HIVE_TIMESTAMP.getTypeInfo();
}
-if (TIMESTAMP_WITH_TIME_ZONE.equals(type)) {
-// Hive does not have TIMESTAMP_WITH_TIME_ZONE, this is just a work around for iceberg, that upstream would not approve
-// so we probably will need to handle it in iceberg connector but for now this should unblock netflix users.
-return HIVE_TIMESTAMP.getTypeInfo();
-}
if (type instanceof DecimalType) {
DecimalType decimalType = (DecimalType) type;
return new DecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
3 changes: 0 additions & 3 deletions presto-iceberg/README.md
@@ -75,9 +75,6 @@ as a time travel feature which lets you query your table's snapshot at a given t
before announcing the connector as ready for use.
* Predicate pushdown is currently broken, which means delete is also broken. The code from the
original `getTableLayouts()` implementation needs to be updated for `applyFilter()`.
-* We should try to remove `HiveColumnHandle`. This will require replacing or abstracting
-  `HivePageSource`, which is currently used to handle schema evolution and prefilled
-  column values (identity partitions).
* Writing of decimals and timestamps is broken, since their representation in Parquet seems
to be different for Iceberg and Hive. Reads are probably also broken, but this isn't tested
yet since writes don't work.
presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java
@@ -13,7 +13,6 @@
*/
package io.prestosql.plugin.iceberg;

-import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.EquatableValueSet;
@@ -45,7 +44,7 @@ public final class DomainConverter
{
private DomainConverter() {}

-public static TupleDomain<HiveColumnHandle> convertTupleDomainTypes(TupleDomain<HiveColumnHandle> tupleDomain)
+public static TupleDomain<IcebergColumnHandle> convertTupleDomainTypes(TupleDomain<IcebergColumnHandle> tupleDomain)
{
if (tupleDomain.isAll() || tupleDomain.isNone()) {
return tupleDomain;
@@ -54,7 +53,7 @@ public static TupleDomain<HiveColumnHandle> convertTupleDomainTypes(TupleDomain<
return tupleDomain;
}

-Map<HiveColumnHandle, Domain> transformedMap = new HashMap<>();
+Map<IcebergColumnHandle, Domain> transformedMap = new HashMap<>();
tupleDomain.getDomains().get().forEach((column, domain) -> {
ValueSet valueSet = domain.getValues();
ValueSet transformedValueSet = valueSet;
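
Only the handle type changes in this file; the all/none short-circuit and the per-column ValueSet rewrite stay as they were. A hedged usage sketch follows: the handle and the single-value domain are made up for illustration, and a plain BIGINT domain is expected to pass through unchanged.

    // Hypothetical caller of DomainConverter (illustrative values only).
    package io.prestosql.plugin.iceberg;

    import com.google.common.collect.ImmutableMap;
    import io.prestosql.spi.predicate.Domain;
    import io.prestosql.spi.predicate.TupleDomain;

    import java.util.Optional;

    import static io.prestosql.spi.type.BigintType.BIGINT;

    public final class DomainConverterExample
    {
        private DomainConverterExample() {}

        public static void main(String[] args)
        {
            IcebergColumnHandle orderKey = new IcebergColumnHandle(1, "order_key", BIGINT, Optional.empty());
            TupleDomain<IcebergColumnHandle> predicate = TupleDomain.withColumnDomains(
                    ImmutableMap.of(orderKey, Domain.singleValue(BIGINT, 42L)));
            // A non-temporal value like this is expected to come back unchanged.
            System.out.println(DomainConverter.convertTupleDomainTypes(predicate));
        }
    }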
presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java
@@ -15,7 +15,6 @@

import com.google.common.base.VerifyException;
import io.airlift.slice.Slice;
-import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.EquatableValueSet;
@@ -59,22 +58,20 @@ public final class ExpressionConverter
{
private ExpressionConverter() {}

-public static Expression toIcebergExpression(TupleDomain<HiveColumnHandle> tupleDomain, ConnectorSession session)
+public static Expression toIcebergExpression(TupleDomain<IcebergColumnHandle> tupleDomain, ConnectorSession session)
{
if (tupleDomain.isAll()) {
return alwaysTrue();
}
if (!tupleDomain.getDomains().isPresent()) {
return alwaysFalse();
}
-Map<HiveColumnHandle, Domain> domainMap = tupleDomain.getDomains().get();
+Map<IcebergColumnHandle, Domain> domainMap = tupleDomain.getDomains().get();
Expression expression = alwaysTrue();
-for (Map.Entry<HiveColumnHandle, Domain> entry : domainMap.entrySet()) {
-HiveColumnHandle columnHandle = entry.getKey();
+for (Map.Entry<IcebergColumnHandle, Domain> entry : domainMap.entrySet()) {
+IcebergColumnHandle columnHandle = entry.getKey();
Domain domain = entry.getValue();
-if (!columnHandle.isHidden()) {
-expression = and(expression, toIcebergExpression(columnHandle.getName(), columnHandle.getType(), domain, session));
-}
+expression = and(expression, toIcebergExpression(columnHandle.getName(), columnHandle.getType(), domain, session));
}
return expression;
}
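
Because IcebergColumnHandle has no hidden-column concept, the isHidden() guard is gone and every column's domain now feeds the conjunction. A rough sketch of the resulting expression shape, built with Iceberg's real expression factories (the column names are illustrative):

    import org.apache.iceberg.expressions.Expression;

    import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
    import static org.apache.iceberg.expressions.Expressions.and;
    import static org.apache.iceberg.expressions.Expressions.equal;
    import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;

    public final class ExpressionShapeExample
    {
        private ExpressionShapeExample() {}

        public static void main(String[] args)
        {
            // toIcebergExpression starts from alwaysTrue() and ANDs in one
            // predicate per column domain, exactly as the loop above does.
            Expression expression = alwaysTrue();
            expression = and(expression, equal("region", "EU"));
            expression = and(expression, greaterThanOrEqual("order_key", 1000L));
            System.out.println(expression);
        }
    }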
presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergColumnHandle.java
@@ -0,0 +1,98 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.type.Type;

import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class IcebergColumnHandle
implements ColumnHandle
{
private final int id;
private final String name;
private final Type type;
private final Optional<String> comment;

@JsonCreator
public IcebergColumnHandle(
@JsonProperty("id") int id,
@JsonProperty("name") String name,
@JsonProperty("type") Type type,
@JsonProperty("comment") Optional<String> comment)
{
this.id = id;
this.name = requireNonNull(name, "name is null");
this.type = requireNonNull(type, "type is null");
this.comment = requireNonNull(comment, "comment is null");
}

@JsonProperty
public int getId()
{
return id;
}

@JsonProperty
public String getName()
{
return name;
}

@JsonProperty
public Type getType()
{
return type;
}

@JsonProperty
public Optional<String> getComment()
{
return comment;
}

@Override
public int hashCode()
{
return Objects.hash(id, name, type, comment);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
IcebergColumnHandle other = (IcebergColumnHandle) obj;
return this.id == other.id &&
Objects.equals(this.name, other.name) &&
Objects.equals(this.type, other.type) &&
Objects.equals(this.comment, other.comment);
}

@Override
public String toString()
{
return id + ":" + name + ":" + type.getDisplayName();
}
}
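
Unlike HiveColumnHandle, this handle is anchored on Iceberg's numeric field id, which stays stable across renames and partition spec changes; the name, type, and optional comment ride along for planning. A small constructed example (all values illustrative):

    import io.prestosql.spi.type.VarcharType;

    import java.util.Optional;

    public final class IcebergColumnHandleExample
    {
        private IcebergColumnHandleExample() {}

        public static void main(String[] args)
        {
            IcebergColumnHandle handle = new IcebergColumnHandle(
                    3,                              // Iceberg field id, stable across renames
                    "shipping_region",              // current column name
                    VarcharType.VARCHAR,            // Presto type mapped from Iceberg's string
                    Optional.of("destination region"));
            System.out.println(handle);             // prints 3:shipping_region:varchar
        }
    }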
presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java
@@ -26,6 +26,10 @@ public enum IcebergErrorCode
ICEBERG_UNKNOWN_TABLE_TYPE(0, EXTERNAL),
ICEBERG_INVALID_METADATA(1, EXTERNAL),
ICEBERG_TOO_MANY_OPEN_PARTITIONS(2, USER_ERROR),
+ICEBERG_INVALID_PARTITION_VALUE(3, EXTERNAL),
+ICEBERG_BAD_DATA(4, EXTERNAL),
+ICEBERG_MISSING_DATA(5, EXTERNAL),
+ICEBERG_CANNOT_OPEN_SPLIT(6, EXTERNAL),
/**/;

private final ErrorCode errorCode;
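
The four new codes, all EXTERNAL, cover failures a page source hits while reading data files. A hedged sketch of typical usage; the helper, path, and message are illustrative, not from this commit:

    import io.prestosql.spi.PrestoException;

    import java.io.IOException;

    import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
    import static java.lang.String.format;

    public final class IcebergErrorCodeExample
    {
        private IcebergErrorCodeExample() {}

        // Wrap a low-level failure so the engine classifies it as an EXTERNAL
        // problem with the data source rather than an engine bug.
        public static PrestoException cannotOpenSplit(String path, IOException e)
        {
            return new PrestoException(ICEBERG_CANNOT_OPEN_SPLIT,
                    format("Error opening Iceberg split %s: %s", path, e.getMessage()), e);
        }
    }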
presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergHandleResolver.java
@@ -13,7 +13,6 @@
*/
package io.prestosql.plugin.iceberg;

-import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveTransactionHandle;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorHandleResolver;
@@ -35,7 +34,7 @@ public Class<? extends ConnectorTableHandle> getTableHandleClass()
@Override
public Class<? extends ColumnHandle> getColumnHandleClass()
{
-return HiveColumnHandle.class;
+return IcebergColumnHandle.class;
}

@Override
presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
@@ -20,7 +20,6 @@
import io.airlift.slice.Slice;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
-import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveWrittenPartitions;
import io.prestosql.plugin.hive.TableAlreadyExistsException;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
@@ -79,7 +78,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.prestosql.plugin.hive.HiveColumnHandle.updateRowIdHandle;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.prestosql.plugin.hive.HiveSchemaProperties.getLocation;
import static io.prestosql.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation;
import static io.prestosql.plugin.iceberg.DomainConverter.convertTupleDomainTypes;
@@ -106,7 +105,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
@@ -207,14 +205,17 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());
-List<HiveColumnHandle> columns = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager);
-return columns.stream().collect(toMap(HiveColumnHandle::getName, identity()));
+return getColumns(icebergTable.schema(), typeManager).stream()
+.collect(toImmutableMap(IcebergColumnHandle::getName, identity()));
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
-HiveColumnHandle column = (HiveColumnHandle) columnHandle;
+IcebergColumnHandle column = (IcebergColumnHandle) columnHandle;
+if (column.getComment().isPresent()) {
+return new ColumnMetadata(column.getName(), column.getType(), column.getComment().get());
+}
return new ColumnMetadata(column.getName(), column.getType());
}

@@ -319,7 +320,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
tableName,
SchemaParser.toJson(metadata.schema()),
PartitionSpecParser.toJson(metadata.spec()),
-getColumns(metadata.schema(), metadata.spec(), typeManager),
+getColumns(metadata.schema(), typeManager),
targetPath,
getFileFormat(tableMetadata.getProperties()));
}
@@ -353,7 +354,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
table.getTableName(),
SchemaParser.toJson(icebergTable.schema()),
PartitionSpecParser.toJson(icebergTable.spec()),
-getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
+getColumns(icebergTable.schema(), typeManager),
getDataPath(icebergTable.location()),
getFileFormat(icebergTable));
}
@@ -433,7 +434,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column)
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
-HiveColumnHandle handle = (HiveColumnHandle) column;
+IcebergColumnHandle handle = (IcebergColumnHandle) column;
org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, icebergTableHandle.getSchemaTableName());
icebergTable.updateSchema().deleteColumn(handle.getName()).commit();
}
@@ -442,7 +443,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandl
public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, String target)
{
IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle;
-HiveColumnHandle columnHandle = (HiveColumnHandle) source;
+IcebergColumnHandle columnHandle = (IcebergColumnHandle) source;
org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, icebergTableHandle.getSchemaTableName());
icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit();
}
@@ -469,7 +470,12 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema
private List<ColumnMetadata> getColumnMetadatas(org.apache.iceberg.Table table)
{
return table.schema().columns().stream()
-.map(column -> new ColumnMetadata(column.name(), toPrestoType(column.type(), typeManager)))
+.map(column -> {
+if (column.doc() != null) {
+return new ColumnMetadata(column.name(), toPrestoType(column.type(), typeManager), column.doc());
+}
+return new ColumnMetadata(column.name(), toPrestoType(column.type(), typeManager));
+})
.collect(toImmutableList());
}

@@ -491,12 +497,6 @@ private static Schema toIcebergSchema(List<ColumnMetadata> columns)
return TypeUtil.assignFreshIds(schema, nextFieldId::getAndIncrement);
}

-@Override
-public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
-{
-return updateRowIdHandle();
-}

@Override
public Optional<ConnectorTableHandle> applyDelete(ConnectorSession session, ConnectorTableHandle handle)
{
@@ -544,7 +544,7 @@ public void rollback()
public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
-TupleDomain<HiveColumnHandle> newDomain = convertTupleDomainTypes(constraint.getSummary().transform(HiveColumnHandle.class::cast));
+TupleDomain<IcebergColumnHandle> newDomain = convertTupleDomainTypes(constraint.getSummary().transform(IcebergColumnHandle.class::cast));

if (newDomain.equals(table.getPredicate())) {
return Optional.empty();
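
Note the new getColumns signature above: the partition spec parameter is gone because partition columns are no longer modeled as synthetic Hive columns. The helper itself lives in a file not shown in these hunks, so the following is a hypothetical reconstruction of its shape; the static import of toPrestoType (the connector's Iceberg-to-Presto type mapping used elsewhere in this diff) is assumed.

    // Hypothetical shape of getColumns(schema, typeManager); a sketch, not the
    // commit's actual implementation.
    import io.prestosql.spi.type.TypeManager;

    import org.apache.iceberg.Schema;

    import java.util.List;
    import java.util.Optional;

    import static com.google.common.collect.ImmutableList.toImmutableList;

    public final class GetColumnsSketch
    {
        private GetColumnsSketch() {}

        public static List<IcebergColumnHandle> getColumns(Schema schema, TypeManager typeManager)
        {
            return schema.columns().stream()
                    .map(column -> new IcebergColumnHandle(
                            column.fieldId(),                         // stable Iceberg field id
                            column.name(),
                            toPrestoType(column.type(), typeManager), // assumed type-mapping helper
                            Optional.ofNullable(column.doc())))       // doc() backs getComment()
                    .collect(toImmutableList());
        }
    }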
presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java
@@ -16,22 +16,18 @@
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
-import io.prestosql.plugin.hive.CoercionPolicy;
import io.prestosql.plugin.hive.DynamicConfigurationProvider;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.HdfsConfig;
import io.prestosql.plugin.hive.HdfsConfiguration;
import io.prestosql.plugin.hive.HdfsConfigurationInitializer;
import io.prestosql.plugin.hive.HdfsEnvironment;
-import io.prestosql.plugin.hive.HiveCoercionPolicy;
import io.prestosql.plugin.hive.HiveHdfsConfiguration;
import io.prestosql.plugin.hive.HiveLocationService;
import io.prestosql.plugin.hive.HiveNodePartitioningProvider;
import io.prestosql.plugin.hive.HiveTransactionManager;
-import io.prestosql.plugin.hive.HiveTypeTranslator;
import io.prestosql.plugin.hive.LocationService;
import io.prestosql.plugin.hive.NamenodeStats;
-import io.prestosql.plugin.hive.TypeTranslator;
import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
import io.prestosql.plugin.hive.parquet.ParquetWriterConfig;
import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
@@ -50,8 +46,6 @@ public class IcebergModule
@Override
public void configure(Binder binder)
{
-binder.bind(TypeTranslator.class).toInstance(new HiveTypeTranslator());
-binder.bind(CoercionPolicy.class).to(HiveCoercionPolicy.class).in(Scopes.SINGLETON);
binder.bind(HdfsConfiguration.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON);
binder.bind(HdfsEnvironment.class).in(Scopes.SINGLETON);
binder.bind(IcebergTransactionManager.class).in(Scopes.SINGLETON);
