From 2ea9bce3e8751dcb8a8a51df7985f4df5f1e2fde Mon Sep 17 00:00:00 2001 From: Xingyuan Lin Date: Fri, 4 Oct 2019 14:27:32 -0700 Subject: [PATCH] Introduce IcebergPageSource and IcebergColumnHandle This commit simplifies the logic and fixes bugs in the case of Iceberg Partition Spec evolution. --- .../io/prestosql/plugin/hive/HiveType.java | 2 +- .../plugin/hive/HiveTypeTranslator.java | 6 - presto-iceberg/README.md | 3 - .../plugin/iceberg/DomainConverter.java | 5 +- .../plugin/iceberg/ExpressionConverter.java | 13 +- .../plugin/iceberg/IcebergColumnHandle.java | 98 +++++++ .../plugin/iceberg/IcebergErrorCode.java | 4 + .../plugin/iceberg/IcebergHandleResolver.java | 3 +- .../plugin/iceberg/IcebergMetadata.java | 36 +-- .../plugin/iceberg/IcebergModule.java | 6 - .../plugin/iceberg/IcebergPageSink.java | 21 +- .../plugin/iceberg/IcebergPageSource.java | 251 ++++++++++++++++++ .../iceberg/IcebergPageSourceProvider.java | 117 ++++---- .../plugin/iceberg/IcebergSplit.java | 25 +- .../plugin/iceberg/IcebergSplitManager.java | 2 +- .../plugin/iceberg/IcebergSplitSource.java | 89 +++---- .../plugin/iceberg/IcebergTableHandle.java | 7 +- .../prestosql/plugin/iceberg/IcebergUtil.java | 62 +---- .../iceberg/IcebergWritableTableHandle.java | 7 +- .../plugin/iceberg/TypeConveter.java | 132 +++++++++ .../iceberg/TestIcebergDistributed.java | 2 +- .../plugin/iceberg/TestIcebergSmoke.java | 14 + .../spi/connector/ColumnMetadata.java | 5 + 23 files changed, 649 insertions(+), 261 deletions(-) create mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergColumnHandle.java create mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSource.java
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java index b6ef01ab2e85..6cede1bfa27b 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveType.java @@ -186,7 +186,7 @@ public static List toHiveTypes(String hiveTypes) .collect(toList())); } - private static HiveType toHiveType(TypeInfo typeInfo) + public static HiveType toHiveType(TypeInfo typeInfo) { requireNonNull(typeInfo, "typeInfo is null"); return new HiveType(typeInfo);
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTypeTranslator.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTypeTranslator.java index ffd462f1a4ff..71cc7eb72a3f 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTypeTranslator.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTypeTranslator.java @@ -49,7 +49,6 @@ import static io.prestosql.spi.type.RealType.REAL; import static io.prestosql.spi.type.SmallintType.SMALLINT; import static io.prestosql.spi.type.TimestampType.TIMESTAMP; -import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; import static io.prestosql.spi.type.TinyintType.TINYINT; import static io.prestosql.spi.type.VarbinaryType.VARBINARY; import static java.lang.String.format; @@ -115,11 +114,6 @@ public TypeInfo translate(Type type) if (TIMESTAMP.equals(type)) { return HIVE_TIMESTAMP.getTypeInfo(); } - if (TIMESTAMP_WITH_TIME_ZONE.equals(type)) { - // Hive does not have TIMESTAMP_WITH_TIME_ZONE, this is just a work around for iceberg, that upstream would not approve - // so we probably will need to handle it in iceberg connector but for now this should unblock netflix users.
- return HIVE_TIMESTAMP.getTypeInfo(); - } if (type instanceof DecimalType) { DecimalType decimalType = (DecimalType) type; return new DecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale()); diff --git a/presto-iceberg/README.md b/presto-iceberg/README.md index 2baa0aeeb8e3..a8ccf3679fda 100644 --- a/presto-iceberg/README.md +++ b/presto-iceberg/README.md @@ -75,9 +75,6 @@ as a time travel feature which lets you query your table's snapshot at a given t before announcing the connector as ready for use. * Predicate pushdown is currently broken, which means delete is also broken. The code from the original `getTableLayouts()` implementation needs to be updated for `applyFilter()`. -* We should try to remove `HiveColumnHandle`. This will require replacing or abstracting - `HivePageSource`, which is currently used to handle schema evolution and prefilled - column values (identity partitions). * Writing of decimals and timestamps is broken, since their representation in Parquet seems to be different for Iceberg and Hive. Reads are probably also broken, but this isn't tested yet since writes don't work. diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java index 22ee9f9fb30d..c5a0eb3c8f4e 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java @@ -13,7 +13,6 @@ */ package io.prestosql.plugin.iceberg; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.spi.block.Block; import io.prestosql.spi.predicate.Domain; import io.prestosql.spi.predicate.EquatableValueSet; @@ -45,7 +44,7 @@ public final class DomainConverter { private DomainConverter() {} - public static TupleDomain convertTupleDomainTypes(TupleDomain tupleDomain) + public static TupleDomain convertTupleDomainTypes(TupleDomain tupleDomain) { if (tupleDomain.isAll() || tupleDomain.isNone()) { return tupleDomain; @@ -54,7 +53,7 @@ public static TupleDomain convertTupleDomainTypes(TupleDomain< return tupleDomain; } - Map transformedMap = new HashMap<>(); + Map transformedMap = new HashMap<>(); tupleDomain.getDomains().get().forEach((column, domain) -> { ValueSet valueSet = domain.getValues(); ValueSet transformedValueSet = valueSet; diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java index f40931294dea..0d244227921a 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java @@ -15,7 +15,6 @@ import com.google.common.base.VerifyException; import io.airlift.slice.Slice; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.spi.connector.ConnectorSession; import io.prestosql.spi.predicate.Domain; import io.prestosql.spi.predicate.EquatableValueSet; @@ -59,7 +58,7 @@ public final class ExpressionConverter { private ExpressionConverter() {} - public static Expression toIcebergExpression(TupleDomain tupleDomain, ConnectorSession session) + public static Expression toIcebergExpression(TupleDomain tupleDomain, ConnectorSession session) { if (tupleDomain.isAll()) { return alwaysTrue(); @@ -67,14 +66,12 @@ public static Expression toIcebergExpression(TupleDomain tuple if (!tupleDomain.getDomains().isPresent()) { return 
alwaysFalse(); } - Map domainMap = tupleDomain.getDomains().get(); + Map domainMap = tupleDomain.getDomains().get(); Expression expression = alwaysTrue(); - for (Map.Entry entry : domainMap.entrySet()) { - HiveColumnHandle columnHandle = entry.getKey(); + for (Map.Entry entry : domainMap.entrySet()) { + IcebergColumnHandle columnHandle = entry.getKey(); Domain domain = entry.getValue(); - if (!columnHandle.isHidden()) { - expression = and(expression, toIcebergExpression(columnHandle.getName(), columnHandle.getType(), domain, session)); - } + expression = and(expression, toIcebergExpression(columnHandle.getName(), columnHandle.getType(), domain, session)); } return expression; } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergColumnHandle.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergColumnHandle.java new file mode 100644 index 000000000000..32b78e1adf60 --- /dev/null +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergColumnHandle.java @@ -0,0 +1,98 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.iceberg; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.prestosql.spi.connector.ColumnHandle; +import io.prestosql.spi.type.Type; + +import java.util.Objects; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class IcebergColumnHandle + implements ColumnHandle +{ + private final int id; + private final String name; + private final Type type; + private final Optional comment; + + @JsonCreator + public IcebergColumnHandle( + @JsonProperty("id") int id, + @JsonProperty("name") String name, + @JsonProperty("type") Type type, + @JsonProperty("comment") Optional comment) + { + this.id = id; + this.name = requireNonNull(name, "name is null"); + this.type = requireNonNull(type, "type is null"); + this.comment = requireNonNull(comment, "comment is null"); + } + + @JsonProperty + public int getId() + { + return id; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public Type getType() + { + return type; + } + + @JsonProperty + public Optional getComment() + { + return comment; + } + + @Override + public int hashCode() + { + return Objects.hash(id, name, type, comment); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + IcebergColumnHandle other = (IcebergColumnHandle) obj; + return this.id == other.id && + Objects.equals(this.name, other.name) && + Objects.equals(this.type, other.type) && + Objects.equals(this.comment, other.comment); + } + + @Override + public String toString() + { + return id + ":" + name + ":" + type.getDisplayName(); + } +} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java index 
c8eec51b684a..47625623ac12 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java @@ -26,6 +26,10 @@ public enum IcebergErrorCode ICEBERG_UNKNOWN_TABLE_TYPE(0, EXTERNAL), ICEBERG_INVALID_METADATA(1, EXTERNAL), ICEBERG_TOO_MANY_OPEN_PARTITIONS(2, USER_ERROR), + ICEBERG_INVALID_PARTITION_VALUE(3, EXTERNAL), + ICEBERG_BAD_DATA(4, EXTERNAL), + ICEBERG_MISSING_DATA(5, EXTERNAL), + ICEBERG_CANNOT_OPEN_SPLIT(6, EXTERNAL), /**/; private final ErrorCode errorCode; diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergHandleResolver.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergHandleResolver.java index 937959fefe77..d37254bd9f7d 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergHandleResolver.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergHandleResolver.java @@ -13,7 +13,6 @@ */ package io.prestosql.plugin.iceberg; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.plugin.hive.HiveTransactionHandle; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConnectorHandleResolver; @@ -35,7 +34,7 @@ public Class getTableHandleClass() @Override public Class getColumnHandleClass() { - return HiveColumnHandle.class; + return IcebergColumnHandle.class; } @Override diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java index 89b9278ae83f..7bc1532ec319 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java @@ -20,7 +20,6 @@ import io.airlift.slice.Slice; import io.prestosql.plugin.hive.HdfsEnvironment; import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.plugin.hive.HiveWrittenPartitions; import io.prestosql.plugin.hive.TableAlreadyExistsException; import io.prestosql.plugin.hive.authentication.HiveIdentity; @@ -79,7 +78,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.prestosql.plugin.hive.HiveColumnHandle.updateRowIdHandle; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.prestosql.plugin.hive.HiveSchemaProperties.getLocation; import static io.prestosql.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; import static io.prestosql.plugin.iceberg.DomainConverter.convertTupleDomainTypes; @@ -106,7 +105,6 @@ import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.TableMetadata.newTableMetadata; @@ -207,14 +205,17 @@ public Map getColumnHandles(ConnectorSession session, Conn { IcebergTableHandle table = (IcebergTableHandle) tableHandle; org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName()); - List columns = getColumns(icebergTable.schema(), icebergTable.spec(), typeManager); - return 
columns.stream().collect(toMap(HiveColumnHandle::getName, identity())); + return getColumns(icebergTable.schema(), typeManager).stream() + .collect(toImmutableMap(IcebergColumnHandle::getName, identity())); } @Override public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) { - HiveColumnHandle column = (HiveColumnHandle) columnHandle; + IcebergColumnHandle column = (IcebergColumnHandle) columnHandle; + if (column.getComment().isPresent()) { + return new ColumnMetadata(column.getName(), column.getType(), column.getComment().get()); + } return new ColumnMetadata(column.getName(), column.getType()); } @@ -319,7 +320,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con tableName, SchemaParser.toJson(metadata.schema()), PartitionSpecParser.toJson(metadata.spec()), - getColumns(metadata.schema(), metadata.spec(), typeManager), + getColumns(metadata.schema(), typeManager), targetPath, getFileFormat(tableMetadata.getProperties())); } @@ -353,7 +354,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto table.getTableName(), SchemaParser.toJson(icebergTable.schema()), PartitionSpecParser.toJson(icebergTable.spec()), - getColumns(icebergTable.schema(), icebergTable.spec(), typeManager), + getColumns(icebergTable.schema(), typeManager), getDataPath(icebergTable.location()), getFileFormat(icebergTable)); } @@ -433,7 +434,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) { IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle; - HiveColumnHandle handle = (HiveColumnHandle) column; + IcebergColumnHandle handle = (IcebergColumnHandle) column; org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, icebergTableHandle.getSchemaTableName()); icebergTable.updateSchema().deleteColumn(handle.getName()).commit(); } @@ -442,7 +443,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle source, String target) { IcebergTableHandle icebergTableHandle = (IcebergTableHandle) tableHandle; - HiveColumnHandle columnHandle = (HiveColumnHandle) source; + IcebergColumnHandle columnHandle = (IcebergColumnHandle) source; org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, icebergTableHandle.getSchemaTableName()); icebergTable.updateSchema().renameColumn(columnHandle.getName(), target).commit(); } @@ -469,7 +470,12 @@ private ConnectorTableMetadata getTableMetadata(ConnectorSession session, Schema private List getColumnMetadatas(org.apache.iceberg.Table table) { return table.schema().columns().stream() - .map(column -> new ColumnMetadata(column.name(), toPrestoType(column.type(), typeManager))) + .map(column -> { + if (column.doc() != null) { + return new ColumnMetadata(column.name(), toPrestoType(column.type(), typeManager), column.doc()); + } + return new ColumnMetadata(column.name(), toPrestoType(column.type(), typeManager)); + }) .collect(toImmutableList()); } @@ -491,12 +497,6 @@ private static Schema toIcebergSchema(List columns) return TypeUtil.assignFreshIds(schema, nextFieldId::getAndIncrement); } - @Override - public ColumnHandle getUpdateRowIdColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle) - { - 
return updateRowIdHandle(); - } - @Override public Optional applyDelete(ConnectorSession session, ConnectorTableHandle handle) { @@ -544,7 +544,7 @@ public void rollback() public Optional> applyFilter(ConnectorSession session, ConnectorTableHandle handle, Constraint constraint) { IcebergTableHandle table = (IcebergTableHandle) handle; - TupleDomain newDomain = convertTupleDomainTypes(constraint.getSummary().transform(HiveColumnHandle.class::cast)); + TupleDomain newDomain = convertTupleDomainTypes(constraint.getSummary().transform(IcebergColumnHandle.class::cast)); if (newDomain.equals(table.getPredicate())) { return Optional.empty(); diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java index de90e9351b37..cbc125981ff4 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java @@ -16,22 +16,18 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Scopes; -import io.prestosql.plugin.hive.CoercionPolicy; import io.prestosql.plugin.hive.DynamicConfigurationProvider; import io.prestosql.plugin.hive.FileFormatDataSourceStats; import io.prestosql.plugin.hive.HdfsConfig; import io.prestosql.plugin.hive.HdfsConfiguration; import io.prestosql.plugin.hive.HdfsConfigurationInitializer; import io.prestosql.plugin.hive.HdfsEnvironment; -import io.prestosql.plugin.hive.HiveCoercionPolicy; import io.prestosql.plugin.hive.HiveHdfsConfiguration; import io.prestosql.plugin.hive.HiveLocationService; import io.prestosql.plugin.hive.HiveNodePartitioningProvider; import io.prestosql.plugin.hive.HiveTransactionManager; -import io.prestosql.plugin.hive.HiveTypeTranslator; import io.prestosql.plugin.hive.LocationService; import io.prestosql.plugin.hive.NamenodeStats; -import io.prestosql.plugin.hive.TypeTranslator; import io.prestosql.plugin.hive.parquet.ParquetReaderConfig; import io.prestosql.plugin.hive.parquet.ParquetWriterConfig; import io.prestosql.spi.connector.ConnectorNodePartitioningProvider; @@ -50,8 +46,6 @@ public class IcebergModule @Override public void configure(Binder binder) { - binder.bind(TypeTranslator.class).toInstance(new HiveTypeTranslator()); - binder.bind(CoercionPolicy.class).to(HiveCoercionPolicy.class).in(Scopes.SINGLETON); binder.bind(HdfsConfiguration.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON); binder.bind(HdfsEnvironment.class).in(Scopes.SINGLETON); binder.bind(IcebergTransactionManager.class).in(Scopes.SINGLETON); diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java index 1c4b40602059..7d2346d2f99e 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java @@ -18,7 +18,6 @@ import io.airlift.slice.Slice; import io.prestosql.plugin.hive.HdfsEnvironment; import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.plugin.hive.HiveFileWriter; import io.prestosql.plugin.hive.HiveStorageFormat; import io.prestosql.plugin.hive.RecordFileWriter; @@ -77,6 +76,7 @@ import static io.prestosql.plugin.hive.util.ParquetRecordWriterUtil.setParquetSchema; import static 
io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS; import static io.prestosql.plugin.iceberg.PartitionTransforms.getColumnTransform; +import static io.prestosql.plugin.iceberg.TypeConveter.toHiveType; import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc; import static io.prestosql.spi.type.Decimals.readBigDecimal; @@ -102,7 +102,7 @@ public class IcebergPageSink private final String outputPath; private final HdfsEnvironment hdfsEnvironment; private final JobConf jobConf; - private final List inputColumns; + private final List inputColumns; private final JsonCodec jsonCodec; private final ConnectorSession session; private final TypeManager typeManager; @@ -122,7 +122,7 @@ public IcebergPageSink( PageIndexerFactory pageIndexerFactory, HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, - List inputColumns, + List inputColumns, TypeManager typeManager, JsonCodec jsonCodec, ConnectorSession session, @@ -340,10 +340,10 @@ private HiveFileWriter createParquetWriter(Path outputPath) { Properties properties = new Properties(); properties.setProperty(IOConstants.COLUMNS, inputColumns.stream() - .map(HiveColumnHandle::getName) + .map(IcebergColumnHandle::getName) .collect(joining(","))); properties.setProperty(IOConstants.COLUMNS_TYPES, inputColumns.stream() - .map(column -> column.getHiveType().getHiveTypeName().toString()) + .map(column -> toHiveType(column.getType()).getHiveTypeName().toString()) .collect(joining(":"))); setParquetSchema(jobConf, convert(outputSchema, "table")); @@ -351,7 +351,7 @@ private HiveFileWriter createParquetWriter(Path outputPath) return new RecordFileWriter( outputPath, inputColumns.stream() - .map(HiveColumnHandle::getName) + .map(IcebergColumnHandle::getName) .collect(toImmutableList()), fromHiveStorageFormat(HiveStorageFormat.PARQUET), properties, @@ -432,17 +432,16 @@ public static Object getIcebergValue(Block block, int position, Type type) throw new UnsupportedOperationException("Type not supported as partition column: " + type.getDisplayName()); } - private static List toPartitionColumns(List handles, PartitionSpec partitionSpec) + private static List toPartitionColumns(List handles, PartitionSpec partitionSpec) { - Map nameChannels = new HashMap<>(); + Map idChannels = new HashMap<>(); for (int i = 0; i < handles.size(); i++) { - nameChannels.put(handles.get(i).getName(), i); + idChannels.put(handles.get(i).getId(), i); } return partitionSpec.fields().stream() .map(field -> { - String name = partitionSpec.schema().findColumnName(field.sourceId()); - Integer channel = nameChannels.get(name); + Integer channel = idChannels.get(field.sourceId()); checkArgument(channel != null, "partition field not found: %s", field); Type inputType = handles.get(channel).getType(); ColumnTransform transform = getColumnTransform(field, inputType); diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSource.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSource.java new file mode 100644 index 000000000000..4fc0262d9a7b --- /dev/null +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSource.java @@ -0,0 +1,251 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.iceberg; + +import io.airlift.slice.Slice; +import io.airlift.slice.SliceUtf8; +import io.prestosql.spi.Page; +import io.prestosql.spi.PrestoException; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.block.RunLengthEncodedBlock; +import io.prestosql.spi.connector.ConnectorPageSource; +import io.prestosql.spi.predicate.Utils; +import io.prestosql.spi.type.DecimalType; +import io.prestosql.spi.type.Decimals; +import io.prestosql.spi.type.TimeZoneKey; +import io.prestosql.spi.type.Type; +import io.prestosql.spi.type.VarbinaryType; +import io.prestosql.spi.type.VarcharType; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Throwables.throwIfInstanceOf; +import static io.airlift.slice.Slices.utf8Slice; +import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; +import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; +import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.prestosql.spi.type.BigintType.BIGINT; +import static io.prestosql.spi.type.BooleanType.BOOLEAN; +import static io.prestosql.spi.type.DateTimeEncoding.packDateTimeWithZone; +import static io.prestosql.spi.type.DateType.DATE; +import static io.prestosql.spi.type.Decimals.isLongDecimal; +import static io.prestosql.spi.type.Decimals.isShortDecimal; +import static io.prestosql.spi.type.DoubleType.DOUBLE; +import static io.prestosql.spi.type.IntegerType.INTEGER; +import static io.prestosql.spi.type.RealType.REAL; +import static io.prestosql.spi.type.TimeType.TIME; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP; +import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; +import static io.prestosql.spi.type.Varchars.isVarcharType; +import static java.lang.Double.parseDouble; +import static java.lang.Float.floatToRawIntBits; +import static java.lang.Float.parseFloat; +import static java.lang.Long.parseLong; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class IcebergPageSource + implements ConnectorPageSource +{ + private final Block[] prefilledBlocks; + private final int[] delegateIndexes; + private final ConnectorPageSource delegate; + + public IcebergPageSource( + List columns, + Map partitionKeys, + ConnectorPageSource delegate, + TimeZoneKey timeZoneKey) + { + int size = requireNonNull(columns, "columns is null").size(); + requireNonNull(partitionKeys, "partitionKeys is null"); + this.delegate = requireNonNull(delegate, "delegate is null"); + + this.prefilledBlocks = new Block[size]; + this.delegateIndexes = new int[size]; + + int outputIndex = 0; + int delegateIndex = 0; + for (IcebergColumnHandle column : columns) { + String partitionValue = partitionKeys.get(column.getId()); + if (partitionValue != null) { + Type type = column.getType(); + Object prefilledValue = deserializePartitionValue(type, partitionValue, 
column.getName(), timeZoneKey); + prefilledBlocks[outputIndex] = Utils.nativeValueToBlock(type, prefilledValue); + delegateIndexes[outputIndex] = -1; + } + else { + delegateIndexes[outputIndex] = delegateIndex; + delegateIndex++; + } + outputIndex++; + } + } + + @Override + public long getCompletedBytes() + { + return delegate.getCompletedBytes(); + } + + @Override + public long getReadTimeNanos() + { + return delegate.getReadTimeNanos(); + } + + @Override + public boolean isFinished() + { + return delegate.isFinished(); + } + + @Override + public Page getNextPage() + { + try { + Page dataPage = delegate.getNextPage(); + if (dataPage == null) { + return null; + } + int batchSize = dataPage.getPositionCount(); + Block[] blocks = new Block[prefilledBlocks.length]; + for (int i = 0; i < prefilledBlocks.length; i++) { + if (prefilledBlocks[i] != null) { + blocks[i] = new RunLengthEncodedBlock(prefilledBlocks[i], batchSize); + } + else { + blocks[i] = dataPage.getBlock(delegateIndexes[i]); + } + } + return new Page(batchSize, blocks); + } + catch (RuntimeException e) { + closeWithSuppression(e); + throwIfInstanceOf(e, PrestoException.class); + throw new PrestoException(ICEBERG_BAD_DATA, e); + } + } + + @Override + public void close() + { + try { + delegate.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public String toString() + { + return delegate.toString(); + } + + @Override + public long getSystemMemoryUsage() + { + return delegate.getSystemMemoryUsage(); + } + + protected void closeWithSuppression(Throwable throwable) + { + requireNonNull(throwable, "throwable is null"); + try { + close(); + } + catch (RuntimeException e) { + // Self-suppression not permitted + if (throwable != e) { + throwable.addSuppressed(e); + } + } + } + + private static Object deserializePartitionValue(Type type, String valueString, String name, TimeZoneKey timeZoneKey) + { + try { + if (type.equals(BOOLEAN)) { + if (valueString.equalsIgnoreCase("true")) { + return true; + } + if (valueString.equalsIgnoreCase("false")) { + return false; + } + throw new IllegalArgumentException(); + } + if (type.equals(INTEGER)) { + return parseLong(valueString); + } + if (type.equals(BIGINT)) { + return parseLong(valueString); + } + if (type.equals(REAL)) { + return (long) floatToRawIntBits(parseFloat(valueString)); + } + if (type.equals(DOUBLE)) { + return parseDouble(valueString); + } + if (type.equals(DATE)) { + return parseLong(valueString); + } + if (type.equals(TIME)) { + return parseLong(valueString); + } + if (type.equals(TIMESTAMP)) { + return parseLong(valueString); + } + if (type.equals(TIMESTAMP_WITH_TIME_ZONE)) { + return packDateTimeWithZone(parseLong(valueString), timeZoneKey); + } + if (isVarcharType(type)) { + Slice value = utf8Slice(valueString); + VarcharType varcharType = (VarcharType) type; + if (!varcharType.isUnbounded() && SliceUtf8.countCodePoints(value) > varcharType.getBoundedLength()) { + throw new IllegalArgumentException(); + } + return value; + } + if (type.equals(VarbinaryType.VARBINARY)) { + return utf8Slice(valueString); + } + if (isShortDecimal(type) || isLongDecimal(type)) { + DecimalType decimalType = (DecimalType) type; + BigDecimal decimal = new BigDecimal(valueString); + decimal = decimal.setScale(decimalType.getScale(), BigDecimal.ROUND_UNNECESSARY); + if (decimal.precision() > decimalType.getPrecision()) { + throw new IllegalArgumentException(); + } + BigInteger unscaledValue = decimal.unscaledValue(); + return isShortDecimal(type) ? 
unscaledValue.longValue() : Decimals.encodeUnscaledValue(unscaledValue); + } + } + catch (IllegalArgumentException e) { + throw new PrestoException(ICEBERG_INVALID_PARTITION_VALUE, format( + "Invalid partition value '%s' for %s partition key: %s", + valueString, + type.getDisplayName(), + name)); + } + // Iceberg tables don't partition by non-primitive-type columns. + throw new PrestoException(GENERIC_INTERNAL_ERROR, "Invalid partition type " + type.toString()); + } +} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java index 588064d740aa..fb08c4392947 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java @@ -27,10 +27,6 @@ import io.prestosql.plugin.hive.FileFormatDataSourceStats; import io.prestosql.plugin.hive.HdfsEnvironment; import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext; -import io.prestosql.plugin.hive.HiveColumnHandle; -import io.prestosql.plugin.hive.HivePageSource; -import io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMapping; -import io.prestosql.plugin.hive.HivePartitionKey; import io.prestosql.plugin.hive.parquet.ParquetPageSource; import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ColumnHandle; @@ -40,9 +36,10 @@ import io.prestosql.spi.connector.ConnectorSplit; import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.ConnectorTransactionHandle; +import io.prestosql.spi.predicate.Domain; import io.prestosql.spi.predicate.TupleDomain; +import io.prestosql.spi.type.StandardTypes; import io.prestosql.spi.type.Type; -import io.prestosql.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -55,7 +52,6 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.schema.MessageType; -import org.joda.time.DateTimeZone; import javax.inject.Inject; @@ -65,10 +61,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalInt; import java.util.function.Function; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; @@ -77,14 +71,11 @@ import static io.prestosql.parquet.ParquetTypeUtils.getParquetTypeByName; import static io.prestosql.parquet.predicate.PredicateUtils.buildPredicate; import static io.prestosql.parquet.predicate.PredicateUtils.predicateMatches; -import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; -import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; -import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; -import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA; -import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings; import static io.prestosql.plugin.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource; import static io.prestosql.plugin.hive.parquet.ParquetColumnIOConverter.constructField; -import static 
io.prestosql.plugin.hive.parquet.ParquetPageSourceFactory.getParquetTupleDomain; +import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; +import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; +import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetMaxReadBlockSize; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isFailOnCorruptedParquetStatistics; import static java.lang.String.format; @@ -95,17 +86,14 @@ public class IcebergPageSourceProvider implements ConnectorPageSourceProvider { private final HdfsEnvironment hdfsEnvironment; - private final TypeManager typeManager; private final FileFormatDataSourceStats fileFormatDataSourceStats; @Inject public IcebergPageSourceProvider( HdfsEnvironment hdfsEnvironment, - TypeManager typeManager, FileFormatDataSourceStats fileFormatDataSourceStats) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); - this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); } @@ -115,28 +103,35 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti IcebergSplit split = (IcebergSplit) connectorSplit; IcebergTableHandle table = (IcebergTableHandle) connectorTable; + List icebergColumns = columns.stream() + .map(IcebergColumnHandle.class::cast) + .collect(toImmutableList()); + + Map partitionKeys = split.getPartitionKeys(); + + List regularColumns = columns.stream() + .map(IcebergColumnHandle.class::cast) + .filter(column -> !partitionKeys.containsKey(column.getId())) + .collect(toImmutableList()); + Path path = new Path(split.getPath()); long start = split.getStart(); long length = split.getLength(); - List hiveColumns = columns.stream() - .map(HiveColumnHandle.class::cast) - .collect(toList()); HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName()); - return createParquetPageSource( + ConnectorPageSource parquetPageSource = createParquetPageSource( hdfsEnvironment, session.getUser(), hdfsEnvironment.getConfiguration(hdfsContext, path), path, start, length, - hiveColumns, - split.getNameToId(), - typeManager, + regularColumns, getParquetMaxReadBlockSize(session), isFailOnCorruptedParquetStatistics(session), split.getPredicate(), - split.getPartitionKeys(), fileFormatDataSourceStats); + + return new IcebergPageSource(icebergColumns, partitionKeys, parquetPageSource, session.getTimeZoneKey()); } private static ConnectorPageSource createParquetPageSource( @@ -146,13 +141,10 @@ private static ConnectorPageSource createParquetPageSource( Path path, long start, long length, - List columns, - Map icebergNameToId, - TypeManager typeManager, + List regularColumns, DataSize maxReadBlockSize, boolean failOnCorruptedStatistics, - TupleDomain effectivePredicate, - List partitionKeys, + TupleDomain effectivePredicate, FileFormatDataSourceStats fileFormatDataSourceStats) { AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext(); @@ -168,10 +160,6 @@ private static ConnectorPageSource createParquetPageSource( FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); - List regularColumns = columns.stream() - .filter(column -> column.getColumnType() == REGULAR) - .collect(toImmutableList()); - // Mapping 
from Iceberg field ID to Parquet fields. Map parquetIdToField = fileSchema.getFields().stream() .filter(field -> field.getId() != null) @@ -179,17 +167,11 @@ private static ConnectorPageSource createParquetPageSource( List parquetFields = regularColumns.stream() .map(column -> { - String columnName = column.getName(); - Integer id = icebergNameToId.get(columnName); - checkArgument(id != null, "column name not in ID map: %s", columnName); - - org.apache.parquet.schema.Type parquetField; if (parquetIdToField.isEmpty()) { // This is a migrated table - return getParquetTypeByName(columnName, fileSchema); + return getParquetTypeByName(column.getName(), fileSchema); } - - return parquetIdToField.get(id); + return parquetIdToField.get(column.getId()); }) .collect(toList()); @@ -215,22 +197,10 @@ private static ConnectorPageSource createParquetPageSource( systemMemoryContext, maxReadBlockSize); - List columnMappings = buildColumnMappings( - partitionKeys, - columns.stream() - .filter(column -> !column.isHidden()) - .collect(toImmutableList()), - ImmutableList.of(), - ImmutableMap.of(), - path, - OptionalInt.empty(), - fileSize, - fileStatus.getModificationTime()); - ImmutableList.Builder prestoTypes = ImmutableList.builder(); ImmutableList.Builder> internalFields = ImmutableList.builder(); for (int columnIndex = 0; columnIndex < regularColumns.size(); columnIndex++) { - HiveColumnHandle column = regularColumns.get(columnIndex); + IcebergColumnHandle column = regularColumns.get(columnIndex); org.apache.parquet.schema.Type parquetField = parquetFields.get(columnIndex); Type prestoType = column.getType(); @@ -241,18 +211,11 @@ private static ConnectorPageSource createParquetPageSource( internalFields.add(Optional.empty()); } else { - internalFields.add(constructField(prestoType, messageColumnIO.getChild(parquetField.getName()))); + internalFields.add(constructField(column.getType(), messageColumnIO.getChild(parquetField.getName()))); } } - ParquetPageSource parquetPageSource = new ParquetPageSource(parquetReader, prestoTypes.build(), internalFields.build()); - - return new HivePageSource( - columnMappings, - Optional.empty(), - DateTimeZone.UTC, - typeManager, - parquetPageSource); + return new ParquetPageSource(parquetReader, prestoTypes.build(), internalFields.build()); } catch (IOException | RuntimeException e) { try { @@ -268,13 +231,33 @@ private static ConnectorPageSource createParquetPageSource( String message = format("Error opening Iceberg split %s (offset=%s, length=%s): %s", path, start, length, e.getMessage()); if (e instanceof ParquetCorruptionException) { - throw new PrestoException(HIVE_BAD_DATA, message, e); + throw new PrestoException(ICEBERG_BAD_DATA, message, e); } if (e instanceof BlockMissingException) { - throw new PrestoException(HIVE_MISSING_DATA, message, e); + throw new PrestoException(ICEBERG_MISSING_DATA, message, e); } - throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e); + throw new PrestoException(ICEBERG_CANNOT_OPEN_SPLIT, message, e); + } + } + + private static TupleDomain getParquetTupleDomain(Map, RichColumnDescriptor> descriptorsByPath, TupleDomain effectivePredicate) + { + if (effectivePredicate.isNone()) { + return TupleDomain.none(); } + + ImmutableMap.Builder predicate = ImmutableMap.builder(); + effectivePredicate.getDomains().get().forEach((columnHandle, domain) -> { + String baseType = columnHandle.getType().getTypeSignature().getBase(); + // skip looking up predicates for complex types as Parquet only stores stats for primitives + if 
(!baseType.equals(StandardTypes.MAP) && !baseType.equals(StandardTypes.ARRAY) && !baseType.equals(StandardTypes.ROW)) { + RichColumnDescriptor descriptor = descriptorsByPath.get(ImmutableList.of(columnHandle.getName())); + if (descriptor != null) { + predicate.put(descriptor, domain); + } + } + }); + return TupleDomain.withColumnDomains(predicate.build()); } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java index 8e72a50bd828..a01a8fd3687a 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java @@ -17,8 +17,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import io.prestosql.plugin.hive.HiveColumnHandle; -import io.prestosql.plugin.hive.HivePartitionKey; import io.prestosql.spi.HostAddress; import io.prestosql.spi.connector.ConnectorSplit; import io.prestosql.spi.predicate.TupleDomain; @@ -36,9 +34,8 @@ public class IcebergSplit private final long start; private final long length; private final List addresses; - private final Map nameToId; - private final TupleDomain predicate; - private final List partitionKeys; + private final TupleDomain predicate; + private final Map partitionKeys; @JsonCreator public IcebergSplit( @@ -46,17 +43,15 @@ public IcebergSplit( @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("addresses") List addresses, - @JsonProperty("nameToId") Map nameToId, - @JsonProperty("predicate") TupleDomain predicate, - @JsonProperty("partitionKeys") List partitionKeys) + @JsonProperty("predicate") TupleDomain predicate, + @JsonProperty("partitionKeys") Map partitionKeys) { this.path = requireNonNull(path, "path is null"); this.start = start; this.length = length; this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null")); - this.nameToId = ImmutableMap.copyOf(requireNonNull(nameToId, "nameToId is null")); this.predicate = requireNonNull(predicate, "predicate is null"); - this.partitionKeys = ImmutableList.copyOf(requireNonNull(partitionKeys, "partitionKeys is null")); + this.partitionKeys = ImmutableMap.copyOf(requireNonNull(partitionKeys, "partitionKeys is null")); } @Override @@ -91,19 +86,13 @@ public long getLength() } @JsonProperty - public Map getNameToId() - { - return nameToId; - } - - @JsonProperty - public TupleDomain getPredicate() + public TupleDomain getPredicate() { return predicate; } @JsonProperty - public List getPartitionKeys() + public Map getPartitionKeys() { return partitionKeys; } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java index 9f6cdf1fac56..bd853e4eff3f 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java @@ -58,7 +58,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co // TODO Use residual. Right now there is no way to propagate residual to presto but at least we can // propagate it at split level so the parquet pushdown can leverage it. 
- IcebergSplitSource splitSource = new IcebergSplitSource(tableScan.planTasks(), table.getPredicate(), icebergTable.schema()); + IcebergSplitSource splitSource = new IcebergSplitSource(tableScan.planTasks(), table.getPredicate()); return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader()); } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java index 229ecb2925d3..148d4a660a39 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java @@ -14,9 +14,9 @@ package io.prestosql.plugin.iceberg; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; -import io.prestosql.plugin.hive.HiveColumnHandle; -import io.prestosql.plugin.hive.HivePartitionKey; +import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ConnectorPartitionHandle; import io.prestosql.spi.connector.ConnectorSplit; import io.prestosql.spi.connector.ConnectorSplitSource; @@ -25,18 +25,13 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types.NestedField; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -44,40 +39,31 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Iterators.limit; -import static io.prestosql.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION; +import static io.prestosql.plugin.iceberg.DomainConverter.convertTupleDomainTypes; +import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; import static io.prestosql.plugin.iceberg.IcebergUtil.getIdentityPartitions; -import static java.lang.Math.toIntExact; +import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.time.ZoneOffset.UTC; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.iceberg.types.Type.TypeID.BINARY; +import static org.apache.iceberg.types.Type.TypeID.FIXED; public class IcebergSplitSource implements ConnectorSplitSource { - private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); - private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").withZone(UTC); - private final CloseableIterable combinedScanIterable; - private final TupleDomain predicate; - private final Map nameToId; + private final TupleDomain predicate; private final Iterator fileScanIterator; public IcebergSplitSource( CloseableIterable combinedScanIterable, - TupleDomain 
predicate, - Schema schema) + TupleDomain predicate) { this.combinedScanIterable = requireNonNull(combinedScanIterable, "combinedScanIterable is null"); this.predicate = requireNonNull(predicate, "predicate is null"); - this.nameToId = requireNonNull(schema, "schema is null").columns().stream() - .collect(toImmutableMap(NestedField::name, NestedField::fieldId)); - this.fileScanIterator = Streams.stream(combinedScanIterable) .map(CombinedScanTask::files) .flatMap(Collection::stream) @@ -89,6 +75,7 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan { // TODO: move this to a background thread List splits = new ArrayList<>(); + TupleDomain predicate = convertTupleDomainTypes(this.predicate); Iterator iterator = limit(fileScanIterator, maxSize); while (iterator.hasNext()) { FileScanTask task = iterator.next(); @@ -114,7 +101,7 @@ public void close() } } - private ConnectorSplit toIcebergSplit(TupleDomain predicate, FileScanTask task) + private ConnectorSplit toIcebergSplit(TupleDomain predicate, FileScanTask task) { // TODO: We should leverage residual expression and convert that to TupleDomain. // The predicate here is used by readers for predicate push down at reader level, @@ -126,51 +113,41 @@ private ConnectorSplit toIcebergSplit(TupleDomain predicate, F task.start(), task.length(), ImmutableList.of(), - nameToId, predicate, getPartitionKeys(task)); } - private static List getPartitionKeys(FileScanTask scanTask) + private static Map getPartitionKeys(FileScanTask scanTask) { StructLike partition = scanTask.file().partition(); PartitionSpec spec = scanTask.spec(); Map fieldToIndex = getIdentityPartitions(spec); - List partitionKeys = new ArrayList<>(); + ImmutableMap.Builder partitionKeys = ImmutableMap.builder(); fieldToIndex.forEach((field, index) -> { - String name = field.name(); - Type sourceType = spec.schema().findType(field.sourceId()); - Type partitionType = field.transform().getResultType(sourceType); - Class javaClass = partitionType.typeId().javaClass(); + int id = field.sourceId(); + Type type = spec.schema().findType(id); + Class javaClass = type.typeId().javaClass(); Object value = partition.get(index, javaClass); - String partitionValue = HIVE_DEFAULT_DYNAMIC_PARTITION; - if (value != null) { - switch (partitionType.typeId()) { - case DATE: - partitionValue = DATE_FORMATTER.format(LocalDate.ofEpochDay((int) value)); - break; - case TIMESTAMP: - partitionValue = TIMESTAMP_FORMATTER.format(toLocalDateTime((long) value)); - break; - case FIXED: - case BINARY: - partitionValue = new String(((ByteBuffer) value).array(), UTF_8); - break; - default: - partitionValue = value.toString(); - } + + if (value == null) { + throw new PrestoException(ICEBERG_INVALID_PARTITION_VALUE, format( + "File %s has no partition data for partitioning column %s", + scanTask.file().path().toString(), + field.name())); + } + + String partitionValue; + if (type.typeId() == FIXED || type.typeId() == BINARY) { + // this is safe because Iceberg PartitionData directly wraps the byte array + partitionValue = new String(((ByteBuffer) value).array(), UTF_8); + } + else { + partitionValue = value.toString(); } - partitionKeys.add(new HivePartitionKey(name, partitionValue)); + partitionKeys.put(id, partitionValue); }); - return partitionKeys; - } - private static LocalDateTime toLocalDateTime(long epochMicro) - { - long epochSecond = MICROSECONDS.toSeconds(epochMicro); - long microOfSecond = epochMicro - SECONDS.toMicros(epochSecond); - int nanoOfSecond = toIntExact(MICROSECONDS.toNanos(microOfSecond)); 
- return LocalDateTime.ofEpochSecond(epochSecond, nanoOfSecond, UTC); + return partitionKeys.build(); } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java index ea413ac695e8..bd1ccfbe082b 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableHandle.java @@ -15,7 +15,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import io.prestosql.plugin.hive.HiveColumnHandle; import io.prestosql.spi.PrestoException; import io.prestosql.spi.connector.ConnectorTableHandle; import io.prestosql.spi.connector.SchemaTableName; @@ -43,7 +42,7 @@ public class IcebergTableHandle private final String tableName; private final TableType tableType; private final Optional snapshotId; - private final TupleDomain predicate; + private final TupleDomain predicate; @JsonCreator public IcebergTableHandle( @@ -51,7 +50,7 @@ public IcebergTableHandle( @JsonProperty("tableName") String tableName, @JsonProperty("tableType") TableType tableType, @JsonProperty("snapshotId") Optional snapshotId, - @JsonProperty("predicate") TupleDomain predicate) + @JsonProperty("predicate") TupleDomain predicate) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -85,7 +84,7 @@ public Optional getSnapshotId() } @JsonProperty - public TupleDomain getPredicate() + public TupleDomain getPredicate() { return predicate; } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java index 81743e7209d9..1429e476e69b 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java @@ -13,15 +13,9 @@ */ package io.prestosql.plugin.iceberg; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.prestosql.plugin.hive.HdfsEnvironment; import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext; -import io.prestosql.plugin.hive.HiveColumnHandle; -import io.prestosql.plugin.hive.HiveColumnHandle.ColumnType; -import io.prestosql.plugin.hive.HiveType; -import io.prestosql.plugin.hive.HiveTypeTranslator; -import io.prestosql.plugin.hive.TypeTranslator; import io.prestosql.plugin.hive.authentication.HiveIdentity; import io.prestosql.plugin.hive.metastore.HiveMetastore; import io.prestosql.spi.connector.ConnectorSession; @@ -37,22 +31,15 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; -import static com.google.common.collect.Maps.uniqueIndex; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Streams.stream; -import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY; -import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; -import static io.prestosql.plugin.hive.HiveType.toHiveType; import static io.prestosql.plugin.iceberg.TypeConveter.toPrestoType; -import static 
io.prestosql.spi.type.TimestampType.TIMESTAMP; -import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; @@ -60,8 +47,6 @@ final class IcebergUtil { - private static final TypeTranslator TYPE_TRANSLATOR = new HiveTypeTranslator(); - private IcebergUtil() {} public static boolean isIcebergTable(io.prestosql.plugin.hive.metastore.Table table) @@ -77,42 +62,15 @@ public static Table getIcebergTable(HiveMetastore metastore, HdfsEnvironment hdf return new BaseTable(operations, table.getSchemaName() + "." + table.getTableName()); } - public static List getColumns(Schema schema, PartitionSpec spec, TypeManager typeManager) + public static List getColumns(Schema schema, TypeManager typeManager) { - // Iceberg may or may not store identity columns in data file and the identity transformations have the same name as data column. - // So we remove the identity columns from the set of regular columns which does not work with some of Presto's validation. - - List partitionFields = ImmutableList.copyOf(getIdentityPartitions(spec).keySet()); - Map partitionColumnNames = uniqueIndex(partitionFields, PartitionField::name); - - int columnIndex = 0; - ImmutableList.Builder builder = ImmutableList.builder(); - - for (Types.NestedField column : schema.columns()) { - Type type = column.type(); - ColumnType columnType = REGULAR; - if (partitionColumnNames.containsKey(column.name())) { - PartitionField partitionField = partitionColumnNames.get(column.name()); - Type sourceType = schema.findType(partitionField.sourceId()); - type = partitionField.transform().getResultType(sourceType); - columnType = PARTITION_KEY; - } - io.prestosql.spi.type.Type prestoType = toPrestoType(type, typeManager); - HiveType hiveType = toHiveType(TYPE_TRANSLATOR, coerceForHive(prestoType)); - HiveColumnHandle columnHandle = new HiveColumnHandle(column.name(), hiveType, prestoType, columnIndex, columnType, Optional.empty()); - columnIndex++; - builder.add(columnHandle); - } - - return builder.build(); - } - - public static io.prestosql.spi.type.Type coerceForHive(io.prestosql.spi.type.Type prestoType) - { - if (prestoType.equals(TIMESTAMP_WITH_TIME_ZONE)) { - return TIMESTAMP; - } - return prestoType; + return schema.columns().stream() + .map(column -> new IcebergColumnHandle( + column.fieldId(), + column.name(), + toPrestoType(column.type(), typeManager), + Optional.ofNullable(column.doc()))) + .collect(toImmutableList()); } public static Map getIdentityPartitions(PartitionSpec partitionSpec) @@ -143,7 +101,7 @@ public static FileFormat getFileFormat(Table table) .toUpperCase(Locale.ENGLISH)); } - public static TableScan getTableScan(ConnectorSession session, TupleDomain predicates, Optional snapshotId, Table icebergTable) + public static TableScan getTableScan(ConnectorSession session, TupleDomain predicates, Optional snapshotId, Table icebergTable) { Expression expression = ExpressionConverter.toIcebergExpression(predicates, session); TableScan tableScan = icebergTable.newScan().filter(expression); diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergWritableTableHandle.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergWritableTableHandle.java index 991c16db4b49..5d0f01421618 100644 --- 
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergWritableTableHandle.java
@@ -16,7 +16,6 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
-import io.prestosql.plugin.hive.HiveColumnHandle;
 import io.prestosql.spi.connector.ConnectorInsertTableHandle;
 import io.prestosql.spi.connector.ConnectorOutputTableHandle;
 import org.apache.iceberg.FileFormat;
@@ -32,7 +31,7 @@ public class IcebergWritableTableHandle
     private final String tableName;
     private final String schemaAsJson;
     private final String partitionSpecAsJson;
-    private final List<HiveColumnHandle> inputColumns;
+    private final List<IcebergColumnHandle> inputColumns;
     private final String outputPath;
     private final FileFormat fileFormat;

@@ -42,7 +41,7 @@ public IcebergWritableTableHandle(
             @JsonProperty("tableName") String tableName,
             @JsonProperty("schemaAsJson") String schemaAsJson,
             @JsonProperty("partitionSpecAsJson") String partitionSpecAsJson,
-            @JsonProperty("inputColumns") List<HiveColumnHandle> inputColumns,
+            @JsonProperty("inputColumns") List<IcebergColumnHandle> inputColumns,
             @JsonProperty("outputPath") String outputPath,
             @JsonProperty("fileFormat") FileFormat fileFormat)
     {
@@ -80,7 +79,7 @@ public String getPartitionSpecAsJson()
     }

     @JsonProperty
-    public List<HiveColumnHandle> getInputColumns()
+    public List<IcebergColumnHandle> getInputColumns()
     {
         return inputColumns;
     }
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConveter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConveter.java
index bf2883a3ef56..e5e60cc74855 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConveter.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConveter.java
@@ -15,15 +15,18 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import io.prestosql.plugin.hive.HiveType;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.type.ArrayType;
 import io.prestosql.spi.type.BigintType;
 import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.CharType;
 import io.prestosql.spi.type.DateType;
 import io.prestosql.spi.type.DecimalType;
 import io.prestosql.spi.type.DoubleType;
 import io.prestosql.spi.type.IntegerType;
 import io.prestosql.spi.type.MapType;
+import io.prestosql.spi.type.NamedTypeSignature;
 import io.prestosql.spi.type.RealType;
 import io.prestosql.spi.type.RowType;
 import io.prestosql.spi.type.StandardTypes;
@@ -36,6 +39,10 @@
 import io.prestosql.spi.type.TypeSignatureParameter;
 import io.prestosql.spi.type.VarbinaryType;
 import io.prestosql.spi.type.VarcharType;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.iceberg.types.Types;

 import java.util.ArrayList;
@@ -44,8 +51,39 @@
 import java.util.Optional;

 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveType.HIVE_BINARY;
+import static io.prestosql.plugin.hive.HiveType.HIVE_BOOLEAN;
+import static io.prestosql.plugin.hive.HiveType.HIVE_BYTE;
+import static io.prestosql.plugin.hive.HiveType.HIVE_DATE;
+import static io.prestosql.plugin.hive.HiveType.HIVE_DOUBLE;
+import static io.prestosql.plugin.hive.HiveType.HIVE_FLOAT;
+import static io.prestosql.plugin.hive.HiveType.HIVE_INT;
+import static io.prestosql.plugin.hive.HiveType.HIVE_LONG;
+import static io.prestosql.plugin.hive.HiveType.HIVE_SHORT;
+import static io.prestosql.plugin.hive.HiveType.HIVE_STRING;
+import static io.prestosql.plugin.hive.HiveType.HIVE_TIMESTAMP;
+import static io.prestosql.plugin.hive.util.HiveUtil.isArrayType;
+import static io.prestosql.plugin.hive.util.HiveUtil.isMapType;
+import static io.prestosql.plugin.hive.util.HiveUtil.isRowType;
 import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.prestosql.spi.type.BigintType.BIGINT;
+import static io.prestosql.spi.type.BooleanType.BOOLEAN;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.DoubleType.DOUBLE;
+import static io.prestosql.spi.type.IntegerType.INTEGER;
+import static io.prestosql.spi.type.RealType.REAL;
+import static io.prestosql.spi.type.SmallintType.SMALLINT;
+import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
+import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+import static io.prestosql.spi.type.TinyintType.TINYINT;
+import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
 import static java.lang.String.format;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getCharTypeInfo;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getListTypeInfo;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getMapTypeInfo;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getStructTypeInfo;
+import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getVarcharTypeInfo;

 public final class TypeConveter
 {
@@ -135,6 +173,11 @@ public static org.apache.iceberg.types.Type toIcebergType(Type type)
         throw new PrestoException(NOT_SUPPORTED, "Type not supported for Iceberg: " + type.getDisplayName());
     }

+    public static HiveType toHiveType(Type type)
+    {
+        return HiveType.toHiveType(toHiveTypeInfo(type));
+    }
+
     private static org.apache.iceberg.types.Type fromDecimal(DecimalType type)
     {
         return Types.DecimalType.of(type.getPrecision(), type.getScale());
@@ -160,4 +203,93 @@ private static org.apache.iceberg.types.Type fromMap(MapType type)
     {
         return Types.MapType.ofOptional(1, 2, toIcebergType(type.getKeyType()), toIcebergType(type.getValueType()));
     }
+
+    private static TypeInfo toHiveTypeInfo(Type type)
+    {
+        if (BOOLEAN.equals(type)) {
+            return HIVE_BOOLEAN.getTypeInfo();
+        }
+        if (BIGINT.equals(type)) {
+            return HIVE_LONG.getTypeInfo();
+        }
+        if (INTEGER.equals(type)) {
+            return HIVE_INT.getTypeInfo();
+        }
+        if (SMALLINT.equals(type)) {
+            return HIVE_SHORT.getTypeInfo();
+        }
+        if (TINYINT.equals(type)) {
+            return HIVE_BYTE.getTypeInfo();
+        }
+        if (REAL.equals(type)) {
+            return HIVE_FLOAT.getTypeInfo();
+        }
+        if (DOUBLE.equals(type)) {
+            return HIVE_DOUBLE.getTypeInfo();
+        }
+        if (type instanceof VarcharType) {
+            VarcharType varcharType = (VarcharType) type;
+            if (varcharType.isUnbounded()) {
+                return HIVE_STRING.getTypeInfo();
+            }
+            if (varcharType.getBoundedLength() <= HiveVarchar.MAX_VARCHAR_LENGTH) {
+                return getVarcharTypeInfo(varcharType.getBoundedLength());
+            }
+            throw new PrestoException(NOT_SUPPORTED, format("Unsupported Hive type: %s. Supported VARCHAR types: VARCHAR(<=%d), VARCHAR.", type, HiveVarchar.MAX_VARCHAR_LENGTH));
+        }
+        if (type instanceof CharType) {
+            CharType charType = (CharType) type;
+            int charLength = charType.getLength();
+            if (charLength <= HiveChar.MAX_CHAR_LENGTH) {
+                return getCharTypeInfo(charLength);
+            }
+            throw new PrestoException(NOT_SUPPORTED, format("Unsupported Hive type: %s. Supported CHAR types: CHAR(<=%d).",
+                    type, HiveChar.MAX_CHAR_LENGTH));
+        }
+        if (VARBINARY.equals(type)) {
+            return HIVE_BINARY.getTypeInfo();
+        }
+        if (DATE.equals(type)) {
+            return HIVE_DATE.getTypeInfo();
+        }
+        if (TIMESTAMP.equals(type)) {
+            return HIVE_TIMESTAMP.getTypeInfo();
+        }
+        if (TIMESTAMP_WITH_TIME_ZONE.equals(type)) {
+            // Hive does not have TIMESTAMP_WITH_TIME_ZONE; this is just a workaround for Iceberg.
+            return HIVE_TIMESTAMP.getTypeInfo();
+        }
+        if (type instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) type;
+            return new DecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
+        }
+        if (isArrayType(type)) {
+            TypeInfo elementType = toHiveTypeInfo(type.getTypeParameters().get(0));
+            return getListTypeInfo(elementType);
+        }
+        if (isMapType(type)) {
+            TypeInfo keyType = toHiveTypeInfo(type.getTypeParameters().get(0));
+            TypeInfo valueType = toHiveTypeInfo(type.getTypeParameters().get(1));
+            return getMapTypeInfo(keyType, valueType);
+        }
+        if (isRowType(type)) {
+            ImmutableList.Builder<String> fieldNames = ImmutableList.builder();
+            for (TypeSignatureParameter parameter : type.getTypeSignature().getParameters()) {
+                if (!parameter.isNamedTypeSignature()) {
+                    throw new IllegalArgumentException(format("Expected all parameters to be named type, but got %s", parameter));
+                }
+                NamedTypeSignature namedTypeSignature = parameter.getNamedTypeSignature();
+                if (!namedTypeSignature.getName().isPresent()) {
+                    throw new PrestoException(NOT_SUPPORTED, format("Anonymous row type is not supported in Hive. Please give each field a name: %s", type));
+                }
+                fieldNames.add(namedTypeSignature.getName().get());
+            }
+            return getStructTypeInfo(
+                    fieldNames.build(),
+                    type.getTypeParameters().stream()
+                            .map(TypeConveter::toHiveTypeInfo)
+                            .collect(toList()));
+        }
+        throw new PrestoException(NOT_SUPPORTED, format("Unsupported Hive type: %s", type));
+    }
 }
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergDistributed.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergDistributed.java
index 79be6e3b224f..43b312c5ca8b 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergDistributed.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergDistributed.java
@@ -35,7 +35,7 @@ protected boolean supportsViews()
     @Override
     public void testDelete()
     {
-        assertQueryFails("DELETE FROM orders WHERE orderkey % 2 = 0", "This connector only supports delete where one or more partitions are deleted entirely");
+        // Neither row delete nor partition delete is supported yet
     }

     @Override
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
index 3dc48c2563e9..ed2f8f732f69 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
@@ -101,6 +101,8 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat)
                 " '_integer'," +
                 " '_bigint'," +
                 " '_boolean'," +
+                " '_real'," +
+                " '_double'," +
 //                " '_decimal_short', " +
 //                " '_decimal_long'," +
 //                " '_timestamp'," +
@@ -204,6 +206,18 @@ private void testCreatePartitionedTableAs(Session session, FileFormat fileFormat)
         dropTable(session, "test_create_partitioned_table_as");
     }

+    @Test
+    public void testColumnComments()
+    {
+        Session session = getSession();
+        assertUpdate(session, "CREATE TABLE test_column_comments (_bigint BIGINT COMMENT 'test column comment')");
+
+        assertQuery(session, "SHOW COLUMNS FROM test_column_comments",
+                "VALUES ('_bigint', 'bigint', '', 'test column comment')");
+
+        dropTable(session, "test_column_comments");
+    }
+
     @Test
     public void testPartitionTableBasic()
     {
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ColumnMetadata.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ColumnMetadata.java
index 6a1c64f5bd5a..8d429a4e6029 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/ColumnMetadata.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ColumnMetadata.java
@@ -40,6 +40,11 @@ public ColumnMetadata(String name, Type type)
         this(name, type, true, null, null, false, emptyMap());
     }

+    public ColumnMetadata(String name, Type type, String comment)
+    {
+        this(name, type, true, comment, null, false, emptyMap());
+    }
+
     public ColumnMetadata(String name, Type type, String comment, boolean hidden)
     {
         this(name, type, true, comment, null, hidden, emptyMap());