From 6398acc69a9bc8ed880c39149ee1cb4f070a4e21 Mon Sep 17 00:00:00 2001 From: David Stryker Date: Wed, 8 Jul 2020 14:45:08 -0700 Subject: [PATCH] Change Iceberg to use ParquetFileWriter This commit changes the Iceberg connector to use the new ParquetFileWriter, and fixes bugs in handling of decimal and timestamp columns for both Orc and Parquet. This commit removes DomainConverter and the test TestDomainConverter, superseded by fixes and conversions in ExpressionConverter, MessageTypeConverter and new class TimestampValueWriter. This commit adds new TestIcebergSmoke tests for decimal and timestamp, and fixes the existing tests. With this commit, all TestIcebergSmoke tests pass. --- .../hive/parquet/ParquetFileWriter.java | 9 +- .../parquet/ParquetFileWriterFactory.java | 6 +- .../plugin/hive/benchmark/FileFormat.java | 7 +- .../plugin/hive/parquet/ParquetTester.java | 6 +- .../plugin/iceberg/DomainConverter.java | 93 ------- .../plugin/iceberg/ExpressionConverter.java | 6 + .../plugin/iceberg/IcebergFileWriter.java | 4 +- .../iceberg/IcebergFileWriterFactory.java | 75 +++--- .../plugin/iceberg/IcebergMetadata.java | 22 +- .../plugin/iceberg/IcebergOrcFileWriter.java | 4 +- .../plugin/iceberg/IcebergPageSink.java | 23 +- .../iceberg/IcebergParquetFileWriter.java | 71 ++++++ .../iceberg/IcebergRecordFileWriter.java | 51 ---- .../iceberg/IcebergSessionProperties.java | 10 + .../plugin/iceberg/PartitionData.java | 4 +- .../iceberg/util/PrimitiveTypeMapBuilder.java | 87 +++++++ .../plugin/iceberg/IcebergQueryRunner.java | 10 +- .../plugin/iceberg/TestDomainConverter.java | 226 ------------------ .../plugin/iceberg/TestIcebergSmoke.java | 193 +++++++++------ .../parquet/writer/MessageTypeConverter.java | 2 + .../parquet/writer/ParquetWriter.java | 20 +- .../parquet/writer/ParquetWriters.java | 8 +- .../valuewriter/DecimalValueWriter.java | 4 +- .../valuewriter/PrimitiveValueWriter.java | 5 + .../valuewriter/TimestampValueWriter.java | 50 ++++ 25 files changed, 450 insertions(+), 546 deletions(-) delete mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java create mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergParquetFileWriter.java delete mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergRecordFileWriter.java create mode 100644 presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PrimitiveTypeMapBuilder.java delete mode 100644 presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestDomainConverter.java create mode 100644 presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/TimestampValueWriter.java diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java index 393ce128fb46..43250b6947af 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriter.java @@ -24,12 +24,14 @@ import io.prestosql.spi.block.RunLengthEncodedBlock; import io.prestosql.spi.type.Type; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; import org.openjdk.jol.info.ClassLayout; import java.io.IOException; import java.io.OutputStream; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import static com.google.common.base.MoreObjects.toStringHelper; 
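For illustration of the new calling convention (an editor's sketch, not code from this patch): ParquetFileWriter, and the underlying ParquetWriter, no longer take raw column name and type lists; callers now pass the Parquet MessageType and a map from column paths to Presto types, both produced by ParquetSchemaConverter. A minimal sketch of how a caller might build a writer under the new signatures, assuming two illustrative columns and a local output file:

// Illustrative sketch only: constructing a ParquetWriter via ParquetSchemaConverter,
// matching the constructor introduced in this patch. Column names, types, and the
// target path are assumptions for the example.
import com.google.common.collect.ImmutableList;
import io.prestosql.parquet.writer.ParquetSchemaConverter;
import io.prestosql.parquet.writer.ParquetWriter;
import io.prestosql.parquet.writer.ParquetWriterOptions;
import io.prestosql.spi.type.Type;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;

public class ParquetWriterExample
{
    public static ParquetWriter createWriter(String targetPath)
            throws IOException
    {
        List<String> columnNames = ImmutableList.of("id", "created_at"); // assumed columns
        List<Type> columnTypes = ImmutableList.of(BIGINT, TIMESTAMP);

        // The schema converter derives both the Parquet MessageType and the
        // column-path-to-Presto-type map that ParquetWriter now expects.
        ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(columnTypes, columnNames);

        return new ParquetWriter(
                new FileOutputStream(targetPath),
                schemaConverter.getMessageType(),
                schemaConverter.getPrimitiveTypes(),
                ParquetWriterOptions.builder().build(),
                CompressionCodecName.SNAPPY);
    }
}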
@@ -50,8 +52,9 @@ public class ParquetFileWriter public ParquetFileWriter( OutputStream outputStream, Callable rollbackAction, - List columnNames, List fileColumnTypes, + MessageType messageType, + Map, Type> primitiveTypes, ParquetWriterOptions parquetWriterOptions, int[] fileInputColumnIndexes, CompressionCodecName compressionCodecName) @@ -60,8 +63,8 @@ public ParquetFileWriter( this.parquetWriter = new ParquetWriter( outputStream, - columnNames, - fileColumnTypes, + messageType, + primitiveTypes, parquetWriterOptions, compressionCodecName); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java index eae73037c0ec..72edef7b00d9 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetFileWriterFactory.java @@ -13,6 +13,7 @@ */ package io.prestosql.plugin.hive.parquet; +import io.prestosql.parquet.writer.ParquetSchemaConverter; import io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.FileWriter; import io.prestosql.plugin.hive.HdfsEnvironment; @@ -122,11 +123,14 @@ public Optional createFileWriter( return null; }; + ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(fileColumnTypes, fileColumnNames); + return Optional.of(new ParquetFileWriter( fileSystem.create(path), rollbackAction, - fileColumnNames, fileColumnTypes, + schemaConverter.getMessageType(), + schemaConverter.getPrimitiveTypes(), parquetWriterOptions, fileInputColumnIndexes, compressionCodecName)); diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java index d72c882dd797..c00378c8996d 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java @@ -21,6 +21,7 @@ import io.prestosql.orc.OrcWriterStats; import io.prestosql.orc.OutputStreamOrcDataSink; import io.prestosql.orc.metadata.OrcType; +import io.prestosql.parquet.writer.ParquetSchemaConverter; import io.prestosql.parquet.writer.ParquetWriter; import io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.FileFormatDataSourceStats; @@ -499,10 +500,12 @@ private static class PrestoParquetFormatWriter public PrestoParquetFormatWriter(File targetFile, List columnNames, List types, HiveCompressionCodec compressionCodec) throws IOException { + ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(types, columnNames); + writer = new ParquetWriter( new FileOutputStream(targetFile), - columnNames, - types, + schemaConverter.getMessageType(), + schemaConverter.getPrimitiveTypes(), ParquetWriterOptions.builder().build(), compressionCodec.getParquetCompressionCodec()); } diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java index 84e2f142d7b4..056407880aad 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/parquet/ParquetTester.java @@ -22,6 +22,7 @@ import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.airlift.units.DataSize; +import io.prestosql.parquet.writer.ParquetSchemaConverter; import 
io.prestosql.parquet.writer.ParquetWriter; import io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.HiveConfig; @@ -712,10 +713,11 @@ private static void writeParquetColumnPresto(File outputFile, List types, throws Exception { checkArgument(types.size() == columnNames.size() && types.size() == values.length); + ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(types, columnNames); ParquetWriter writer = new ParquetWriter( new FileOutputStream(outputFile), - columnNames, - types, + schemaConverter.getMessageType(), + schemaConverter.getPrimitiveTypes(), ParquetWriterOptions.builder() .setMaxPageSize(DataSize.ofBytes(100)) .setMaxBlockSize(DataSize.ofBytes(100000)) diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java deleted file mode 100644 index c6a0ab117c2c..000000000000 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/DomainConverter.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.prestosql.plugin.iceberg; - -import com.google.common.base.VerifyException; -import io.prestosql.spi.block.Block; -import io.prestosql.spi.predicate.Domain; -import io.prestosql.spi.predicate.EquatableValueSet; -import io.prestosql.spi.predicate.Marker; -import io.prestosql.spi.predicate.Range; -import io.prestosql.spi.predicate.SortedRangeSet; -import io.prestosql.spi.predicate.TupleDomain; -import io.prestosql.spi.predicate.ValueSet; -import io.prestosql.spi.type.TimeType; -import io.prestosql.spi.type.TimeWithTimeZoneType; -import io.prestosql.spi.type.TimestampType; -import io.prestosql.spi.type.TimestampWithTimeZoneType; -import io.prestosql.spi.type.Type; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import static io.prestosql.spi.predicate.Utils.nativeValueToBlock; -import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -public final class DomainConverter -{ - private DomainConverter() {} - - public static TupleDomain convertTupleDomainTypes(TupleDomain tupleDomain) - { - return tupleDomain.transformDomains((column, domain) -> translateDomain(domain)); - } - - private static Domain translateDomain(Domain domain) - { - ValueSet valueSet = domain.getValues(); - Type type = domain.getType(); - if (type instanceof TimestampType || type instanceof TimestampWithTimeZoneType || type instanceof TimeType || type instanceof TimeWithTimeZoneType) { - if (valueSet instanceof EquatableValueSet) { - throw new VerifyException("Did not expect an EquatableValueSet but got " + valueSet.getClass().getSimpleName()); - } - - if (valueSet instanceof SortedRangeSet) { - List ranges = new ArrayList<>(); - for (Range range : valueSet.getRanges().getOrderedRanges()) { - Marker low = range.getLow(); - if (low.getValueBlock().isPresent()) { - Block value = 
nativeValueToBlock(type, convertToMicros(type, (long) range.getLow().getValue())); - low = new Marker(range.getType(), Optional.of(value), range.getLow().getBound()); - } - - Marker high = range.getHigh(); - if (high.getValueBlock().isPresent()) { - Block value = nativeValueToBlock(type, convertToMicros(type, (long) range.getHigh().getValue())); - high = new Marker(range.getType(), Optional.of(value), range.getHigh().getBound()); - } - - ranges.add(new Range(low, high)); - } - valueSet = SortedRangeSet.copyOf(valueSet.getType(), ranges); - } - return Domain.create(valueSet, domain.isNullAllowed()); - } - return domain; - } - - private static long convertToMicros(Type type, long value) - { - if (type instanceof TimestampWithTimeZoneType || type instanceof TimeWithTimeZoneType) { - return MILLISECONDS.toMicros(unpackMillisUtc(value)); - } - - if (type instanceof TimestampType || type instanceof TimeType) { - return MILLISECONDS.toMicros(value); - } - - throw new IllegalArgumentException(type + " is unsupported"); - } -} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java index f4468af59505..2980ffdf09d1 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/ExpressionConverter.java @@ -29,6 +29,7 @@ import io.prestosql.spi.type.MapType; import io.prestosql.spi.type.RealType; import io.prestosql.spi.type.RowType; +import io.prestosql.spi.type.TimestampType; import io.prestosql.spi.type.Type; import io.prestosql.spi.type.VarbinaryType; import io.prestosql.spi.type.VarcharType; @@ -38,6 +39,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; @@ -198,6 +200,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker) return new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), decimalType.getScale()); } + if (type instanceof TimestampType) { + return TimeUnit.MILLISECONDS.toMicros((long) marker.getValue()); + } + return marker.getValue(); } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java index 97c4280506b9..7d588e36c73c 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriter.java @@ -16,10 +16,8 @@ import io.prestosql.plugin.hive.FileWriter; import org.apache.iceberg.Metrics; -import java.util.Optional; - public interface IcebergFileWriter extends FileWriter { - Optional getMetrics(); + Metrics getMetrics(); } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java index eb3c82e3b620..fc00b4f3b8e0 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java @@ -21,9 +21,9 @@ import io.prestosql.orc.OrcWriterOptions; import io.prestosql.orc.OrcWriterStats; import io.prestosql.orc.OutputStreamOrcDataSink; +import 
io.prestosql.parquet.writer.ParquetWriterOptions; import io.prestosql.plugin.hive.FileFormatDataSourceStats; import io.prestosql.plugin.hive.HdfsEnvironment; -import io.prestosql.plugin.hive.HiveStorageFormat; import io.prestosql.plugin.hive.NodeVersion; import io.prestosql.plugin.hive.orc.HdfsOrcDataSource; import io.prestosql.plugin.hive.orc.OrcWriterConfig; @@ -33,13 +33,10 @@ import io.prestosql.spi.type.TypeManager; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.mapred.JobConf; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; -import org.apache.parquet.hadoop.ParquetOutputFormat; -import org.weakref.jmx.Flatten; import org.weakref.jmx.Managed; import javax.inject.Inject; @@ -47,7 +44,6 @@ import java.io.IOException; import java.util.List; import java.util.Optional; -import java.util.Properties; import java.util.concurrent.Callable; import java.util.function.Supplier; import java.util.stream.IntStream; @@ -56,8 +52,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME; import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_VERSION_NAME; -import static io.prestosql.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat; -import static io.prestosql.plugin.hive.util.ParquetRecordWriterUtil.setParquetSchema; import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR; import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getCompressionCodec; @@ -67,13 +61,14 @@ import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxStripeSize; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMinStripeSize; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterValidateMode; +import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetWriterBlockSize; +import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize; import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate; -import static io.prestosql.plugin.iceberg.TypeConverter.toHiveType; import static io.prestosql.plugin.iceberg.TypeConverter.toOrcType; import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType; +import static io.prestosql.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap; import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.joining; import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert; import static org.joda.time.DateTimeZone.UTC; @@ -103,7 +98,6 @@ public IcebergFileWriterFactory( } @Managed - @Flatten public OrcWriterStats getOrcWriterStats() { return orcWriterStats; @@ -112,14 +106,13 @@ public OrcWriterStats getOrcWriterStats() public IcebergFileWriter createFileWriter( Path outputPath, Schema icebergSchema, - List columns, JobConf jobConf, ConnectorSession session, FileFormat fileFormat) { switch (fileFormat) { case PARQUET: - return createParquetWriter(outputPath, icebergSchema, columns, jobConf, session); + return createParquetWriter(outputPath, icebergSchema, jobConf, session); case ORC: return createOrcWriter(outputPath, icebergSchema, jobConf, session); } 
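A note on the ExpressionConverter change above (an editor's sketch, not code from this patch): it is the crux of the timestamp fix on the predicate-pushdown path and replaces the deleted DomainConverter. Presto TIMESTAMP literals are epoch milliseconds, while Iceberg stores and compares timestamps in microseconds, so pushed-down literal values must be scaled before they reach Iceberg. A small sketch of that scaling, with an assumed literal value:

// Illustrative sketch of the millisecond-to-microsecond scaling that
// ExpressionConverter now applies to TIMESTAMP literals. The sample value is assumed.
import java.util.concurrent.TimeUnit;

public class TimestampScalingExample
{
    public static void main(String[] args)
    {
        // Assumed literal: 2017-05-01 10:12:34 UTC as a Presto TIMESTAMP value (epoch milliseconds)
        long prestoMillis = 1_493_633_554_000L;

        // Iceberg expects microseconds, so the converter scales the value by 1000
        long icebergMicros = TimeUnit.MILLISECONDS.toMicros(prestoMillis);

        System.out.println(icebergMicros); // prints 1493633554000000
    }
}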
@@ -129,32 +122,44 @@ public IcebergFileWriter createFileWriter( private IcebergFileWriter createParquetWriter( Path outputPath, Schema icebergSchema, - List columns, JobConf jobConf, ConnectorSession session) { - Properties properties = new Properties(); - properties.setProperty(IOConstants.COLUMNS, columns.stream() - .map(IcebergColumnHandle::getName) - .collect(joining(","))); - properties.setProperty(IOConstants.COLUMNS_TYPES, columns.stream() - .map(column -> toHiveType(column.getType()).getHiveTypeName().toString()) - .collect(joining(":"))); - - setParquetSchema(jobConf, convert(icebergSchema, "table")); - jobConf.set(ParquetOutputFormat.COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().name()); - - return new IcebergRecordFileWriter( - outputPath, - columns.stream() - .map(IcebergColumnHandle::getName) - .collect(toImmutableList()), - fromHiveStorageFormat(HiveStorageFormat.PARQUET), - properties, - HiveStorageFormat.PARQUET.getEstimatedWriterSystemMemoryUsage(), - jobConf, - typeManager, - session); + List fileColumnNames = icebergSchema.columns().stream() + .map(Types.NestedField::name) + .collect(toImmutableList()); + List fileColumnTypes = icebergSchema.columns().stream() + .map(column -> toPrestoType(column.type(), typeManager)) + .collect(toImmutableList()); + + try { + FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), outputPath, jobConf); + + Callable rollbackAction = () -> { + fileSystem.delete(outputPath, false); + return null; + }; + + ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder() + .setMaxPageSize(getParquetWriterPageSize(session)) + .setMaxBlockSize(getParquetWriterBlockSize(session)) + .build(); + + return new IcebergParquetFileWriter( + fileSystem.create(outputPath), + rollbackAction, + fileColumnTypes, + convert(icebergSchema, "table"), + makeTypeMap(fileColumnTypes, fileColumnNames), + parquetWriterOptions, + IntStream.range(0, fileColumnNames.size()).toArray(), + getCompressionCodec(session).getParquetCompressionCodec(), + outputPath, + jobConf); + } + catch (IOException e) { + throw new PrestoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e); + } } private IcebergFileWriter createOrcWriter( diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java index d2bcf1267ce9..c51a218c3771 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java @@ -52,8 +52,6 @@ import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.spi.security.PrestoPrincipal; import io.prestosql.spi.statistics.ComputedStatistics; -import io.prestosql.spi.type.DecimalType; -import io.prestosql.spi.type.TimestampType; import io.prestosql.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -87,7 +85,6 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.prestosql.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.prestosql.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; -import static io.prestosql.plugin.iceberg.DomainConverter.convertTupleDomainTypes; import static io.prestosql.plugin.iceberg.ExpressionConverter.toIcebergExpression; import static io.prestosql.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation; import static
io.prestosql.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; @@ -115,7 +112,6 @@ import static java.util.stream.Collectors.toList; import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; -import static org.apache.iceberg.FileFormat.ORC; import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.Transactions.createTableTransaction; @@ -405,17 +401,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto { IcebergTableHandle table = (IcebergTableHandle) tableHandle; org.apache.iceberg.Table icebergTable = getIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName()); - boolean orcFormat = ORC == getFileFormat(icebergTable); - - for (NestedField column : icebergTable.schema().columns()) { - io.prestosql.spi.type.Type type = toPrestoType(column.type(), typeManager); - if (type instanceof DecimalType && !orcFormat) { - throw new PrestoException(NOT_SUPPORTED, "Writing to columns of type decimal not yet supported"); - } - if (type instanceof TimestampType && !orcFormat) { - throw new PrestoException(NOT_SUPPORTED, "Writing to columns of type timestamp not yet supported for PARQUET format"); - } - } transaction = icebergTable.newTransaction(); @@ -618,10 +603,9 @@ public Optional> applyFilter(C { IcebergTableHandle table = (IcebergTableHandle) handle; // TODO: Remove TupleDomain#simplify once Iceberg supports IN expression - TupleDomain newDomain = convertTupleDomainTypes( - constraint.getSummary() - .transform(IcebergColumnHandle.class::cast) - .simplify()) + TupleDomain newDomain = constraint.getSummary() + .transform(IcebergColumnHandle.class::cast) + .simplify() .intersect(table.getPredicate()); if (newDomain.equals(table.getPredicate())) { diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java index 930eb3354ac6..64b00bc8931f 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergOrcFileWriter.java @@ -84,9 +84,9 @@ public IcebergOrcFileWriter( } @Override - public Optional getMetrics() + public Metrics getMetrics() { - return Optional.of(computeMetrics(icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats())); + return computeMetrics(icebergSchema, orcColumns, orcWriter.getFileRowCount(), orcWriter.getFileStats()); } private static Metrics computeMetrics(Schema icebergSchema, ColumnMetadata orcColumns, long fileRowCount, Optional> columnStatistics) diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java index da91d67a246e..adb54e30139e 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java @@ -13,7 +13,6 @@ */ package io.prestosql.plugin.iceberg; -import com.google.common.base.VerifyException; import com.google.common.collect.ImmutableList; import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; @@ -44,13 +43,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import 
org.apache.iceberg.FileFormat; -import org.apache.iceberg.Metrics; -import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.hadoop.HadoopInputFile; -import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.transforms.Transform; import java.util.ArrayList; @@ -69,7 +64,6 @@ import static io.prestosql.plugin.hive.util.ConfigurationUtils.toJobConf; import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS; import static io.prestosql.plugin.iceberg.PartitionTransforms.getColumnTransform; -import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED; import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc; import static io.prestosql.spi.type.Decimals.readBigDecimal; import static java.lang.Float.intBitsToFloat; @@ -93,7 +87,6 @@ public class IcebergPageSink private final IcebergFileWriterFactory fileWriterFactory; private final HdfsEnvironment hdfsEnvironment; private final JobConf jobConf; - private final List inputColumns; private final JsonCodec jsonCodec; private final ConnectorSession session; private final FileFormat fileFormat; @@ -129,7 +122,6 @@ public IcebergPageSink( this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); this.session = requireNonNull(session, "session is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); - this.inputColumns = ImmutableList.copyOf(inputColumns); this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec)); } @@ -169,7 +161,7 @@ public CompletableFuture> finish() CommitTaskData task = new CommitTaskData( context.getPath().toString(), - new MetricsWrapper(getMetrics(context)), + new MetricsWrapper(context.writer.getMetrics()), context.getPartitionData().map(PartitionData::toJson)); commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task))); @@ -314,7 +306,6 @@ private WriteContext createWriter(Optional partitionPath, Optional partitionPath, Optional new VerifyException("Iceberg ORC file writers should return Iceberg metrics")); - } - throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat); - } - private static Optional getPartitionData(List columns, Page page, int position) { if (columns.isEmpty()) { diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergParquetFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergParquetFileWriter.java new file mode 100644 index 000000000000..72df75afc871 --- /dev/null +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergParquetFileWriter.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.prestosql.plugin.iceberg; + +import io.prestosql.parquet.writer.ParquetWriterOptions; +import io.prestosql.plugin.hive.parquet.ParquetFileWriter; +import io.prestosql.spi.type.Type; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.parquet.ParquetUtil; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; + +import java.io.OutputStream; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +import static java.util.Objects.requireNonNull; + +public class IcebergParquetFileWriter + extends ParquetFileWriter + implements IcebergFileWriter +{ + private final Path outputPath; + private final JobConf jobConf; + + public IcebergParquetFileWriter( + OutputStream outputStream, + Callable rollbackAction, + List fileColumnTypes, + MessageType messageType, + Map, Type> primitiveTypes, + ParquetWriterOptions parquetWriterOptions, + int[] fileInputColumnIndexes, + CompressionCodecName compressionCodecName, + Path outputPath, + JobConf jobConf) + { + super(outputStream, + rollbackAction, + fileColumnTypes, + messageType, + primitiveTypes, + parquetWriterOptions, + fileInputColumnIndexes, + compressionCodecName); + this.outputPath = requireNonNull(outputPath, "outputPath is null"); + this.jobConf = requireNonNull(jobConf, "jobConf is null"); + } + + @Override + public Metrics getMetrics() + { + return ParquetUtil.fileMetrics(HadoopInputFile.fromPath(outputPath, jobConf), MetricsConfig.getDefault()); + } +} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergRecordFileWriter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergRecordFileWriter.java deleted file mode 100644 index 210cae89c032..000000000000 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergRecordFileWriter.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.prestosql.plugin.iceberg; - -import io.airlift.units.DataSize; -import io.prestosql.plugin.hive.RecordFileWriter; -import io.prestosql.plugin.hive.metastore.StorageFormat; -import io.prestosql.spi.connector.ConnectorSession; -import io.prestosql.spi.type.TypeManager; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.iceberg.Metrics; - -import java.util.List; -import java.util.Optional; -import java.util.Properties; - -public class IcebergRecordFileWriter - extends RecordFileWriter - implements IcebergFileWriter -{ - public IcebergRecordFileWriter( - Path path, - List inputColumnNames, - StorageFormat storageFormat, - Properties schema, - DataSize estimatedWriterSystemMemoryUsage, - JobConf conf, - TypeManager typeManager, - ConnectorSession session) - { - super(path, inputColumnNames, storageFormat, schema, estimatedWriterSystemMemoryUsage, conf, typeManager, session); - } - - @Override - public Optional getMetrics() - { - return Optional.empty(); - } -} diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java index d80323dedba0..0fe287b7f083 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java @@ -291,4 +291,14 @@ public static DataSize getParquetMaxReadBlockSize(ConnectorSession session) { return session.getProperty(PARQUET_MAX_READ_BLOCK_SIZE, DataSize.class); } + + public static DataSize getParquetWriterPageSize(ConnectorSession session) + { + return session.getProperty(PARQUET_WRITER_PAGE_SIZE, DataSize.class); + } + + public static DataSize getParquetWriterBlockSize(ConnectorSession session) + { + return session.getProperty(PARQUET_WRITER_BLOCK_SIZE, DataSize.class); + } } diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java index 4146320712ab..3e3610eeea60 100644 --- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionData.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.iceberg.StructLike; @@ -34,7 +35,8 @@ public class PartitionData { private static final String PARTITION_VALUES_FIELD = "partitionValues"; private static final JsonFactory FACTORY = new JsonFactory(); - private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY); + private static final ObjectMapper MAPPER = new ObjectMapper(FACTORY) + .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true); private final Object[] partitionValues; diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PrimitiveTypeMapBuilder.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PrimitiveTypeMapBuilder.java new file mode 100644 index 000000000000..1cd04ce7f839 --- /dev/null +++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/util/PrimitiveTypeMapBuilder.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.plugin.iceberg.util; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.prestosql.spi.type.ArrayType; +import io.prestosql.spi.type.MapType; +import io.prestosql.spi.type.RowType; +import io.prestosql.spi.type.Type; + +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.prestosql.spi.type.StandardTypes.ARRAY; +import static io.prestosql.spi.type.StandardTypes.MAP; +import static io.prestosql.spi.type.StandardTypes.ROW; + +public class PrimitiveTypeMapBuilder +{ + private final ImmutableMap.Builder, Type> builder = ImmutableMap.builder(); + + private PrimitiveTypeMapBuilder() {} + + public static Map, Type> makeTypeMap(List types, List columnNames) + { + return new PrimitiveTypeMapBuilder().buildTypeMap(types, columnNames); + } + + private Map, Type> buildTypeMap(List types, List columnNames) + { + for (int i = 0; i < types.size(); i++) { + visitType(types.get(i), columnNames.get(i), ImmutableList.of()); + } + return builder.build(); + } + + private void visitType(Type type, String name, List parent) + { + if (ROW.equals(type.getTypeSignature().getBase())) { + visitRowType((RowType) type, name, parent); + } + else if (MAP.equals(type.getTypeSignature().getBase())) { + visitMapType((MapType) type, name, parent); + } + else if (ARRAY.equals(type.getTypeSignature().getBase())) { + visitArrayType((ArrayType) type, name, parent); + } + else { + builder.put(ImmutableList.builder().addAll(parent).add(name).build(), type); + } + } + + private void visitArrayType(ArrayType type, String name, List parent) + { + parent = ImmutableList.builder().addAll(parent).add(name).add("array").build(); + visitType(type.getElementType(), "array", parent); + } + + private void visitMapType(MapType type, String name, List parent) + { + parent = ImmutableList.builder().addAll(parent).add(name).add("map").build(); + visitType(type.getKeyType(), "key", parent); + visitType(type.getValueType(), "value", parent); + } + + private void visitRowType(RowType type, String name, List parent) + { + parent = ImmutableList.builder().addAll(parent).add(name).build(); + for (RowType.Field field : type.getFields()) { + checkArgument(field.getName().isPresent(), "field in struct type doesn't have name"); + visitType(field.getType(), field.getName().get(), parent); + } + } +} diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java index b0c5c73dfdb0..1d84b022c57e 100644 --- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java +++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/IcebergQueryRunner.java @@ -38,6 +38,12 @@ private IcebergQueryRunner() {} public static DistributedQueryRunner createIcebergQueryRunner(Map extraProperties) throws Exception + { + return createIcebergQueryRunner(extraProperties, true); + } + + public static DistributedQueryRunner 
createIcebergQueryRunner(Map extraProperties, boolean createTpchTables) + throws Exception { Session session = testSessionBuilder() .setCatalog(ICEBERG_CATALOG) @@ -63,7 +69,9 @@ public static DistributedQueryRunner createIcebergQueryRunner(Map ICEBERG_COLUMN_PROVIDER = type -> new IcebergColumnHandle(0, "column", type, Optional.empty()); - - @Test - public void testSimple() - { - assertTupleDomain(TupleDomain.all(), TupleDomain.all()); - assertTupleDomain(TupleDomain.all(), TupleDomain.all()); - } - - @Test - public void testBoolean() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BOOLEAN), Domain.singleValue(BOOLEAN, true)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BOOLEAN), Domain.singleValue(BOOLEAN, false)))); - } - - @Test - public void testDate() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DATE), Domain.singleValue(DATE, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DATE), Domain.multipleValues(DATE, ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testVarbinary() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARBINARY), Domain.singleValue(VARBINARY, Slices.utf8Slice("apple"))))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARBINARY), Domain.multipleValues(VARBINARY, ImmutableList.of(Slices.utf8Slice("apple"), Slices.utf8Slice("banana")))))); - } - - @Test - public void testDouble() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DOUBLE), Domain.singleValue(DOUBLE, 1.0d)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(DOUBLE), Domain.multipleValues(DOUBLE, ImmutableList.of(1.0d, 2.0d, 3.0d))))); - } - - @Test - public void testBigint() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.singleValue(BIGINT, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testReal() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(REAL), Domain.singleValue(REAL, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(REAL), Domain.multipleValues(REAL, ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testInteger() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.singleValue(INTEGER, 1L)))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(INTEGER, ImmutableList.of(1L, 2L, 3L))))); - } - - @Test - public void testVarchar() - { - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARCHAR), Domain.singleValue(VARCHAR, Slices.utf8Slice("apple"))))); - - assertTupleDomainUnchanged( - TupleDomain.withColumnDomains( - 
ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(VARCHAR), Domain.multipleValues(VARCHAR, ImmutableList.of(Slices.utf8Slice("apple"), Slices.utf8Slice("banana")))))); - } - - @Test - public void testTimestamp() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.singleValue(TIMESTAMP, 1_234_567_890_123L))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.singleValue(TIMESTAMP, 1_234_567_890_123_000L)))); - - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.multipleValues(TIMESTAMP, ImmutableList.of(1_234_567_890_123L, 1_234_567_890_124L)))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP), Domain.multipleValues(TIMESTAMP, ImmutableList.of(1_234_567_890_123_000L, 1_234_567_890_124_000L))))); - } - - @Test - public void testTime() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123L))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123_000L)))); - - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.multipleValues(TIME, ImmutableList.of(1_234_567_890_123L, 1_234_567_890_124L)))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.multipleValues(TIME, ImmutableList.of(1_234_567_890_123_000L, 1_234_567_890_124_000L))))); - } - - @Test - public void testTimestampWithTimezone() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, 1_234_567_890_123L))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, MILLISECONDS.toMicros(unpackMillisUtc(1_234_567_890_123L)))))); - - List list = ImmutableList.of(1_234_567_890_123L, 1_234_567_890_124L); - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.multipleValues(TIMESTAMP_WITH_TIME_ZONE, list))), - TupleDomain.withColumnDomains( - ImmutableMap.of(ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.multipleValues(TIMESTAMP_WITH_TIME_ZONE, list.stream().map(value -> MILLISECONDS.toMicros(unpackMillisUtc(value))).collect(toImmutableList()))))); - } - - @Test - public void testMultipleColumnsTupleDomain() - { - assertTupleDomain( - TupleDomain.withColumnDomains( - ImmutableMap.of( - ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, 1_234_567_890_123L), - ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123L), - ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L)))), - TupleDomain.withColumnDomains( - ImmutableMap.of( - ICEBERG_COLUMN_PROVIDER.apply(TIMESTAMP_WITH_TIME_ZONE), Domain.singleValue(TIMESTAMP_WITH_TIME_ZONE, MILLISECONDS.toMicros(unpackMillisUtc(1_234_567_890_123L))), - ICEBERG_COLUMN_PROVIDER.apply(TIME), Domain.singleValue(TIME, 1_234_567_890_123_000L), - ICEBERG_COLUMN_PROVIDER.apply(BIGINT), Domain.multipleValues(BIGINT, ImmutableList.of(1L, 2L, 3L))))); - } - - private void 
assertTupleDomainUnchanged(TupleDomain domain) - { - assertTupleDomain(domain, domain); - } - - private void assertTupleDomain(TupleDomain actual, TupleDomain expected) - { - assertEquals(convertTupleDomainTypes(actual), expected); - } -} diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java index 90d6d3b2d56f..c8cea45f2dd9 100644 --- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java +++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java @@ -102,20 +102,32 @@ public void testShowCreateTable() @Test public void testDecimal() { - for (int precision = 1; precision <= 38; precision++) { - testDecimalWithPrecisionAndScale(precision, precision - 1); - } - - for (int scale = 1; scale < 37; scale++) { - testDecimalWithPrecisionAndScale(38, scale); - } + testWithAllFileFormats((session, format) -> testDecimalForFormat(session, format)); + } - for (int scale = 1; scale < 17; scale++) { - testDecimalWithPrecisionAndScale(18, scale); - } + private void testDecimalForFormat(Session session, FileFormat format) + { + testDecimalWithPrecisionAndScale(session, format, 1, 0); + testDecimalWithPrecisionAndScale(session, format, 8, 6); + testDecimalWithPrecisionAndScale(session, format, 9, 8); + testDecimalWithPrecisionAndScale(session, format, 10, 8); + + testDecimalWithPrecisionAndScale(session, format, 18, 1); + testDecimalWithPrecisionAndScale(session, format, 18, 8); + testDecimalWithPrecisionAndScale(session, format, 18, 17); + + testDecimalWithPrecisionAndScale(session, format, 17, 16); + testDecimalWithPrecisionAndScale(session, format, 18, 17); + testDecimalWithPrecisionAndScale(session, format, 24, 10); + testDecimalWithPrecisionAndScale(session, format, 30, 10); + testDecimalWithPrecisionAndScale(session, format, 37, 26); + testDecimalWithPrecisionAndScale(session, format, 38, 37); + + testDecimalWithPrecisionAndScale(session, format, 38, 17); + testDecimalWithPrecisionAndScale(session, format, 38, 37); } - private void testDecimalWithPrecisionAndScale(int precision, int scale) + private void testDecimalWithPrecisionAndScale(Session session, FileFormat format, int precision, int scale) { checkArgument(precision >= 1 && precision <= 38, "Decimal precision (%s) must be between 1 and 38 inclusive", precision); checkArgument(scale < precision && scale >= 0, "Decimal scale (%s) must be less than the precision (%s) and non-negative", scale, precision); @@ -126,27 +138,66 @@ private void testDecimalWithPrecisionAndScale(int precision, int scale) String afterTheDecimalPoint = "09876543210987654321098765432109876543".substring(0, scale); String decimalValue = format("%s.%s", beforeTheDecimalPoint, afterTheDecimalPoint); - assertUpdate(format("CREATE TABLE %s (x %s)", tableName, decimalType)); - assertUpdate(format("INSERT INTO %s (x) VALUES (CAST('%s' AS %s))", tableName, decimalValue, decimalType), 1); - assertQuery(format("SELECT * FROM %s", tableName), format("SELECT CAST('%s' AS %s)", decimalValue, decimalType)); - dropTable(getSession(), tableName); + assertUpdate(session, format("CREATE TABLE %s (x %s) WITH (format = '%s')", tableName, decimalType, format.name())); + assertUpdate(session, format("INSERT INTO %s (x) VALUES (CAST('%s' AS %s))", tableName, decimalValue, decimalType), 1); + assertQuery(session, format("SELECT * FROM %s", tableName), format("SELECT CAST('%s' AS %s)", decimalValue, decimalType)); + 
dropTable(session, tableName); + } + + @Test + public void testParquetPartitionByTimestamp() + { + assertUpdate("CREATE TABLE test_parquet_partitioned_by_timestamp (_timestamp timestamp) " + + "WITH (format = 'PARQUET', partitioning = ARRAY['_timestamp'])"); + testSelectOrPartitionedByTimestamp("test_parquet_partitioned_by_timestamp"); + } + + @Test + public void testParquetSelectByTimestamp() + { + assertUpdate("CREATE TABLE test_parquet_select_by_timestamp (_timestamp timestamp) WITH (format = 'PARQUET')"); + testSelectOrPartitionedByTimestamp("test_parquet_select_by_timestamp"); } @Test - public void testTimestamp() + public void testOrcPartitionByTimestamp() { - assertUpdate("CREATE TABLE test_timestamp (x timestamp)"); - assertUpdate("INSERT INTO test_timestamp VALUES (timestamp '2017-05-01 10:12:34')", 1); - assertQuery("SELECT * FROM test_timestamp", "SELECT CAST('2017-05-01 10:12:34' AS TIMESTAMP)"); - dropTable(getSession(), "test_timestamp"); + assertUpdate("CREATE TABLE test_orc_partitioned_by_timestamp (_timestamp timestamp) " + + "WITH (format = 'ORC', partitioning = ARRAY['_timestamp'])"); + testSelectOrPartitionedByTimestamp("test_orc_partitioned_by_timestamp"); + } + + @Test + public void testOrcSelectByTimestamp() + { + assertUpdate("CREATE TABLE test_orc_select_by_timestamp (_timestamp timestamp) " + + "WITH (format = 'ORC')"); + testSelectOrPartitionedByTimestamp("test_orc_select_by_timestamp"); + } + + private void testSelectOrPartitionedByTimestamp(String tableName) + { + String select1 = "SELECT CAST('2017-05-01 10:12:34' AS TIMESTAMP) _timestamp"; + assertUpdate(format("INSERT INTO %s ", tableName) + select1, 1); + String select2 = "SELECT CAST('2017-10-01 10:12:34' AS TIMESTAMP) _timestamp"; + assertUpdate(format("INSERT INTO %s " + select2, tableName), 1); + String select3 = "SELECT CAST('2018-05-01 10:12:34' AS TIMESTAMP) _timestamp"; + assertUpdate(format("INSERT INTO %s " + select3, tableName), 1); + assertQuery(format("SELECT COUNT(*) from %s", tableName), "SELECT 3"); + MaterializedResult result = computeActual("SELECT * FROM " + tableName); + assertQuery(format("SELECT * from %s WHERE _timestamp = CAST('2017-05-01 10:12:34' AS TIMESTAMP)", tableName), select1); + assertQuery(format("SELECT * from %s WHERE _timestamp < CAST('2017-06-01 10:12:34' AS TIMESTAMP)", tableName), select1); + assertQuery(format("SELECT * from %s WHERE _timestamp = CAST('2017-10-01 10:12:34' AS TIMESTAMP)", tableName), select2); + assertQuery(format("SELECT * from %s WHERE _timestamp > CAST('2017-06-01 10:12:34' AS TIMESTAMP) AND _timestamp < CAST('2018-05-01 10:12:34' AS TIMESTAMP)", tableName), select2); + assertQuery(format("SELECT * from %s WHERE _timestamp = CAST('2018-05-01 10:12:34' AS TIMESTAMP)", tableName), select3); + assertQuery(format("SELECT * from %s WHERE _timestamp > CAST('2018-01-01 10:12:34' AS TIMESTAMP)", tableName), select3); + dropTable(getSession(), tableName); } @Test public void testCreatePartitionedTable() { testWithAllFileFormats(this::testCreatePartitionedTable); - testWithAllFileFormats(this::testCreatePartitionedTableWithNestedTypes); - testWithAllFileFormats(this::testPartitionedTableWithNullValues); } private void testCreatePartitionedTable(Session session, FileFormat fileFormat) @@ -159,10 +210,9 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) ", _real REAL" + ", _double DOUBLE" + ", _boolean BOOLEAN" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", _decimal_short DECIMAL(3,2)" + - ", 
_decimal_long DECIMAL(30,10)" + - ", _timestamp TIMESTAMP") + + ", _decimal_short DECIMAL(3,2)" + + ", _decimal_long DECIMAL(30,10)" + + ", _timestamp TIMESTAMP" + ", _date DATE" + ") " + "WITH (" + @@ -174,10 +224,9 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) " '_boolean'," + " '_real'," + " '_double'," + - returnSqlIfFormatSupportsDecimalsAndTimestamps(FileFormat.PARQUET, "" + - " '_decimal_short', " + - " '_decimal_long'," + - " '_timestamp',") + + " '_decimal_short', " + + " '_decimal_long'," + + " '_timestamp'," + " '_date']" + ")"; @@ -194,10 +243,9 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) ", CAST('123.45' AS REAL) _real" + ", CAST('3.14' AS DOUBLE) _double" + ", true _boolean" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", CAST('3.14' AS DECIMAL(3,2)) _decimal_short" + - ", CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) _decimal_long" + - ", CAST('2017-05-01 10:12:34' AS TIMESTAMP) _timestamp") + + ", CAST('3.14' AS DECIMAL(3,2)) _decimal_short" + + ", CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) _decimal_long" + + ", CAST('2017-05-01 10:12:34' AS TIMESTAMP) _timestamp" + ", CAST('2017-05-01' AS DATE) _date"; assertUpdate(session, "INSERT INTO test_partitioned_table " + select, 1); @@ -208,16 +256,21 @@ private void testCreatePartitionedTable(Session session, FileFormat fileFormat) " AND 456 = _integer" + " AND CAST(123 AS BIGINT) = _bigint" + " AND true = _boolean" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - " AND CAST('3.14' AS DECIMAL(3,2)) = _decimal_short" + - " AND CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) = _decimal_long" + - " AND CAST('2017-05-01 10:12:34' AS TIMESTAMP) = _timestamp") + + " AND CAST('3.14' AS DECIMAL(3,2)) = _decimal_short" + + " AND CAST('12345678901234567890.0123456789' AS DECIMAL(30,10)) = _decimal_long" + + " AND CAST('2017-05-01 10:12:34' AS TIMESTAMP) = _timestamp" + " AND CAST('2017-05-01' AS DATE) = _date", select); dropTable(session, "test_partitioned_table"); } + @Test + public void testCreatePartitionedTableWithNestedTypes() + { + testWithAllFileFormats(this::testCreatePartitionedTableWithNestedTypes); + } + private void testCreatePartitionedTableWithNestedTypes(Session session, FileFormat fileFormat) { @Language("SQL") String createTable = "" + @@ -236,20 +289,25 @@ private void testCreatePartitionedTableWithNestedTypes(Session session, FileForm dropTable(session, "test_partitioned_table_nested_type"); } + @Test + public void testPartitionedTableWithNullValues() + { + testWithAllFileFormats(this::testPartitionedTableWithNullValues); + } + private void testPartitionedTableWithNullValues(Session session, FileFormat fileFormat) { @Language("SQL") String createTable = "" + - "CREATE TABLE test_partitioned_table (" + + "CREATE TABLE test_partitioned_table_with_null_values (" + " _string VARCHAR" + ", _bigint BIGINT" + ", _integer INTEGER" + ", _real REAL" + ", _double DOUBLE" + ", _boolean BOOLEAN" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", _decimal_short DECIMAL(3,2)" + - ", _decimal_long DECIMAL(30,10)" + - ", _timestamp TIMESTAMP") + + ", _decimal_short DECIMAL(3,2)" + + ", _decimal_long DECIMAL(30,10)" + + ", _timestamp TIMESTAMP" + ", _date DATE" + ") " + "WITH (" + @@ -261,16 +319,15 @@ private void testPartitionedTableWithNullValues(Session session, FileFormat file " '_boolean'," + " '_real'," + " '_double'," + - 
returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - " '_decimal_short', " + - " '_decimal_long'," + - " '_timestamp',") + + " '_decimal_short', " + + " '_decimal_long'," + + " '_timestamp'," + " '_date']" + ")"; assertUpdate(session, createTable); - MaterializedResult result = computeActual("SELECT * from test_partitioned_table"); + MaterializedResult result = computeActual("SELECT * from test_partitioned_table_with_null_values"); assertEquals(result.getRowCount(), 0); @Language("SQL") String select = "" + @@ -281,15 +338,14 @@ private void testPartitionedTableWithNullValues(Session session, FileFormat file ", null _real" + ", null _double" + ", null _boolean" + - returnSqlIfFormatSupportsDecimalsAndTimestamps(fileFormat, "" + - ", null _decimal_short" + - ", null _decimal_long" + - ", null _timestamp") + + ", null _decimal_short" + + ", null _decimal_long" + + ", null _timestamp" + ", null _date"; - assertUpdate(session, "INSERT INTO test_partitioned_table " + select, 1); - assertQuery(session, "SELECT * from test_partitioned_table", select); - dropTable(session, "test_partitioned_table"); + assertUpdate(session, "INSERT INTO test_partitioned_table_with_null_values " + select, 1); + assertQuery(session, "SELECT * from test_partitioned_table_with_null_values", select); + dropTable(session, "test_partitioned_table_with_null_values"); } @Test @@ -413,13 +469,6 @@ private long getLatestSnapshotId() .getOnlyValue(); } - @Test - public void testSchemaEvolution() - { - // Schema evolution should be id based - testWithAllFileFormats(this::testSchemaEvolution); - } - @Test public void testInsertIntoNotNullColumn() { @@ -436,6 +485,19 @@ public void testInsertIntoNotNullColumn() assertUpdate("DROP TABLE IF EXISTS test_commuted_not_null_table"); } + @Test + public void testSchemaEvolution() + { + // Schema evolution should be id based + testWithAllFileFormats(this::testSchemaEvolution); + } + + @Test + public void testSchemaEvolutionParquet() + { + testSchemaEvolution(getSession(), FileFormat.PARQUET); + } + private void testSchemaEvolution(Session session, FileFormat fileFormat) { assertUpdate(session, "CREATE TABLE test_schema_evolution_drop_end (col0 INTEGER, col1 INTEGER, col2 INTEGER) WITH (format = '" + fileFormat + "')"); @@ -444,6 +506,7 @@ private void testSchemaEvolution(Session session, FileFormat fileFormat) assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end DROP COLUMN col2"); assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1)"); assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end ADD COLUMN col2 INTEGER"); + assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, NULL)"); assertUpdate(session, "INSERT INTO test_schema_evolution_drop_end VALUES (3, 4, 5)", 1); assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, NULL), (3, 4, 5)"); dropTable(session, "test_schema_evolution_drop_end"); @@ -536,12 +599,6 @@ private void testWithAllFileFormats(BiConsumer test) test.accept(getSession(), FileFormat.ORC); } - // TODO: Remove and eliminate callers once we correctly handle Parquet decimals and timestamps - private String returnSqlIfFormatSupportsDecimalsAndTimestamps(FileFormat fileFormat, String sql) - { - return fileFormat == FileFormat.ORC ? 
sql : ""; - } - private void dropTable(Session session, String table) { assertUpdate(session, "DROP TABLE " + table); diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java index c0e301f22a66..5b6d004271bf 100644 --- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java +++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/MessageTypeConverter.java @@ -149,6 +149,8 @@ private static ConvertedType getConvertedType(OriginalType type) return ConvertedType.TIME_MILLIS; case TIMESTAMP_MILLIS: return ConvertedType.TIMESTAMP_MILLIS; + case TIMESTAMP_MICROS: + return ConvertedType.TIMESTAMP_MICROS; case INTERVAL: return ConvertedType.INTERVAL; case INT_8: diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java index d26b5b379098..5de21e64c65e 100644 --- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java +++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriter.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.List; +import java.util.Map; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -60,14 +61,12 @@ public class ParquetWriter private final List columnWriters; private final OutputStreamSliceOutput outputStream; - private final List types; private final ParquetWriterOptions writerOption; - private final List names; private final MessageType messageType; private final int chunkMaxLogicalBytes; - private ImmutableList.Builder rowGroupBuilder = ImmutableList.builder(); + private final ImmutableList.Builder rowGroupBuilder = ImmutableList.builder(); private int rows; private long bufferedBytes; @@ -78,28 +77,23 @@ public class ParquetWriter public ParquetWriter( OutputStream outputStream, - List columnNames, - List types, + MessageType messageType, + Map, Type> primitiveTypes, ParquetWriterOptions writerOption, CompressionCodecName compressionCodecName) { this.outputStream = new OutputStreamSliceOutput(requireNonNull(outputStream, "outputstream is null")); - this.names = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null")); - this.types = ImmutableList.copyOf(requireNonNull(types, "types is null")); + this.messageType = requireNonNull(messageType, "messageType is null"); + requireNonNull(primitiveTypes, "primitiveTypes is null"); this.writerOption = requireNonNull(writerOption, "writerOption is null"); requireNonNull(compressionCodecName, "compressionCodecName is null"); - checkArgument(types.size() == columnNames.size(), "type size %s is not equal to name size %s", types.size(), columnNames.size()); - - ParquetSchemaConverter parquetSchemaConverter = new ParquetSchemaConverter(types, columnNames); - this.messageType = parquetSchemaConverter.getMessageType(); - ParquetProperties parquetProperties = ParquetProperties.builder() .withWriterVersion(PARQUET_2_0) .withPageSize(writerOption.getMaxPageSize()) .build(); - this.columnWriters = ParquetWriters.getColumnWriters(messageType, parquetSchemaConverter.getPrimitiveTypes(), parquetProperties, compressionCodecName); + this.columnWriters = ParquetWriters.getColumnWriters(messageType, primitiveTypes, parquetProperties, compressionCodecName); this.chunkMaxLogicalBytes = max(1, CHUNK_MAX_BYTES 
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java
index 3bea3880fbd8..02e86a40738b 100644
--- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/ParquetWriters.java
@@ -23,6 +23,7 @@
 import io.prestosql.parquet.writer.valuewriter.IntegerValueWriter;
 import io.prestosql.parquet.writer.valuewriter.PrimitiveValueWriter;
 import io.prestosql.parquet.writer.valuewriter.RealValueWriter;
+import io.prestosql.parquet.writer.valuewriter.TimestampValueWriter;
 import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.type.CharType;
 import io.prestosql.spi.type.DecimalType;
@@ -159,14 +160,17 @@ private static PrimitiveValueWriter getValueWriter(ValuesWriter valuesWriter, io
         if (INTEGER.equals(type) || SMALLINT.equals(type) || TINYINT.equals(type)) {
             return new IntegerValueWriter(valuesWriter, type, parquetType);
         }
+        if (BIGINT.equals(type)) {
+            return new BigintValueWriter(valuesWriter, type, parquetType);
+        }
         if (type instanceof DecimalType) {
             return new DecimalValueWriter(valuesWriter, type, parquetType);
         }
         if (DATE.equals(type)) {
             return new DateValueWriter(valuesWriter, parquetType);
         }
-        if (BIGINT.equals(type) || TIMESTAMP.equals(type)) {
-            return new BigintValueWriter(valuesWriter, type, parquetType);
+        if (TIMESTAMP.equals(type)) {
+            return new TimestampValueWriter(valuesWriter, type, parquetType);
         }
         if (DOUBLE.equals(type)) {
             return new DoubleValueWriter(valuesWriter, parquetType);
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java
index 72261ceb752e..a116808adb44 100644
--- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/DecimalValueWriter.java
@@ -72,9 +72,9 @@ else if (decimalType.isShort()) {
         }
     }
-    private static byte[] paddingBigInteger(BigInteger bigInteger)
+    private byte[] paddingBigInteger(BigInteger bigInteger)
     {
-        byte[] result = new byte[16];
+        byte[] result = new byte[getTypeLength()];
         if (bigInteger.signum() < 0) {
             Arrays.fill(result, (byte) 0xFF);
         }
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java
index b44296c2f7bb..92af649d46b1 100644
--- a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/PrimitiveValueWriter.java
@@ -47,6 +47,11 @@ public Statistics<?> getStatistics()
         return statistics;
     }
+    protected int getTypeLength()
+    {
+        return parquetType.getTypeLength();
+    }
+
     @Override
     public long getBufferedSize()
     {
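Note: sizing the padding buffer with getTypeLength() instead of a hard-coded 16 bytes lets
DecimalValueWriter honor FIXED_LEN_BYTE_ARRAY columns whose declared length is smaller than 16.
A self-contained illustration of the sign-extension logic (a standalone helper for clarity, not
the class method itself; requires java.math.BigInteger and java.util.Arrays):

    // Sign-extend a two's-complement unscaled decimal value into a fixed-length, big-endian buffer.
    static byte[] pad(BigInteger unscaledValue, int typeLength)
    {
        byte[] result = new byte[typeLength];
        if (unscaledValue.signum() < 0) {
            Arrays.fill(result, (byte) 0xFF); // negative values keep their sign bits in the padding
        }
        byte[] bytes = unscaledValue.toByteArray(); // assumes the value fits in typeLength bytes
        System.arraycopy(bytes, 0, result, typeLength - bytes.length, bytes.length);
        return result;
    }

    // pad(BigInteger.valueOf(-1), 5)  -> FF FF FF FF FF
    // pad(BigInteger.valueOf(300), 5) -> 00 00 00 01 2C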
diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/TimestampValueWriter.java b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/TimestampValueWriter.java
new file mode 100644
index 000000000000..a78fcd9f0099
--- /dev/null
+++ b/presto-parquet/src/main/java/io/prestosql/parquet/writer/valuewriter/TimestampValueWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.parquet.writer.valuewriter;
+
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class TimestampValueWriter
+        extends PrimitiveValueWriter
+{
+    private final Type type;
+    private final boolean writeMicroseconds;
+
+    public TimestampValueWriter(ValuesWriter valuesWriter, Type type, PrimitiveType parquetType)
+    {
+        super(parquetType, valuesWriter);
+        this.type = requireNonNull(type, "type is null");
+        this.writeMicroseconds = parquetType.isPrimitive() && parquetType.getOriginalType() == OriginalType.TIMESTAMP_MICROS;
+    }
+
+    @Override
+    public void write(Block block)
+    {
+        for (int i = 0; i < block.getPositionCount(); i++) {
+            if (!block.isNull(i)) {
+                long value = type.getLong(block, i);
+                long scaledValue = writeMicroseconds ? MILLISECONDS.toMicros(value) : value;
+                getValueWriter().writeLong(scaledValue);
+                getStatistics().updateStats(scaledValue);
+            }
+        }
+    }
+}
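Note: Presto passes TIMESTAMP values to the writer as epoch milliseconds, while Iceberg declares
its timestamp columns as INT64 annotated TIMESTAMP_MICROS, so TimestampValueWriter scales only
when the column carries that annotation. A minimal standalone illustration of the per-position
decision made in write(Block) (the helper name is hypothetical):

    // Scale an engine-side epoch-millisecond value to whatever the Parquet column declares.
    static long toParquetTimestamp(long epochMillis, OriginalType columnAnnotation)
    {
        return columnAnnotation == OriginalType.TIMESTAMP_MICROS
                ? MILLISECONDS.toMicros(epochMillis) // e.g. 1493633554000 ms -> 1493633554000000 us
                : epochMillis;                       // TIMESTAMP_MILLIS columns are written as-is
    }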