diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java index 393ce128fb46..43250b6947af 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java @@ -24,12 +24,14 @@ import io.prestosql.spi.block.RunLengthEncodedBlock; import io.prestosql.spi.type.Type; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; import org.openjdk.jol.info.ClassLayout; import java.io.IOException; import java.io.OutputStream; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import static com.google.common.base.MoreObjects.toStringHelper; @@ -50,8 +52,9 @@ public class ParquetFileWriter public ParquetFileWriter( OutputStream outputStream, Callable rollbackAction, - List columnNames, List fileColumnTypes, + MessageType messageType, + Map, Type> primitiveTypes, ParquetWriterOptions parquetWriterOptions, int[] fileInputColumnIndexes, CompressionCodecName compressionCodecName) @@ -60,8 +63,8 @@ public ParquetFileWriter( this.parquetWriter = new ParquetWriter( outputStream, - columnNames, - fileColumnTypes, + messageType, + primitiveTypes, parquetWriterOptions, compressionCodecName); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java index eae73037c0ec..72edef7b00d9 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java @@ -13,6 +13,7 @@ */ package io.prestosql.plugin.hive.parquet; +import io.prestosql.parquet.writer.ParquetSchemaConverter; import io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.FileWriter; import io.prestosql.plugin.hive.HdfsEnvironment; @@ -122,11 +123,14 @@ public Optional createFileWriter( return null; }; + ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(fileColumnTypes, fileColumnNames); + return Optional.of(new ParquetFileWriter( fileSystem.create(path), rollbackAction, - fileColumnNames, fileColumnTypes, + schemaConverter.getMessageType(), + schemaConverter.getPrimitiveTypes(), parquetWriterOptions, fileInputColumnIndexes, compressionCodecName)); diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java index d72c882dd797..c00378c8996d 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java @@ -21,6 +21,7 @@ import io.prestosql.orc.OrcWriterStats; import io.prestosql.orc.OutputStreamOrcDataSink; import io.prestosql.orc.metadata.OrcType; +import io.prestosql.parquet.writer.ParquetSchemaConverter; import io.prestosql.parquet.writer.ParquetWriter; import io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.FileFormatDataSourceStats; @@ -499,10 +500,12 @@ private static class PrestoParquetFormatWriter public PrestoParquetFormatWriter(File targetFile, List columnNames, List types, HiveCompressionCodec compressionCodec) throws IOException { + 
ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(types, columnNames); + writer = new ParquetWriter( new FileOutputStream(targetFile), - columnNames, - types, + schemaConverter.getMessageType(), + schemaConverter.getPrimitiveTypes(), ParquetWriterOptions.builder().build(), compressionCodec.getParquetCompressionCodec()); } diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java index 84e2f142d7b4..056407880aad 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java @@ -22,6 +22,7 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; +import io.prestosql.parquet.writer.ParquetSchemaConverter; import io.prestosql.parquet.writer.ParquetWriter; import io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.HiveConfig; @@ -712,10 +713,11 @@ private static void writeParquetColumnPresto(File outputFile, List types, throws Exception { checkArgument(types.size() == columnNames.size() && types.size() == values.length); + ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(types, columnNames); ParquetWriter writer = new ParquetWriter( new FileOutputStream(outputFile), - columnNames, - types, + schemaConverter.getMessageType(), + schemaConverter.getPrimitiveTypes(), ParquetWriterOptions.builder() .setMaxPageSize(DataSize.ofBytes(100)) .setMaxBlockSize(DataSize.ofBytes(100000)) diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java deleted file mode 100644 index c6a0ab117c2c..000000000000 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.prestosql.plugin.iceberg; - -import com.google.common.base.VerifyException; -import io.prestosql.spi.block.Block; -import io.prestosql.spi.predicate.Domain; -import io.prestosql.spi.predicate.EquatableValueSet; -import io.prestosql.spi.predicate.Marker; -import io.prestosql.spi.predicate.Range; -import io.prestosql.spi.predicate.SortedRangeSet; -import io.prestosql.spi.predicate.TupleDomain; -import io.prestosql.spi.predicate.ValueSet; -import io.prestosql.spi.type.TimeType; -import io.prestosql.spi.type.TimeWithTimeZoneType; -import io.prestosql.spi.type.TimestampType; -import io.prestosql.spi.type.TimestampWithTimeZoneType; -import io.prestosql.spi.type.Type; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import static io.prestosql.spi.predicate.Utils.nativeValueToBlock; -import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -public final class DomainConverter -{ - private DomainConverter() {} - - public static TupleDomain convertTupleDomainTypes(TupleDomain tupleDomain) - { - return tupleDomain.transformDomains((column, domain) -> translateDomain(domain)); - } - - private static Domain translateDomain(Domain domain) - { - ValueSet valueSet = domain.getValues(); - Type type = domain.getType(); - if (type instanceof TimestampType || type instanceof TimestampWithTimeZoneType || type instanceof TimeType || type instanceof TimeWithTimeZoneType) { - if (valueSet instanceof EquatableValueSet) { - throw new VerifyException("Did not expect an EquatableValueSet but got " + valueSet.getClass().getSimpleName()); - } - - if (valueSet instanceof SortedRangeSet) { - List ranges = new ArrayList<>(); - for (Range range : valueSet.getRanges().getOrderedRanges()) { - Marker low = range.getLow(); - if (low.getValueBlock().isPresent()) { - Block value = nativeValueToBlock(type, convertToMicros(type, (long) range.getLow().getValue())); - low = new Marker(range.getType(), Optional.of(value), range.getLow().getBound()); - } - - Marker high = range.getHigh(); - if (high.getValueBlock().isPresent()) { - Block value = nativeValueToBlock(type, convertToMicros(type, (long) range.getHigh().getValue())); - high = new Marker(range.getType(), Optional.of(value), range.getHigh().getBound()); - } - - ranges.add(new Range(low, high)); - } - valueSet = SortedRangeSet.copyOf(valueSet.getType(), ranges); - } - return Domain.create(valueSet, domain.isNullAllowed()); - } - return domain; - } - - private static long convertToMicros(Type type, long value) - { - if (type instanceof TimestampWithTimeZoneType || type instanceof TimeWithTimeZoneType) { - return MILLISECONDS.toMicros(unpackMillisUtc(value)); - } - - if (type instanceof TimestampType || type instanceof TimeType) { - return MILLISECONDS.toMicros(value); - } - - throw new IllegalArgumentException(type + " is unsupported"); - } -} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java index f4468af59505..2980ffdf09d1 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java @@ -29,6 +29,7 @@ import io.prestosql.spi.type.MapType; import io.prestosql.spi.type.RealType; import io.prestosql.spi.type.RowType; +import io.prestosql.spi.type.TimestampType; import io.prestosql.spi.type.Type; import 
io.prestosql.spi.type.VarbinaryType; import io.prestosql.spi.type.VarcharType; @@ -38,6 +39,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; @@ -198,6 +200,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker) return new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), decimalType.getScale()); } + if (type instanceof TimestampType) { + return TimeUnit.MILLISECONDS.toMicros((long) marker.getValue()); + } + return marker.getValue(); } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java index 97c4280506b9..7d588e36c73c 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java @@ -16,10 +16,8 @@ import io.prestosql.plugin.hive.FileWriter; import org.apache.iceberg.Metrics; -import java.util.Optional; - public interface IcebergFileWriter extends FileWriter { - Optional getMetrics(); + Metrics getMetrics(); } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java index eb3c82e3b620..fc00b4f3b8e0 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java @@ -21,9 +21,9 @@ import io.prestosql.orc.OrcWriterOptions; import io.prestosql.orc.OrcWriterStats; import io.prestosql.orc.OutputStreamOrcDataSink; +import io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.FileFormatDataSourceStats; import io.prestosql.plugin.hive.HdfsEnvironment; -import io.prestosql.plugin.hive.HiveStorageFormat; import io.prestosql.plugin.hive.NodeVersion; import io.prestosql.plugin.hive.orc.HdfsOrcDataSource; import io.prestosql.plugin.hive.orc.OrcWriterConfig; @@ -33,13 +33,10 @@ import io.prestosql.spi.type.TypeManager; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.mapred.JobConf; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; -import org.apache.parquet.hadoop.ParquetOutputFormat; -import org.weakref.jmx.Flatten; import org.weakref.jmx.Managed; import javax.inject.Inject; @@ -47,7 +44,6 @@ import java.io.IOException; import java.util.List; import java.util.Optional; -import java.util.Properties; import java.util.concurrent.Callable; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -56,8 +52,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME; import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_VERSION_NAME; -import static io.prestosql.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat; -import static io.prestosql.plugin.hive.util.ParquetRecordWriterUtil.setParquetSchema; import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR; import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED; import static 
io.prestosql.plugin.iceberg.IcebergSessionProperties.getCompressionCodec; @@ -67,13 +61,14 @@ import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxStripeSize; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMinStripeSize; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterValidateMode; +import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetWriterBlockSize; +import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate; -import static io.prestosql.plugin.iceberg.TypeConverter.toHiveType; import static io.prestosql.plugin.iceberg.TypeConverter.toOrcType; import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType; +import static io.prestosql.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap; import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert; import static org.joda.time.DateTimeZone.UTC; @@ -103,7 +98,6 @@ public IcebergFileWriterFactory( } @Managed - @Flatten public OrcWriterStats getOrcWriterStats() { return orcWriterStats; @@ -112,14 +106,13 @@ public OrcWriterStats getOrcWriterStats() public IcebergFileWriter createFileWriter( Path outputPath, Schema icebergSchema, - List columns, JobConf jobConf, ConnectorSession session, FileFormat fileFormat) { switch (fileFormat) { case PARQUET: - return createParquetWriter(outputPath, icebergSchema, columns, jobConf, session); + return createParquetWriter(outputPath, icebergSchema, jobConf, session); case ORC: return createOrcWriter(outputPath, icebergSchema, jobConf, session); } @@ -129,32 +122,44 @@ public IcebergFileWriter createFileWriter( private IcebergFileWriter createParquetWriter( Path outputPath, Schema icebergSchema, - List columns, JobConf jobConf, ConnectorSession session) { - Properties properties = new Properties(); - properties.setProperty(IOConstants.COLUMNS, columns.stream() - .map(IcebergColumnHandle::getName) - .collect(joining(","))); - properties.setProperty(IOConstants.COLUMNS_TYPES, columns.stream() - .map(column -> toHiveType(column.getType()).getHiveTypeName().toString()) - .collect(joining(":"))); - - setParquetSchema(jobConf, convert(icebergSchema, "table")); - jobConf.set(ParquetOutputFormat.COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().name()); - - return new IcebergRecordFileWriter( - outputPath, - columns.stream() - .map(IcebergColumnHandle::getName) - .collect(toImmutableList()), - fromHiveStorageFormat(HiveStorageFormat.PARQUET), - properties, - HiveStorageFormat.PARQUET.getEstimatedWriterSystemMemoryUsage(), - jobConf, - typeManager, - session); + List fileColumnNames = icebergSchema.columns().stream() + .map(Types.NestedField::name) + .collect(toImmutableList()); + List fileColumnTypes = icebergSchema.columns().stream() + .map(column -> toPrestoType(column.type(), typeManager)) + .collect(toImmutableList()); + + try { + FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), outputPath, jobConf); + + Callable rollbackAction = () -> { + fileSystem.delete(outputPath, false); + return null; + }; + + ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder() + .setMaxPageSize(getParquetWriterPageSize(session)) + 
.setMaxBlockSize(getParquetWriterBlockSize(session)) + .build(); + + return new IcebergParquetFileWriter( + fileSystem.create(outputPath), + rollbackAction, + fileColumnTypes, + convert(icebergSchema, "table"), + makeTypeMap(fileColumnTypes, fileColumnNames), + parquetWriterOptions, + IntStream.range(0, fileColumnNames.size()).toArray(), + getCompressionCodec(session).getParquetCompressionCodec(), + outputPath, + jobConf); + } + catch (IOException e) { + throw new PrestoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e); + } } private IcebergFileWriter createOrcWriter( diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java index d2bcf1267ce9..c51a218c3771 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java @@ -52,8 +52,6 @@ import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.spi.security.PrestoPrincipal; import io.prestosql.spi.statistics.ComputedStatistics; -import io.prestosql.spi.type.DecimalType; -import io.prestosql.spi.type.TimestampType; import io.prestosql.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -87,7 +85,6 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.prestosql.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.prestosql.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; -import static io.prestosql.plugin.iceberg.DomainConverter.convertTupleDomainTypes; import static io.prestosql.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.prestosql.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation; import static io.prestosql.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; @@ -115,7 +112,6 @@ import static java.util.stream.Collectors.toList; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; -import static org.apache.iceberg.FileFormat.ORC; import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.Transactions.createTableTransaction; @@ -405,17 +401,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto { IcebergTableHandle table = (IcebergTableHandle) tableHandle; org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName()); - boolean orcFormat = ORC == getFileFormat(icebergTable); - - for (NestedField column : icebergTable.schema().columns()) { - io.prestosql.spi.type.Type type = toPrestoType(column.type(), typeManager); - if (type instanceof DecimalType && !orcFormat) { - throw new PrestoException(NOT_SUPPORTED, "Writing to columns of type decimal not yet supported"); - } - if (type instanceof TimestampType && !orcFormat) { - throw new PrestoException(NOT_SUPPORTED, "Writing to columns of type timestamp not yet supported for PARQUET format"); - } - } transaction = icebergTable.newTransaction(); @@ -618,10 +603,9 @@ public Optional> applyFilter(C { IcebergTableHandle table = (IcebergTableHandle) handle; // TODO: Remove TupleDomain#simplify once Iceberg supports IN expression - TupleDomain newDomain = convertTupleDomainTypes( - constraint.getSummary() -
.transform(IcebergColumnHandle.class::cast) - .simplify()) + TupleDomain newDomain = constraint.getSummary() + .transform(IcebergColumnHandle.class::cast) + .simplify() .intersect(table.getPredicate()); if (newDomain.equals(table.getPredicate())) { diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java index 930eb3354ac6..64b00bc8931f 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java @@ -84,9 +84,9 @@ public IcebergOrcFileWriter( } @Override - public Optional getMetrics() + public Metrics getMetrics() { - return Optional.of(computeMetrics(icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats())); + return computeMetrics(icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats()); } private static Metrics computeMetrics(Schema icebergSchema, ColumnMetadata orcColumns, long fileRowCount, Optional> columnStatistics) diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java index da91d67a246e..adb54e30139e 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java @@ -13,7 +13,6 @@ */ package io.prestosql.plugin.iceberg; -import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableList; import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; @@ -44,13 +43,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Metrics; -import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.hadoop.HadoopInputFile; -import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.transforms.Transform; import java.util.ArrayList; @@ -69,7 +64,6 @@ import static io.prestosql.plugin.hive.util.ConfigurationUtils.toJobConf; import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS; import static io.prestosql.plugin.iceberg.PartitionTransforms.getColumnTransform; -import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc; import static io.prestosql.spi.type.Decimals.readBigDecimal; import static java.lang.Float.intBitsToFloat; @@ -93,7 +87,6 @@ public class IcebergPageSink private final IcebergFileWriterFactory fileWriterFactory; private final HdfsEnvironment hdfsEnvironment; private final JobConf jobConf; - private final List inputColumns; private final JsonCodec jsonCodec; private final ConnectorSession session; private final FileFormat fileFormat; @@ -129,7 +122,6 @@ public IcebergPageSink( this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); this.session = requireNonNull(session, "session is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); - this.inputColumns = ImmutableList.copyOf(inputColumns); this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec)); } @@ -169,7 +161,7 @@ public CompletableFuture> finish() CommitTaskData 
task = new CommitTaskData( context.getPath().toString(), - new MetricsWrapper(getMetrics(context)), + new MetricsWrapper(context.writer.getMetrics()), context.getPartitionData().map(PartitionData::toJson)); commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task))); @@ -314,7 +306,6 @@ private WriteContext createWriter(Optional partitionPath, Optional partitionPath, Optional new VerifyException("Iceberg ORC file writers should return Iceberg metrics")); - } - throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat); - } - private static Optional getPartitionData(List columns, Page page, int position) { if (columns.isEmpty()) { diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergParquetFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergParquetFileWriter.java new file mode 100644 index 000000000000..72df75afc871 --- /dev/null +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergParquetFileWriter.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.iceberg; + +import io.prestosql.parquet.writer.ParquetWriterOptions; +import io.prestosql.plugin.hive.parquet.ParquetFileWriter; +import io.prestosql.spi.type.Type; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; + +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import static java.util.Objects.requireNonNull; + +public class IcebergParquetFileWriter + extends ParquetFileWriter + implements IcebergFileWriter +{ + private final Path outputPath; + private final JobConf jobConf; + + public IcebergParquetFileWriter( + OutputStream outputStream, + Callable rollbackAction, + List fileColumnTypes, + MessageType messageType, + Map, Type> primitiveTypes, + ParquetWriterOptions parquetWriterOptions, + int[] fileInputColumnIndexes, + CompressionCodecName compressionCodecName, + Path outputPath, + JobConf jobConf) + { + super(outputStream, + rollbackAction, + fileColumnTypes, + messageType, + primitiveTypes, + parquetWriterOptions, + fileInputColumnIndexes, + compressionCodecName); + this.outputPath = requireNonNull(outputPath, "outputPath is null"); + this.jobConf = requireNonNull(jobConf, "jobConf is null"); + } + + @Override + public Metrics getMetrics() + { + return ParquetUtil.fileMetrics(HadoopInputFile.fromPath(outputPath, jobConf), MetricsConfig.getDefault()); + } +} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergRecordFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergRecordFileWriter.java deleted file mode 100644 index 
210cae89c032..000000000000 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergRecordFileWriter.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.prestosql.plugin.iceberg; - -import io.airlift.units.DataSize; -import io.prestosql.plugin.hive.RecordFileWriter; -import io.prestosql.plugin.hive.metastore.StorageFormat; -import io.prestosql.spi.connector.ConnectorSession; -import io.prestosql.spi.type.TypeManager; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.iceberg.Metrics; - -import java.util.List; -import java.util.Optional; -import java.util.Properties; - -public class IcebergRecordFileWriter - extends RecordFileWriter - implements IcebergFileWriter -{ - public IcebergRecordFileWriter( - Path path, - List inputColumnNames, - StorageFormat storageFormat, - Properties schema, - DataSize estimatedWriterSystemMemoryUsage, - JobConf conf, - TypeManager typeManager, - ConnectorSession session) - { - super(path, inputColumnNames, storageFormat, schema, estimatedWriterSystemMemoryUsage, conf, typeManager, session); - } - - @Override - public Optional getMetrics() - { - return Optional.empty(); - } -} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java index d80323dedba0..0fe287b7f083 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java @@ -291,4 +291,14 @@ public static DataSize getParquetMaxReadBlockSize(ConnectorSession session) { return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class); } + + public static DataSize getParquetWriterPageSize(ConnectorSession session) + { + return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class); + } + + public static DataSize getParquetWriterBlockSize(ConnectorSession session) + { + return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class); + } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java index 4146320712ab..3e3610eeea60 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.iceberg.StructLike; @@ -34,7 +35,8 @@ public class PartitionData { private static final String PARTITION_VALUES_FIELD = "partitionValues"; private static final JsonFactory FACTORY = new JsonFactory(); - private static final
ObjectMapper MAPPER = new ObjectMapper(FACTORY); + private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY) + .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true); private final Object[] partitionValues; diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PrimitiveTypeMapBuilder.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PrimitiveTypeMapBuilder.java new file mode 100644 index 000000000000..1cd04ce7f839 --- /dev/null +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PrimitiveTypeMapBuilder.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.iceberg.util; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.prestosql.spi.type.ArrayType; +import io.prestosql.spi.type.MapType; +import io.prestosql.spi.type.RowType; +import io.prestosql.spi.type.Type; + +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.prestosql.spi.type.StandardTypes.ARRAY; +import static io.prestosql.spi.type.StandardTypes.MAP; +import static io.prestosql.spi.type.StandardTypes.ROW; + +public class PrimitiveTypeMapBuilder +{ + private final ImmutableMap.Builder, Type> builder = ImmutableMap.builder(); + + private PrimitiveTypeMapBuilder() {} + + public static Map, Type> makeTypeMap(List types, List columnNames) + { + return new PrimitiveTypeMapBuilder().buildTypeMap(types, columnNames); + } + + private Map, Type> buildTypeMap(List types, List columnNames) + { + for (int i = 0; i < types.size(); i++) { + visitType(types.get(i), columnNames.get(i), ImmutableList.of()); + } + return builder.build(); + } + + private void visitType(Type type, String name, List parent) + { + if (ROW.equals(type.getTypeSignature().getBase())) { + visitRowType((RowType) type, name, parent); + } + else if (MAP.equals(type.getTypeSignature().getBase())) { + visitMapType((MapType) type, name, parent); + } + else if (ARRAY.equals(type.getTypeSignature().getBase())) { + visitArrayType((ArrayType) type, name, parent); + } + else { + builder.put(ImmutableList.builder().addAll(parent).add(name).build(), type); + } + } + + private void visitArrayType(ArrayType type, String name, List parent) + { + parent = ImmutableList.builder().addAll(parent).add(name).add("array").build(); + visitType(type.getElementType(), "array", parent); + } + + private void visitMapType(MapType type, String name, List parent) + { + parent = ImmutableList.builder().addAll(parent).add(name).add("map").build(); + visitType(type.getKeyType(), "key", parent); + visitType(type.getValueType(), "value", parent); + } + + private void visitRowType(RowType type, String name, List parent) + { + parent = ImmutableList.builder().addAll(parent).add(name).build(); + for (RowType.Field field : type.getFields()) { + checkArgument(field.getName().isPresent(), "field in struct type doesn't have name"); + 
visitType(field.getType(), field.getName().get(), parent); + } + } +} diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java index b0c5c73dfdb0..1d84b022c57e 100644 --- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java +++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java @@ -38,6 +38,12 @@ private IcebergQueryRunner() {} public static DistributedQueryRunner createIcebergQueryRunner(Map extraProperties) throws Exception + { + return createIcebergQueryRunner(extraProperties, true); + } + + public static DistributedQueryRunner createIcebergQueryRunner(Map extraProperties, boolean createTpchTables) + throws Exception { Session session = testSessionBuilder() .setCatalog(ICEBERG_CATALOG) @@ -63,7 +69,9 @@ public static DistributedQueryRunner createIcebergQueryRunner(Map ICEBERG_COLUMN_PROVIDER = type -> new IcebergColumnHandle(0, "column", type, Optional.empty()); - - @Test - public void testSimple() - { - assertTupleDomain(TupleDomain.all(), TupleDomain.all()); - assertTupleDomain(TupleDomain.all(), TupleDomain.all()); - } - - @Test - public void testBoolean() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BOOLEAN), Domain.singleValue(BOOLEAN, true)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BOOLEAN), Domain.singleValue(BOOLEAN, false)))); - } - - @Test - public void testDate() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DATE), Domain.singleValue(DATE, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DATE), Domain.multipleValues(DATE, ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testVarbinary() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARBINARY), Domain.singleValue(VARBINARY, Slices.utf8Slice("apple"))))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARBINARY), Domain.multipleValues(VARBINARY, ImmutableList.of(Slices.utf8Slice("apple"), Slices.utf8Slice("banana")))))); - } - - @Test - public void testDouble() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DOUBLE), Domain.singleValue(DOUBLE, 1.0d)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DOUBLE), Domain.multipleValues(DOUBLE, ImmutableList.of(1.0d, 2.0d, 3.0d))))); - } - - @Test - public void testBigint() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.singleValue(BIGINT, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testReal() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(REAL), Domain.singleValue(REAL, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(REAL), Domain.multipleValues(REAL, 
ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testInteger() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.singleValue(INTEGER, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(INTEGER, ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testVarchar() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARCHAR), Domain.singleValue(VARCHAR, Slices.utf8Slice("apple"))))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARCHAR), Domain.multipleValues(VARCHAR, ImmutableList.of(Slices.utf8Slice("apple"), Slices.utf8Slice("banana")))))); - } - - @Test - public void testTimestamp() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.singleValue(TIMESTAMP, 1_234_567_890_123L))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.singleValue(TIMESTAMP, 1_234_567_890_123_000L)))); - - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.multipleValues(TIMESTAMP, ImmutableList.of(1_234_567_890_123L, 1_234_567_890_124L)))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.multipleValues(TIMESTAMP, ImmutableList.of(1_234_567_890_123_000L, 1_234_567_890_124_000L))))); - } - - @Test - public void testTime() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123L))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123_000L)))); - - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.multipleValues(TIME, ImmutableList.of(1_234_567_890_123L, 1_234_567_890_124L)))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.multipleValues(TIME, ImmutableList.of(1_234_567_890_123_000L, 1_234_567_890_124_000L))))); - } - - @Test - public void testTimestampWithTimezone() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, 1_234_567_890_123L))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, MILLISECONDS.toMicros(unpackMillisUtc(1_234_567_890_123L)))))); - - List list = ImmutableList.of(1_234_567_890_123L, 1_234_567_890_124L); - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.multipleValues(TIMESTAMP_WITH_TIME_ZONE, list))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.multipleValues(TIMESTAMP_WITH_TIME_ZONE, list.stream().map(value -> MILLISECONDS.toMicros(unpackMillisUtc(value))).collect(toImmutableList()))))); - } - - @Test - public void testMultipleColumnsTupleDomain() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of( - 
ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, 1_234_567_890_123L), - ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123L), - ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L)))), - TupleDomain.withColumnDomains( - ImmutableMap.of( - ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, MILLISECONDS.toMicros(unpackMillisUtc(1_234_567_890_123L))), - ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123_000L), - ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L))))); - } - - private void assertTupleDomainUnchanged(TupleDomain domain) - { - assertTupleDomain(domain, domain); - } - - private void assertTupleDomain(TupleDomain actual, TupleDomain expected) - { - assertEquals(convertTupleDomainTypes(actual), expected); - } -} diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java index 90d6d3b2d56f..c8cea45f2dd9 100644 --- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java +++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java @@ -102,20 +102,32 @@ public void testShowCreateTable() @Test public void testDecimal() { - for (int precision = 1; precision <= 38; precision++) { - testDecimalWithPrecisionAndScale(precision, precision - 1); - } - - for (int scale = 1; scale < 37; scale++) { - testDecimalWithPrecisionAndScale(38, scale); - } + testWithAllFileFormats((session, format) -> testDecimalForFormat(session, format)); + } - for (int scale = 1; scale < 17; scale++) { - testDecimalWithPrecisionAndScale(18, scale); - } + private void testDecimalForFormat(Session session, FileFormat format) + { + testDecimalWithPrecisionAndScale(session, format, 1, 0); + testDecimalWithPrecisionAndScale(session, format, 8, 6); + testDecimalWithPrecisionAndScale(session, format, 9, 8); + testDecimalWithPrecisionAndScale(session, format, 10, 8); + + testDecimalWithPrecisionAndScale(session, format, 18, 1); + testDecimalWithPrecisionAndScale(session, format, 18, 8); + testDecimalWithPrecisionAndScale(session, format, 18, 17); + + testDecimalWithPrecisionAndScale(session, format, 17, 16); + testDecimalWithPrecisionAndScale(session, format, 18, 17); + testDecimalWithPrecisionAndScale(session, format, 24, 10); + testDecimalWithPrecisionAndScale(session, format, 30, 10); + testDecimalWithPrecisionAndScale(session, format, 37, 26); + testDecimalWithPrecisionAndScale(session, format, 38, 37); + + testDecimalWithPrecisionAndScale(session, format, 38, 17); + testDecimalWithPrecisionAndScale(session, format, 38, 37); } - private void testDecimalWithPrecisionAndScale(int precision, int scale) + private void testDecimalWithPrecisionAndScale(Session session, FileFormat format, int precision, int scale) { checkArgument(precision >= 1 && precision <= 38, "Decimal precision (%s) must be between 1 and 38 inclusive", precision); checkArgument(scale < precision && scale >= 0, "Decimal scale (%s) must be less than the precision (%s) and non-negative", scale, precision); @@ -126,27 +138,66 @@ private void testDecimalWithPrecisionAndScale(int precision, int scale) String afterTheDecimalPoint = "09876543210987654321098765432109876543".substring(0, scale); String decimalValue = format("%s.%s", beforeTheDecimalPoint, 
afterTheDecimalPoint); - assertUpdate(format("CREATE TABLE %s (x %s)", tableName, decimalType)); - assertUpdate(format("INSERT INTO %s (x) VALUES (CAST('%s' AS %s))", tableName, decimalValue, decimalType), 1); - assertQuery(format("SELECT * FROM %s", tableName), format("SELECT CAST('%s' AS %s)", decimalValue, decimalType)); - dropTable(getSession(), tableName); + assertUpdate(session, format("CREATE TABLE %s (x %s) WITH (format = '%s')", tableName, decimalType, format.name())); + assertUpdate(session, format("INSERT INTO %s (x) VALUES (CAST('%s' AS %s))", tableName, decimalValue, decimalType), 1); + assertQuery(session, format("SELECT * FROM %s", tableName), format("SELECT CAST('%s' AS %s)", decimalValue, decimalType)); + dropTable(session, tableName); + } + + @Test + public void testParquetPartitionByTimestamp() + { + assertUpdate("CREATE TABLE test_parquet_partitioned_by_timestamp (_timestamp timestamp) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['_timestamp'])"); + testSelectOrPartitionedByTimestamp("test_parquet_partitioned_by_timestamp"); + } + + @Test + public void testParquetSelectByTimestamp() + { + assertUpdate("CREATE TABLE test_parquet_select_by_timestamp (_timestamp timestamp) WITH (format = 'PARQUET')"); + testSelectOrPartitionedByTimestamp("test_parquet_select_by_timestamp"); } @Test - public void testTimestamp() + public void testOrcPartitionByTimestamp() { - assertUpdate("CREATE TABLE test_timestamp (x timestamp)"); - assertUpdate("INSERT INTO test_timestamp VALUES (timestamp '2017-05-01 10:12:34')", 1); - assertQuery("SELECT * FROM test_timestamp", "SELECT CAST('2017-05-01 10:12:34' AS TIMESTAMP)"); - dropTable(getSession(), "test_timestamp"); + assertUpdate("CREATE TABLE test_orc_partitioned_by_timestamp (_timestamp timestamp) " + + "WITH (format = 'ORC', partitioning = ARRAY['_timestamp'])"); + testSelectOrPartitionedByTimestamp("test_orc_partitioned_by_timestamp"); + } + + @Test + public void testOrcSelectByTimestamp() + { + assertUpdate("CREATE TABLE test_orc_select_by_timestamp (_timestamp timestamp) " + + "WITH (format = 'ORC')"); + testSelectOrPartitionedByTimestamp("test_orc_select_by_timestamp"); + } + + private void testSelectOrPartitionedByTimestamp(String tableName) + { + String select1 = "SELECT CAST('2017-05-01 10:12:34' AS TIMESTAMP) _timestamp"; + assertUpdate(format("INSERT INTO %s ", tableName) + select1, 1); + String select2 = "SELECT CAST('2017-10-01 10:12:34' AS TIMESTAMP) _timestamp"; + assertUpdate(format("INSERT INTO %s " + select2, tableName), 1); + String select3 = "SELECT CAST('2018-05-01 10:12:34' AS TIMESTAMP) _timestamp"; + assertUpdate(format("INSERT INTO %s " + select3, tableName), 1); + assertQuery(format("SELECT COUNT(*) from %s", tableName), "SELECT 3"); + MaterializedResult result = computeActual("SELECT * FROM " + tableName); + assertQuery(format("SELECT * from %s WHERE _timestamp = CAST('2017-05-01 10:12:34' AS TIMESTAMP)", tableName), select1); + assertQuery(format("SELECT * from %s WHERE _timestamp < CAST('2017-06-01 10:12:34' AS TIMESTAMP)", tableName), select1); + assertQuery(format("SELECT * from %s WHERE _timestamp = CAST('2017-10-01 10:12:34' AS TIMESTAMP)", tableName), select2); + assertQuery(format("SELECT * from %s WHERE _timestamp > CAST('2017-06-01 10:12:34' AS TIMESTAMP) AND _timestamp < CAST('2018-05-01 10:12:34' AS TIMESTAMP)", tableName), select2); + assertQuery(format("SELECT * from %s WHERE _timestamp = CAST('2018-05-01 10:12:34' AS TIMESTAMP)", tableName), select3); + assertQuery(format("SELECT * from %s 
WHERE _timestamp > CAST('2018-01-01 10:12:34' AS TIMESTAMP)", tableName), select3); + dropTable(getSession(), tableName); } @Test public void testCreatePartitionedTable() { testWithAllFileFormats(this::testCreatePartitionedTable); - testWithAllFileFormats(this::testCreatePartitionedTableWithNestedTypes); - testWithAllFileFormats(this::testPartitionedTableWithNullValues); } private void testCreatePartitionedTable(Session session, FileFormat fileFormat) @@ -159,10 +210,9 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) ", _real REAL" + ", _double DOUBLE" + ", _boolean BOOLEAN" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", _decimal_short DECIMAL(3,2)" + - ", _decimal_long DECIMAL(30,10)" + - ", _timestamp TIMESTAMP") + + ", _decimal_short DECIMAL(3,2)" + + ", _decimal_long DECIMAL(30,10)" + + ", _timestamp TIMESTAMP" + ", _date DATE" + ") " + "WITH (" + @@ -174,10 +224,9 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) " '_boolean'," + " '_real'," + " '_double'," + - returnSqlIfFormatSupportsDecimalsAndTimestamps(FileFormat.PARQUET, "" + - " '_decimal_short', " + - " '_decimal_long'," + - " '_timestamp',") + + " '_decimal_short', " + + " '_decimal_long'," + + " '_timestamp'," + " '_date']" + ")"; @@ -194,10 +243,9 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) ", CAST('123.45' AS REAL) _real" + ", CAST('3.14' AS DOUBLE) _double" + ", true _boolean" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", CAST('3.14' AS DECIMAL(3,2)) _decimal_short" + - ", CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) _decimal_long" + - ", CAST('2017-05-01 10:12:34' AS TIMESTAMP) _timestamp") + + ", CAST('3.14' AS DECIMAL(3,2)) _decimal_short" + + ", CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) _decimal_long" + + ", CAST('2017-05-01 10:12:34' AS TIMESTAMP) _timestamp" + ", CAST('2017-05-01' AS DATE) _date"; assertUpdate(session, "INSERT INTO test_partitioned_table " + select, 1); @@ -208,16 +256,21 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) " AND 456 = _integer" + " AND CAST(123 AS BIGINT) = _bigint" + " AND true = _boolean" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - " AND CAST('3.14' AS DECIMAL(3,2)) = _decimal_short" + - " AND CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) = _decimal_long" + - " AND CAST('2017-05-01 10:12:34' AS TIMESTAMP) = _timestamp") + + " AND CAST('3.14' AS DECIMAL(3,2)) = _decimal_short" + + " AND CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) = _decimal_long" + + " AND CAST('2017-05-01 10:12:34' AS TIMESTAMP) = _timestamp" + " AND CAST('2017-05-01' AS DATE) = _date", select); dropTable(session, "test_partitioned_table"); } + @Test + public void testCreatePartitionedTableWithNestedTypes() + { + testWithAllFileFormats(this::testCreatePartitionedTableWithNestedTypes); + } + private void testCreatePartitionedTableWithNestedTypes(Session session, FileFormat fileFormat) { @Language("SQL") String createTable = "" + @@ -236,20 +289,25 @@ private void testCreatePartitionedTableWithNestedTypes(Session session, FileForm dropTable(session, "test_partitioned_table_nested_type"); } + @Test + public void testPartitionedTableWithNullValues() + { + testWithAllFileFormats(this::testPartitionedTableWithNullValues); + } + private void testPartitionedTableWithNullValues(Session session, FileFormat fileFormat) { @Language("SQL") String createTable = 
"" + - "CREATE TABLE test_partitioned_table (" + + "CREATE TABLE test_partitioned_table_with_null_values (" + " _string VARCHAR" + ", _bigint BIGINT" + ", _integer INTEGER" + ", _real REAL" + ", _double DOUBLE" + ", _boolean BOOLEAN" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", _decimal_short DECIMAL(3,2)" + - ", _decimal_long DECIMAL(30,10)" + - ", _timestamp TIMESTAMP") + + ", _decimal_short DECIMAL(3,2)" + + ", _decimal_long DECIMAL(30,10)" + + ", _timestamp TIMESTAMP" + ", _date DATE" + ") " + "WITH (" + @@ -261,16 +319,15 @@ private void testPartitionedTableWithNullValues(Session session, FileFormat file " '_boolean'," + " '_real'," + " '_double'," + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - " '_decimal_short', " + - " '_decimal_long'," + - " '_timestamp',") + + " '_decimal_short', " + + " '_decimal_long'," + + " '_timestamp'," + " '_date']" + ")"; assertUpdate(session, createTable); - MaterializedResult result = computeActual("SELECT * from test_partitioned_table"); + MaterializedResult result = computeActual("SELECT * from test_partitioned_table_with_null_values"); assertEquals(result.getRowCount(), 0); @Language("SQL") String select = "" + @@ -281,15 +338,14 @@ private void testPartitionedTableWithNullValues(Session session, FileFormat file ", null _real" + ", null _double" + ", null _boolean" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", null _decimal_short" + - ", null _decimal_long" + - ", null _timestamp") + + ", null _decimal_short" + + ", null _decimal_long" + + ", null _timestamp" + ", null _date"; - assertUpdate(session, "INSERT INTO test_partitioned_table " + select, 1); - assertQuery(session, "SELECT * from test_partitioned_table", select); - dropTable(session, "test_partitioned_table"); + assertUpdate(session, "INSERT INTO test_partitioned_table_with_null_values " + select, 1); + assertQuery(session, "SELECT * from test_partitioned_table_with_null_values", select); + dropTable(session, "test_partitioned_table_with_null_values"); } @Test @@ -413,13 +469,6 @@ private long getLatestSnapshotId() .getOnlyValue(); } - @Test - public void testSchemaEvolution() - { - // Schema evolution should be id based - testWithAllFileFormats(this::testSchemaEvolution); - } - @Test public void testInsertIntoNotNullColumn() { @@ -436,6 +485,19 @@ public void testInsertIntoNotNullColumn() assertUpdate("DROP TABLE IF EXISTS test_commuted_not_null_table"); } + @Test + public void testSchemaEvolution() + { + // Schema evolution should be id based + testWithAllFileFormats(this::testSchemaEvolution); + } + + @Test + public void testSchemaEvolutionParquet() + { + testSchemaEvolution(getSession(), FileFormat.PARQUET); + } + private void testSchemaEvolution(Session session, FileFormat fileFormat) { assertUpdate(session, "CREATE TABLE test_schema_evolution_drop_end (col0 INTEGER, col1 INTEGER, col2 INTEGER) WITH (format = '" + fileFormat + "')"); @@ -444,6 +506,7 @@ private void testSchemaEvolution(Session session, FileFormat fileFormat) assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end DROP COLUMN col2"); assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1)"); assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end ADD COLUMN col2 INTEGER"); + assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, NULL)"); assertUpdate(session, "INSERT INTO test_schema_evolution_drop_end VALUES (3, 4, 5)", 1); assertQuery(session, "SELECT * FROM 
test_schema_evolution_drop_end", "VALUES(0, 1, NULL), (3, 4, 5)"); dropTable(session, "test_schema_evolution_drop_end"); @@ -536,12 +599,6 @@ private void testWithAllFileFormats(BiConsumer test) test.accept(getSession(), FileFormat.ORC); } - // TODO: Remove and eliminate callers once we correctly handle Parquet decimals and timestamps - private String returnSqlIfFormatSupportsDecimalsAndTimestamps(FileFormat fileFormat, String sql) - { - return fileFormat == FileFormat.ORC ? sql : ""; - } - private void dropTable(Session session, String table) { assertUpdate(session, "DROP TABLE " + table); diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java index c0e301f22a66..5b6d004271bf 100644 --- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java +++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java @@ -149,6 +149,8 @@ private static ConvertedType getConvertedType(OriginalType type) return ConvertedType.TIME_MILLIS; case TIMESTAMP_MILLIS: return ConvertedType.TIMESTAMP_MILLIS; + case TIMESTAMP_MICROS: + return ConvertedType.TIMESTAMP_MICROS; case INTERVAL: return ConvertedType.INTERVAL; case INT_8: diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java index d26b5b379098..5de21e64c65e 100644 --- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java +++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.List; +import java.util.Map; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -60,14 +61,12 @@ public class ParquetWriter private final List columnWriters; private final OutputStreamSliceOutput outputStream; - private final List types; private final ParquetWriterOptions writerOption; - private final List names; private final MessageType messageType; private final int chunkMaxLogicalBytes; - private ImmutableList.Builder rowGroupBuilder = ImmutableList.builder(); + private final ImmutableList.Builder rowGroupBuilder = ImmutableList.builder(); private int rows; private long bufferedBytes; @@ -78,28 +77,23 @@ public class ParquetWriter public ParquetWriter( OutputStream outputStream, - List columnNames, - List types, + MessageType messageType, + Map, Type> primitiveTypes, ParquetWriterOptions writerOption, CompressionCodecName compressionCodecName) { this.outputStream = new OutputStreamSliceOutput(requireNonNull(outputStream, "outputstream is null")); - this.names = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); - this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); + this.messageType = requireNonNull(messageType, "messageType is null"); + requireNonNull(primitiveTypes, "primitiveTypes is null"); this.writerOption = requireNonNull(writerOption, "writerOption is null"); requireNonNull(compressionCodecName, "compressionCodecName is null"); - checkArgument(types.size() == columnNames.size(), "type size %s is not equal to name size %s", types.size(), columnNames.size()); - - ParquetSchemaConverter parquetSchemaConverter = new ParquetSchemaConverter(types, columnNames); - this.messageType = parquetSchemaConverter.getMessageType(); - 
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java
index d26b5b379098..5de21e64c65e 100644
--- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java
@@ -35,6 +35,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
+import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -60,14 +61,12 @@ public class ParquetWriter
     private final List<ColumnWriter> columnWriters;
     private final OutputStreamSliceOutput outputStream;
 
-    private final List<Type> types;
     private final ParquetWriterOptions writerOption;
-    private final List<String> names;
     private final MessageType messageType;
 
     private final int chunkMaxLogicalBytes;
 
-    private ImmutableList.Builder<RowGroup> rowGroupBuilder = ImmutableList.builder();
+    private final ImmutableList.Builder<RowGroup> rowGroupBuilder = ImmutableList.builder();
 
     private int rows;
     private long bufferedBytes;
@@ -78,28 +77,23 @@ public class ParquetWriter
     public ParquetWriter(
             OutputStream outputStream,
-            List<String> columnNames,
-            List<Type> types,
+            MessageType messageType,
+            Map<List<String>, Type> primitiveTypes,
             ParquetWriterOptions writerOption,
             CompressionCodecName compressionCodecName)
     {
         this.outputStream = new OutputStreamSliceOutput(requireNonNull(outputStream, "outputstream is null"));
-        this.names = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
-        this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
+        this.messageType = requireNonNull(messageType, "messageType is null");
+        requireNonNull(primitiveTypes, "primitiveTypes is null");
         this.writerOption = requireNonNull(writerOption, "writerOption is null");
         requireNonNull(compressionCodecName, "compressionCodecName is null");
 
-        checkArgument(types.size() == columnNames.size(), "type size %s is not equal to name size %s", types.size(), columnNames.size());
-
-        ParquetSchemaConverter parquetSchemaConverter = new ParquetSchemaConverter(types, columnNames);
-        this.messageType = parquetSchemaConverter.getMessageType();
-
         ParquetProperties parquetProperties = ParquetProperties.builder()
                 .withWriterVersion(PARQUET_2_0)
                 .withPageSize(writerOption.getMaxPageSize())
                 .build();
-        this.columnWriters = ParquetWriters.getColumnWriters(messageType, parquetSchemaConverter.getPrimitiveTypes(), parquetProperties, compressionCodecName);
+        this.columnWriters = ParquetWriters.getColumnWriters(messageType, primitiveTypes, parquetProperties, compressionCodecName);
 
         this.chunkMaxLogicalBytes = max(1, CHUNK_MAX_BYTES / 2);
     }
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java
index 3bea3880fbd8..02e86a40738b 100644
--- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java
@@ -23,6 +23,7 @@
 import io.prestosql.parquet.writer.valuewriter.IntegerValueWriter;
 import io.prestosql.parquet.writer.valuewriter.PrimitiveValueWriter;
 import io.prestosql.parquet.writer.valuewriter.RealValueWriter;
+import io.prestosql.parquet.writer.valuewriter.TimestampValueWriter;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.type.CharType;
 import io.prestosql.spi.type.DecimalType;
@@ -159,14 +160,17 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, io
         if (INTEGER.equals(type) || SMALLINT.equals(type) || TINYINT.equals(type)) {
             return new IntegerValueWriter(valuesWriter, type, parquetType);
         }
+        if (BIGINT.equals(type)) {
+            return new BigintValueWriter(valuesWriter, type, parquetType);
+        }
         if (type instanceof DecimalType) {
             return new DecimalValueWriter(valuesWriter, type, parquetType);
         }
         if (DATE.equals(type)) {
             return new DateValueWriter(valuesWriter, parquetType);
         }
-        if (BIGINT.equals(type) || TIMESTAMP.equals(type)) {
-            return new BigintValueWriter(valuesWriter, type, parquetType);
+        if (TIMESTAMP.equals(type)) {
+            return new TimestampValueWriter(valuesWriter, type, parquetType);
         }
         if (DOUBLE.equals(type)) {
             return new DoubleValueWriter(valuesWriter, parquetType);
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java
index 72261ceb752e..a116808adb44 100644
--- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java
@@ -72,9 +72,9 @@ else if (decimalType.isShort()) {
         }
     }
 
-    private static byte[] paddingBigInteger(BigInteger bigInteger)
+    private byte[] paddingBigInteger(BigInteger bigInteger)
     {
-        byte[] result = new byte[16];
+        byte[] result = new byte[getTypeLength()];
         if (bigInteger.signum() < 0) {
             Arrays.fill(result, (byte) 0xFF);
         }
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java
index b44296c2f7bb..92af649d46b1 100644
--- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java
@@ -47,6 +47,11 @@ public Statistics getStatistics()
         return statistics;
     }
 
+    protected int getTypeLength()
+    {
+        return parquetType.getTypeLength();
+    }
+
     @Override
     public long getBufferedSize()
     {
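With the constructor change above, schema conversion moves out of ParquetWriter and into its callers, which now pass in the Parquet MessageType plus the map from primitive-column path to Presto type. A minimal sketch of the new calling convention, assuming the caller already holds the column names and types (the helper class and method names are invented for the example):

import io.prestosql.parquet.writer.ParquetSchemaConverter;
import io.prestosql.parquet.writer.ParquetWriter;
import io.prestosql.parquet.writer.ParquetWriterOptions;
import io.prestosql.spi.type.Type;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.OutputStream;
import java.util.List;

public final class ParquetWriterUsageSketch
{
    private ParquetWriterUsageSketch() {}

    // The caller converts the Presto schema once, then hands the writer the Parquet
    // MessageType and the primitive-type lookup map produced by the converter.
    public static ParquetWriter createWriter(OutputStream out, List<String> columnNames, List<Type> columnTypes)
    {
        ParquetSchemaConverter converter = new ParquetSchemaConverter(columnTypes, columnNames);
        return new ParquetWriter(
                out,
                converter.getMessageType(),
                converter.getPrimitiveTypes(),
                ParquetWriterOptions.builder().build(),      // defaults; tune as needed
                CompressionCodecName.SNAPPY);                 // any supported codec, chosen arbitrarily here
    }
}

Keeping the conversion at the call site leaves the writer agnostic of Presto column naming; it only consumes the Parquet schema and the type map it needs to build value writers.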
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/TimestampValueWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/TimestampValueWriter.java
new file mode 100644
index 000000000000..a78fcd9f0099
--- /dev/null
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/TimestampValueWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.parquet.writer.valuewriter;
+
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class TimestampValueWriter
+        extends PrimitiveValueWriter
+{
+    private final Type type;
+    private final boolean writeMicroseconds;
+
+    public TimestampValueWriter(ValuesWriter valuesWriter, Type type, PrimitiveType parquetType)
+    {
+        super(parquetType, valuesWriter);
+        this.type = requireNonNull(type, "type is null");
+        this.writeMicroseconds = parquetType.isPrimitive() && parquetType.getOriginalType() == OriginalType.TIMESTAMP_MICROS;
+    }
+
+    @Override
+    public void write(Block block)
+    {
+        for (int i = 0; i < block.getPositionCount(); i++) {
+            if (!block.isNull(i)) {
+                long value = type.getLong(block, i);
+                long scaledValue = writeMicroseconds ? MILLISECONDS.toMicros(value) : value;
+                getValueWriter().writeLong(scaledValue);
+                getStatistics().updateStats(scaledValue);
+            }
+        }
+    }
+}
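TimestampValueWriter receives Presto TIMESTAMP values as millisecond-precision longs and, for columns annotated TIMESTAMP_MICROS, scales them before buffering the value and before updating column statistics, so both use the same unit. A small worked example of that scaling (the numbers are arbitrary, chosen only to show the factor of 1000):

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public final class TimestampScalingExample
{
    public static void main(String[] args)
    {
        long millis = 1_569_888_000_123L;             // value as read from the Block
        long micros = MILLISECONDS.toMicros(millis);  // 1_569_888_000_123_000, as written to the page
        System.out.println(millis + " ms -> " + micros + " us");
    }
}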