Change native parquet writer to write v1 parquet files
joshthoward authored and findepi committed Oct 20, 2021

1 parent 90b43c3 commit cd52526
Showing 5 changed files with 49 additions and 91 deletions.
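The change is deliberately small: the native writer's parquet-mr ParquetProperties switch from the v2 to the v1 writer version (ParquetWriter, first hunks below), the column-writer factory hands PrimitiveColumnWriter plain ValuesWriter instances for the levels, PrimitiveColumnWriter emits v1 data pages (DATA_PAGE instead of DATA_PAGE_V2), and two product tests drop their expected-failure assertions because older Hive and Spark readers can consume v1 files. A minimal, self-contained sketch of the configuration the writer now uses, assuming only parquet-mr on the classpath; the class name and page size below are made up for illustration, and Trino actually passes writerOption.getMaxPageSize():

import org.apache.parquet.column.ParquetProperties;

import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class WriterVersionSketch
{
    public static void main(String[] args)
    {
        // Build writer properties targeting the v1 Parquet format, as the diff below does.
        ParquetProperties parquetProperties = ParquetProperties.builder()
                .withWriterVersion(PARQUET_1_0) // previously PARQUET_2_0
                .withPageSize(1024 * 1024)      // illustrative value; Trino uses writerOption.getMaxPageSize()
                .build();

        System.out.println(parquetProperties.getWriterVersion()); // PARQUET_1_0
    }
}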
@@ -50,7 +50,7 @@
import static java.lang.Math.toIntExact;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Objects.requireNonNull;
- import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+ import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class ParquetWriter
implements Closeable
@@ -90,7 +90,7 @@ public ParquetWriter(
requireNonNull(compressionCodecName, "compressionCodecName is null");

ParquetProperties parquetProperties = ParquetProperties.builder()
- .withWriterVersion(PARQUET_2_0)
+ .withWriterVersion(PARQUET_1_0)
.withPageSize(writerOption.getMaxPageSize())
.build();

@@ -148,8 +148,8 @@ public ColumnWriter primitive(PrimitiveType primitive)
return new PrimitiveColumnWriter(
columnDescriptor,
getValueWriter(parquetProperties.newValuesWriter(columnDescriptor), trinoType, columnDescriptor.getPrimitiveType()),
- parquetProperties.newDefinitionLevelEncoder(columnDescriptor),
- parquetProperties.newRepetitionLevelEncoder(columnDescriptor),
+ parquetProperties.newDefinitionLevelWriter(columnDescriptor),
+ parquetProperties.newRepetitionLevelWriter(columnDescriptor),
compressionCodecName,
parquetProperties.getPageSizeThreshold());
}
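In the hunk above, the definition and repetition levels are now produced by ordinary ValuesWriter instances obtained from ParquetProperties, rather than by the dedicated RunLengthBitPackingHybridEncoder used for v2 pages; the rest of the commit reworks PrimitiveColumnWriter (next file) around that API. A small self-contained sketch of the level-writer API, assuming parquet-mr; the one-column schema and all names here are hypothetical, purely for illustration:

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;

public class LevelWriterSketch
{
    public static void main(String[] args)
    {
        // Hypothetical schema with a single optional INT32 column.
        MessageType schema = Types.buildMessage()
                .optional(PrimitiveType.PrimitiveTypeName.INT32).named("x")
                .named("example");
        ColumnDescriptor column = schema.getColumns().get(0);

        ParquetProperties properties = ParquetProperties.builder()
                .withWriterVersion(PARQUET_1_0)
                .build();

        // For v1 pages the levels are plain ValuesWriters (an RLE/bit-packing hybrid
        // when the max level is > 0), so their bytes and encoding are fetched the same
        // way as for the data values.
        ValuesWriter definitionLevels = properties.newDefinitionLevelWriter(column);
        ValuesWriter repetitionLevels = properties.newRepetitionLevelWriter(column);

        definitionLevels.writeInteger(1); // a present (non-null) value
        repetitionLevels.writeInteger(0); // start of a new record

        System.out.println(definitionLevels.getEncoding()); // RLE
    }
}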
@@ -25,7 +25,7 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
- import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+ import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.format.ColumnMetaData;
import org.apache.parquet.format.PageEncodingStats;
import org.apache.parquet.format.PageType;
@@ -63,26 +63,25 @@ public class PrimitiveColumnWriter
private final CompressionCodecName compressionCodec;

private final PrimitiveValueWriter primitiveValueWriter;
- private final RunLengthBitPackingHybridEncoder definitionLevelEncoder;
- private final RunLengthBitPackingHybridEncoder repetitionLevelEncoder;
+ private final ValuesWriter definitionLevelWriter;
+ private final ValuesWriter repetitionLevelWriter;

private final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

private boolean closed;
private boolean getDataStreamsCalled;

// current page stats
- private int currentPageRows;
+ private int valueCount;
private int currentPageNullCounts;
- private int currentPageRowCount;

// column meta data stats
private final Set<Encoding> encodings = new HashSet<>();
private final Map<org.apache.parquet.format.Encoding, Integer> dataPagesWithEncoding = new HashMap<>();
private final Map<org.apache.parquet.format.Encoding, Integer> dictionaryPagesWithEncoding = new HashMap<>();
private long totalCompressedSize;
private long totalUnCompressedSize;
- private long totalRows;
+ private long totalValues;
private Statistics<?> columnStatistics;

private final int maxDefinitionLevel;
@@ -94,18 +93,16 @@ public class PrimitiveColumnWriter

private final int pageSizeThreshold;

- public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWriter primitiveValueWriter, RunLengthBitPackingHybridEncoder definitionLevelEncoder, RunLengthBitPackingHybridEncoder repetitionLevelEncoder, CompressionCodecName compressionCodecName, int pageSizeThreshold)
+ public PrimitiveColumnWriter(ColumnDescriptor columnDescriptor, PrimitiveValueWriter primitiveValueWriter, ValuesWriter definitionLevelWriter, ValuesWriter repetitionLevelWriter, CompressionCodecName compressionCodecName, int pageSizeThreshold)
{
this.columnDescriptor = requireNonNull(columnDescriptor, "columnDescriptor is null");
this.maxDefinitionLevel = columnDescriptor.getMaxDefinitionLevel();

- this.definitionLevelEncoder = requireNonNull(definitionLevelEncoder, "definitionLevelEncoder is null");
- this.repetitionLevelEncoder = requireNonNull(repetitionLevelEncoder, "repetitionLevelEncoder is null");
+ this.definitionLevelWriter = requireNonNull(definitionLevelWriter, "definitionLevelWriter is null");
+ this.repetitionLevelWriter = requireNonNull(repetitionLevelWriter, "repetitionLevelWriter is null");
this.primitiveValueWriter = requireNonNull(primitiveValueWriter, "primitiveValueWriter is null");
this.compressionCodec = requireNonNull(compressionCodecName, "compressionCodecName is null");
this.compressor = getCompressor(compressionCodecName);
this.pageSizeThreshold = pageSizeThreshold;

this.columnStatistics = Statistics.createStats(columnDescriptor.getPrimitiveType());
}

@@ -132,21 +129,18 @@ public void writeBlock(ColumnChunk columnChunk)
Iterator<Integer> defIterator = DefLevelIterables.getIterator(current.getDefLevelIterables());
while (defIterator.hasNext()) {
int next = defIterator.next();
- definitionLevelEncoder.writeInt(next);
+ definitionLevelWriter.writeInteger(next);
if (next != maxDefinitionLevel) {
currentPageNullCounts++;
}
- currentPageRows++;
+ valueCount++;
}

// write repetition levels
Iterator<Integer> repIterator = getIterator(current.getRepLevelIterables());
while (repIterator.hasNext()) {
int next = repIterator.next();
- repetitionLevelEncoder.writeInt(next);
- if (next == 0) {
- currentPageRowCount++;
- }
+ repetitionLevelWriter.writeInteger(next);
}

if (getBufferedBytes() >= pageSizeThreshold) {
@@ -178,14 +172,14 @@ private ColumnMetaData getColumnMetaData()
encodings.stream().map(parquetMetadataConverter::getEncoding).collect(toImmutableList()),
ImmutableList.copyOf(columnDescriptor.getPath()),
compressionCodec.getParquetCompressionCodec(),
- totalRows,
+ totalValues,
totalUnCompressedSize,
totalCompressedSize,
-1);
columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
dataPagesWithEncoding.entrySet().stream()
- .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE_V2, encodingAndCount.getKey(), encodingAndCount.getValue()))
+ .map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
.forEach(pageEncodingStats::add);
dictionaryPagesWithEncoding.entrySet().stream()
.map(encodingAndCount -> new PageEncodingStats(PageType.DICTIONARY_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
@@ -203,76 +197,61 @@ private void flushCurrentPageToBuffer()
{
ImmutableList.Builder<ParquetDataOutput> outputDataStreams = ImmutableList.builder();

- BytesInput bytes = primitiveValueWriter.getBytes();
- ParquetDataOutput repetitions = createDataOutput(copy(repetitionLevelEncoder.toBytes()));
- ParquetDataOutput definitions = createDataOutput(copy(definitionLevelEncoder.toBytes()));
-
- // Add encoding should be called after primitiveValueWriter.getBytes() and before primitiveValueWriter.reset()
- encodings.add(primitiveValueWriter.getEncoding());
-
- long uncompressedSize = bytes.size() + repetitions.size() + definitions.size();
-
- ParquetDataOutput data;
- long compressedSize;
- if (compressor != null) {
- data = compressor.compress(bytes);
- compressedSize = data.size() + repetitions.size() + definitions.size();
- }
- else {
- data = createDataOutput(copy(bytes));
- compressedSize = uncompressedSize;
- }
+ BytesInput bytesInput = BytesInput.concat(copy(repetitionLevelWriter.getBytes()),
+ copy(definitionLevelWriter.getBytes()),
+ copy(primitiveValueWriter.getBytes()));
+ ParquetDataOutput pageData = (compressor != null) ? compressor.compress(bytesInput) : createDataOutput(bytesInput);
+ long uncompressedSize = bytesInput.size();
+ long compressedSize = pageData.size();

ByteArrayOutputStream pageHeaderOutputStream = new ByteArrayOutputStream();

Statistics<?> statistics = primitiveValueWriter.getStatistics();
statistics.incrementNumNulls(currentPageNullCounts);

columnStatistics.mergeStatistics(statistics);

- parquetMetadataConverter.writeDataPageV2Header((int) uncompressedSize,
+ parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize,
(int) compressedSize,
- currentPageRows,
- currentPageNullCounts,
- currentPageRowCount,
- statistics,
+ valueCount,
+ repetitionLevelWriter.getEncoding(),
+ definitionLevelWriter.getEncoding(),
primitiveValueWriter.getEncoding(),
- (int) repetitions.size(),
- (int) definitions.size(),
pageHeaderOutputStream);

ParquetDataOutput pageHeader = createDataOutput(Slices.wrappedBuffer(pageHeaderOutputStream.toByteArray()));
outputDataStreams.add(pageHeader);
- outputDataStreams.add(repetitions);
- outputDataStreams.add(definitions);
- outputDataStreams.add(data);
+ outputDataStreams.add(pageData);

List<ParquetDataOutput> dataOutputs = outputDataStreams.build();

- dataPagesWithEncoding.merge(new ParquetMetadataConverter().getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);
+ dataPagesWithEncoding.merge(parquetMetadataConverter.getEncoding(primitiveValueWriter.getEncoding()), 1, Integer::sum);

// update total stats
- totalCompressedSize += pageHeader.size() + compressedSize;
totalUnCompressedSize += pageHeader.size() + uncompressedSize;
- totalRows += currentPageRows;
+ totalCompressedSize += pageHeader.size() + compressedSize;
+ totalValues += valueCount;

pageBuffer.addAll(dataOutputs);

+ // Add encoding should be called after ValuesWriter#getBytes() and before ValuesWriter#reset()
+ encodings.add(repetitionLevelWriter.getEncoding());
+ encodings.add(definitionLevelWriter.getEncoding());
+ encodings.add(primitiveValueWriter.getEncoding());

// reset page stats
- currentPageRows = 0;
+ valueCount = 0;
currentPageNullCounts = 0;
- currentPageRowCount = 0;

- definitionLevelEncoder.reset();
- repetitionLevelEncoder.reset();
+ repetitionLevelWriter.reset();
+ definitionLevelWriter.reset();
primitiveValueWriter.reset();
}

private List<ParquetDataOutput> getDataStreams()
throws IOException
{
List<ParquetDataOutput> dictPage = new ArrayList<>();
- if (currentPageRows > 0) {
+ if (valueCount > 0) {
flushCurrentPageToBuffer();
}
// write dict page if possible
@@ -314,8 +293,8 @@ private List<ParquetDataOutput> getDataStreams()
public long getBufferedBytes()
{
return pageBuffer.stream().mapToLong(ParquetDataOutput::size).sum() +
- definitionLevelEncoder.getBufferedSize() +
- repetitionLevelEncoder.getBufferedSize() +
+ definitionLevelWriter.getBufferedSize() +
+ repetitionLevelWriter.getBufferedSize() +
primitiveValueWriter.getBufferedSize();
}

@@ -324,22 +303,22 @@ public long getRetainedBytes()
{
return INSTANCE_SIZE +
primitiveValueWriter.getAllocatedSize() +
- definitionLevelEncoder.getAllocatedSize() +
- repetitionLevelEncoder.getAllocatedSize();
+ definitionLevelWriter.getAllocatedSize() +
+ repetitionLevelWriter.getAllocatedSize();
}

@Override
public void reset()
{
- definitionLevelEncoder.reset();
- repetitionLevelEncoder.reset();
+ definitionLevelWriter.reset();
+ repetitionLevelWriter.reset();
primitiveValueWriter.reset();
pageBuffer.clear();
closed = false;

totalCompressedSize = 0;
totalUnCompressedSize = 0;
- totalRows = 0;
+ totalValues = 0;
encodings.clear();
dataPagesWithEncoding.clear();
dictionaryPagesWithEncoding.clear();
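The net effect of the PrimitiveColumnWriter changes above: a v1 data page records a single value count (nulls included) and compresses repetition levels, definition levels, and values together as one body, whereas a v2 page carried separate value, null, and row counts and kept the levels outside the compressed values. A self-contained sketch, assuming parquet-mr, that writes a v1 page header the same way the code above does and reads it back to show it is a plain DATA_PAGE (matching the PageType change in getColumnMetaData); all sizes and counts are made-up illustration values:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.column.Encoding;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;

public class DataPageV1HeaderSketch
{
    public static void main(String[] args)
            throws IOException
    {
        ParquetMetadataConverter converter = new ParquetMetadataConverter();
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // One value count covers the whole page; the levels travel inside the
        // (optionally compressed) page body together with the values.
        converter.writeDataPageV1Header(
                100,            // uncompressed size of levels + values
                60,             // compressed size of the same bytes
                10,             // valueCount
                Encoding.RLE,   // repetition level encoding
                Encoding.RLE,   // definition level encoding
                Encoding.PLAIN, // values encoding
                out);

        PageHeader header = Util.readPageHeader(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(header.getType()); // DATA_PAGE, not DATA_PAGE_V2
    }
}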
@@ -16,7 +16,6 @@
import io.trino.tempto.Requirement;
import io.trino.tempto.RequirementsProvider;
import io.trino.tempto.configuration.Configuration;
- import org.assertj.core.api.InstanceOfAssertFactories;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

@@ -32,7 +31,6 @@
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
- import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestHiveCompression
extends HiveProductTest
@@ -91,20 +89,7 @@ public void testSnappyCompressedParquetTableCreatedInTrino()
@Test(groups = HIVE_COMPRESSION)
public void testSnappyCompressedParquetTableCreatedInTrinoWithNativeWriter()
{
- if (getHiveVersionMajor() >= 2) {
- testSnappyCompressedParquetTableCreatedInTrino(true);
- return;
- }
-
- // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet writer creates files that cannot be read by Hive
- assertThatThrownBy(() -> testSnappyCompressedParquetTableCreatedInTrino(true))
- .hasStackTraceContaining("at org.apache.hive.jdbc.HiveQueryResultSet.next") // comes via Hive JDBC
- .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
- // There are a few cases here each of which are downstream:
- // - HDP 2 and CDH 5 cannot read Parquet V2 files and throw "org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file"
- // - CDH 5 Parquet uses parquet.* packages, while HDP 2 uses org.apache.parquet.* packages
- // - HDP 3 throws java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveVarcharWritable
- .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: java.io.IOException:\\E (org.apache.)?parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file .*");
+ testSnappyCompressedParquetTableCreatedInTrino(true);
}

private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedParquetWriter)
@@ -14,7 +14,6 @@
package io.trino.tests.product.hive;

import io.trino.tempto.ProductTest;
- import org.assertj.core.api.InstanceOfAssertFactories;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@@ -36,7 +35,6 @@
import static java.lang.String.join;
import static java.util.Collections.nCopies;
import static java.util.Locale.ENGLISH;
- import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestHiveSparkCompatibility
extends ProductTest
@@ -215,11 +213,7 @@ public void testReadTrinoCreatedParquetTable()
public void testReadTrinoCreatedParquetTableWithNativeWriter()
{
onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".experimental_parquet_optimized_writer_enabled = true");
- // TODO (https://github.com/trinodb/trino/issues/6377) Native Parquet Writer writes Parquet V2 files that are not compatible with Spark's vectorized reader, see https://github.com/trinodb/trino/issues/7953 for more details
- assertThatThrownBy(() -> testReadTrinoCreatedTable("using_native_parquet", "PARQUET"))
- .hasStackTraceContaining("at org.apache.hive.jdbc.HiveStatement.execute")
- .extracting(Throwable::toString, InstanceOfAssertFactories.STRING)
- .matches("\\Qio.trino.tempto.query.QueryExecutionException: java.sql.SQLException: Error running query: java.lang.UnsupportedOperationException: Unsupported encoding: RLE\\E");
+ testReadTrinoCreatedTable("using_native_parquet", "PARQUET");
}

private void testReadTrinoCreatedTable(String tableName, String tableFormat)
