Change Iceberg to use ParquetFileWriter
This commit changes the Iceberg connector to use the new
ParquetFileWriter, and fixes bugs in the handling of decimal and
timestamp columns for both ORC and Parquet.

This commit removes DomainConverter and the test
TestDomainConverter, superseded by fixes and conversions
in ExpressionConverter, MessageTypeConverter and the
new class TimestampValueWriter.

This commit adds new TestIcebergSmoke tests for decimal
and timestamp, and fixes the existing tests. With this
commit, all TestIcebergSmoke tests pass.
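
For reference, the new write path derives the Parquet schema up front with
ParquetSchemaConverter and passes both the MessageType and a map of
leaf-column paths to Presto types into the writer, as the hunks below show.
A minimal sketch of the new call pattern, assuming the prestosql APIs of
this era; the file path and columns are illustrative, not taken from the
commit:

    import com.google.common.collect.ImmutableList;
    import io.prestosql.parquet.writer.ParquetSchemaConverter;
    import io.prestosql.parquet.writer.ParquetWriter;
    import io.prestosql.parquet.writer.ParquetWriterOptions;
    import io.prestosql.spi.type.Type;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    import java.io.FileOutputStream;
    import java.util.List;

    import static io.prestosql.spi.type.BigintType.BIGINT;
    import static io.prestosql.spi.type.TimestampType.TIMESTAMP;

    public final class ParquetWriterSketch
    {
        private ParquetWriterSketch() {}

        public static void main(String[] args)
                throws Exception
        {
            List<String> columnNames = ImmutableList.of("id", "created_at"); // illustrative
            List<Type> columnTypes = ImmutableList.of(BIGINT, TIMESTAMP);

            // The converter yields both the Parquet MessageType and the
            // Map<List<String>, Type> keyed by each leaf column's path,
            // which the new ParquetWriter constructor takes in place of
            // the old column name and type lists.
            ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(columnTypes, columnNames);

            ParquetWriter writer = new ParquetWriter(
                    new FileOutputStream("/tmp/example.parquet"), // illustrative path
                    schemaConverter.getMessageType(),
                    schemaConverter.getPrimitiveTypes(),
                    ParquetWriterOptions.builder().build(),
                    CompressionCodecName.SNAPPY);

            // Write pages here, then close to flush row groups and the footer.
            writer.close();
        }
    }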
djsstarburst authored and electrum committed Jul 22, 2020
1 parent 5fda809 commit 6398acc
Showing 25 changed files with 450 additions and 546 deletions.
@@ -24,12 +24,14 @@
 import io.prestosql.spi.block.RunLengthEncodedBlock;
 import io.prestosql.spi.type.Type;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
 import org.openjdk.jol.info.ClassLayout;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -50,8 +52,9 @@ public class ParquetFileWriter
     public ParquetFileWriter(
             OutputStream outputStream,
             Callable<Void> rollbackAction,
-            List<String> columnNames,
-            List<Type> fileColumnTypes,
+            MessageType messageType,
+            Map<List<String>, Type> primitiveTypes,
             ParquetWriterOptions parquetWriterOptions,
             int[] fileInputColumnIndexes,
             CompressionCodecName compressionCodecName)
@@ -60,8 +63,8 @@ public ParquetFileWriter(
 
         this.parquetWriter = new ParquetWriter(
                 outputStream,
-                columnNames,
-                fileColumnTypes,
+                messageType,
+                primitiveTypes,
                 parquetWriterOptions,
                 compressionCodecName);
 
@@ -13,6 +13,7 @@
  */
 package io.prestosql.plugin.hive.parquet;
 
+import io.prestosql.parquet.writer.ParquetSchemaConverter;
 import io.prestosql.parquet.writer.ParquetWriterOptions;
 import io.prestosql.plugin.hive.FileWriter;
 import io.prestosql.plugin.hive.HdfsEnvironment;
@@ -122,11 +123,14 @@ public Optional<FileWriter> createFileWriter(
             return null;
         };
 
+        ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(fileColumnTypes, fileColumnNames);
+
         return Optional.of(new ParquetFileWriter(
                 fileSystem.create(path),
                 rollbackAction,
-                fileColumnNames,
-                fileColumnTypes,
+                schemaConverter.getMessageType(),
+                schemaConverter.getPrimitiveTypes(),
                 parquetWriterOptions,
                 fileInputColumnIndexes,
                 compressionCodecName));
@@ -21,6 +21,7 @@
 import io.prestosql.orc.OrcWriterStats;
 import io.prestosql.orc.OutputStreamOrcDataSink;
 import io.prestosql.orc.metadata.OrcType;
+import io.prestosql.parquet.writer.ParquetSchemaConverter;
 import io.prestosql.parquet.writer.ParquetWriter;
 import io.prestosql.parquet.writer.ParquetWriterOptions;
 import io.prestosql.plugin.hive.FileFormatDataSourceStats;
@@ -499,10 +500,12 @@ private static class PrestoParquetFormatWriter
         public PrestoParquetFormatWriter(File targetFile, List<String> columnNames, List<Type> types, HiveCompressionCodec compressionCodec)
                 throws IOException
         {
+            ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(types, columnNames);
+
             writer = new ParquetWriter(
                     new FileOutputStream(targetFile),
-                    columnNames,
-                    types,
+                    schemaConverter.getMessageType(),
+                    schemaConverter.getPrimitiveTypes(),
                     ParquetWriterOptions.builder().build(),
                     compressionCodec.getParquetCompressionCodec());
         }
@@ -22,6 +22,7 @@
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.airlift.units.DataSize;
+import io.prestosql.parquet.writer.ParquetSchemaConverter;
 import io.prestosql.parquet.writer.ParquetWriter;
 import io.prestosql.parquet.writer.ParquetWriterOptions;
 import io.prestosql.plugin.hive.HiveConfig;
@@ -712,10 +713,11 @@ private static void writeParquetColumnPresto(File outputFile, List<Type> types,
             throws Exception
     {
         checkArgument(types.size() == columnNames.size() && types.size() == values.length);
+        ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(types, columnNames);
         ParquetWriter writer = new ParquetWriter(
                 new FileOutputStream(outputFile),
-                columnNames,
-                types,
+                schemaConverter.getMessageType(),
+                schemaConverter.getPrimitiveTypes(),
                 ParquetWriterOptions.builder()
                         .setMaxPageSize(DataSize.ofBytes(100))
                         .setMaxBlockSize(DataSize.ofBytes(100000))

This file was deleted.

@@ -29,6 +29,7 @@
 import io.prestosql.spi.type.MapType;
 import io.prestosql.spi.type.RealType;
 import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.TimestampType;
 import io.prestosql.spi.type.Type;
 import io.prestosql.spi.type.VarbinaryType;
 import io.prestosql.spi.type.VarcharType;
@@ -38,6 +39,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -198,6 +200,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
             return new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), decimalType.getScale());
         }
 
+        if (type instanceof TimestampType) {
+            return TimeUnit.MILLISECONDS.toMicros((long) marker.getValue());
+        }
+
         return marker.getValue();
     }
 }
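
The last hunk above is the timestamp half of the literal fix: at the time
of this commit, Presto's TimestampType carries epoch milliseconds, while
Iceberg expects timestamp literals in epoch microseconds, so a pushed-down
predicate would have been off by a factor of 1000 without the conversion.
A small self-contained check (the sample value is invented, not from the
commit):

    import java.util.concurrent.TimeUnit;

    public final class TimestampLiteralCheck
    {
        private TimestampLiteralCheck() {}

        public static void main(String[] args)
        {
            long prestoMillis = 1_595_376_000_000L; // 2020-07-22T00:00:00Z
            long icebergMicros = TimeUnit.MILLISECONDS.toMicros(prestoMillis);
            System.out.println(icebergMicros); // prints 1595376000000000 (millis * 1000)
        }
    }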
@@ -16,10 +16,8 @@
 import io.prestosql.plugin.hive.FileWriter;
 import org.apache.iceberg.Metrics;
 
-import java.util.Optional;
-
 public interface IcebergFileWriter
         extends FileWriter
 {
-    Optional<Metrics> getMetrics();
+    Metrics getMetrics();
 }
@@ -21,9 +21,9 @@
 import io.prestosql.orc.OrcWriterOptions;
 import io.prestosql.orc.OrcWriterStats;
 import io.prestosql.orc.OutputStreamOrcDataSink;
+import io.prestosql.parquet.writer.ParquetWriterOptions;
 import io.prestosql.plugin.hive.FileFormatDataSourceStats;
 import io.prestosql.plugin.hive.HdfsEnvironment;
-import io.prestosql.plugin.hive.HiveStorageFormat;
 import io.prestosql.plugin.hive.NodeVersion;
 import io.prestosql.plugin.hive.orc.HdfsOrcDataSource;
 import io.prestosql.plugin.hive.orc.OrcWriterConfig;
@@ -33,21 +33,17 @@
 import io.prestosql.spi.type.TypeManager;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.weakref.jmx.Flatten;
 import org.weakref.jmx.Managed;
 
 import javax.inject.Inject;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
@@ -56,8 +52,6 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
 import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_VERSION_NAME;
-import static io.prestosql.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
-import static io.prestosql.plugin.hive.util.ParquetRecordWriterUtil.setParquetSchema;
 import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
 import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED;
 import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getCompressionCodec;
@@ -67,13 +61,14 @@
 import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxStripeSize;
 import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMinStripeSize;
 import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterValidateMode;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetWriterBlockSize;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageSize;
 import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
-import static io.prestosql.plugin.iceberg.TypeConverter.toHiveType;
 import static io.prestosql.plugin.iceberg.TypeConverter.toOrcType;
 import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
+import static io.prestosql.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap;
 import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.joining;
 import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
 import static org.joda.time.DateTimeZone.UTC;
 
@@ -103,7 +98,6 @@ public IcebergFileWriterFactory(
     }
 
     @Managed
-    @Flatten
     public OrcWriterStats getOrcWriterStats()
     {
         return orcWriterStats;
@@ -112,14 +106,13 @@ public OrcWriterStats getOrcWriterStats()
     public IcebergFileWriter createFileWriter(
             Path outputPath,
             Schema icebergSchema,
-            List<IcebergColumnHandle> columns,
             JobConf jobConf,
             ConnectorSession session,
             FileFormat fileFormat)
     {
         switch (fileFormat) {
             case PARQUET:
-                return createParquetWriter(outputPath, icebergSchema, columns, jobConf, session);
+                return createParquetWriter(outputPath, icebergSchema, jobConf, session);
             case ORC:
                 return createOrcWriter(outputPath, icebergSchema, jobConf, session);
         }
@@ -129,32 +122,44 @@ public IcebergFileWriter createFileWriter(
     private IcebergFileWriter createParquetWriter(
             Path outputPath,
             Schema icebergSchema,
-            List<IcebergColumnHandle> columns,
             JobConf jobConf,
             ConnectorSession session)
     {
-        Properties properties = new Properties();
-        properties.setProperty(IOConstants.COLUMNS, columns.stream()
-                .map(IcebergColumnHandle::getName)
-                .collect(joining(",")));
-        properties.setProperty(IOConstants.COLUMNS_TYPES, columns.stream()
-                .map(column -> toHiveType(column.getType()).getHiveTypeName().toString())
-                .collect(joining(":")));
-
-        setParquetSchema(jobConf, convert(icebergSchema, "table"));
-        jobConf.set(ParquetOutputFormat.COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().name());
-
-        return new IcebergRecordFileWriter(
-                outputPath,
-                columns.stream()
-                        .map(IcebergColumnHandle::getName)
-                        .collect(toImmutableList()),
-                fromHiveStorageFormat(HiveStorageFormat.PARQUET),
-                properties,
-                HiveStorageFormat.PARQUET.getEstimatedWriterSystemMemoryUsage(),
-                jobConf,
-                typeManager,
-                session);
+        List<String> fileColumnNames = icebergSchema.columns().stream()
+                .map(Types.NestedField::name)
+                .collect(toImmutableList());
+        List<Type> fileColumnTypes = icebergSchema.columns().stream()
+                .map(column -> toPrestoType(column.type(), typeManager))
+                .collect(toImmutableList());
+
+        try {
+            FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), outputPath, jobConf);
+
+            Callable<Void> rollbackAction = () -> {
+                fileSystem.delete(outputPath, false);
+                return null;
+            };
+
+            ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
+                    .setMaxPageSize(getParquetWriterPageSize(session))
+                    .setMaxBlockSize(getParquetWriterBlockSize(session))
+                    .build();
+
+            return new IcebergParquetFileWriter(
+                    fileSystem.create(outputPath),
+                    rollbackAction,
+                    fileColumnTypes,
+                    convert(icebergSchema, "table"),
+                    makeTypeMap(fileColumnTypes, fileColumnNames),
+                    parquetWriterOptions,
+                    IntStream.range(0, fileColumnNames.size()).toArray(),
+                    getCompressionCodec(session).getParquetCompressionCodec(),
+                    outputPath,
+                    jobConf);
+        }
+        catch (IOException e) {
+            throw new PrestoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
+        }
     }
 
     private IcebergFileWriter createOrcWriter(