From d31a89165d4e9574eac76122d93810d139388334 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 27 Feb 2024 17:04:52 +0800 Subject: [PATCH] PARQUET-2261: Implement SizeStatistics (#1177) --- .../column/impl/ColumnValueCollector.java | 193 +++++++++++++ .../parquet/column/impl/ColumnWriterBase.java | 103 ++----- .../parquet/column/impl/ColumnWriterV1.java | 3 + .../parquet/column/impl/ColumnWriterV2.java | 5 +- .../parquet/column/page/PageWriter.java | 53 ++++ .../column/statistics/SizeStatistics.java | 260 ++++++++++++++++++ .../column/columnindex/ColumnIndex.java | 16 ++ .../columnindex/ColumnIndexBuilder.java | 118 +++++++- .../column/columnindex/OffsetIndex.java | 12 + .../columnindex/OffsetIndexBuilder.java | 71 ++++- .../parquet/column/mem/TestMemColumn.java | 8 +- .../column/page/mem/MemPageWriter.java | 31 +++ .../column/statistics/TestSizeStatistics.java | 127 +++++++++ .../converter/ParquetMetadataConverter.java | 70 ++++- .../hadoop/ColumnChunkPageWriteStore.java | 75 ++++- .../hadoop/ColumnIndexFilterUtils.java | 6 + .../parquet/hadoop/ParquetFileWriter.java | 186 ++++++++++++- .../hadoop/metadata/ColumnChunkMetaData.java | 93 ++++++- .../TestParquetMetadataConverter.java | 161 +++++++---- .../hadoop/TestColumnChunkPageWriteStore.java | 1 + .../TestSizeStatisticsRoundTrip.java | 104 +++++++ 21 files changed, 1518 insertions(+), 178 deletions(-) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnValueCollector.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/statistics/SizeStatistics.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/statistics/TestSizeStatistics.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnValueCollector.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnValueCollector.java new file mode 100644 index 0000000000..bb3135a189 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnValueCollector.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.impl; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.OptionalDouble; +import java.util.OptionalLong; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.statistics.SizeStatistics; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.values.bloomfilter.AdaptiveBlockSplitBloomFilter; +import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; +import org.apache.parquet.io.api.Binary; + +// An internal class to collect column values to build column statistics and bloom filter. +class ColumnValueCollector { + + private final ColumnDescriptor path; + private BloomFilterWriter bloomFilterWriter; + private BloomFilter bloomFilter; + private Statistics statistics; + private SizeStatistics.Builder sizeStatisticsBuilder; + + ColumnValueCollector(ColumnDescriptor path, BloomFilterWriter bloomFilterWriter, ParquetProperties props) { + this.path = path; + resetPageStatistics(); + initBloomFilter(bloomFilterWriter, props); + } + + void resetPageStatistics() { + this.statistics = Statistics.createStats(path.getPrimitiveType()); + this.sizeStatisticsBuilder = SizeStatistics.newBuilder( + path.getPrimitiveType(), path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel()); + } + + void writeNull(int repetitionLevel, int definitionLevel) { + statistics.incrementNumNulls(); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); + } + + void write(boolean value, int repetitionLevel, int definitionLevel) { + statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); + } + + void write(int value, int repetitionLevel, int definitionLevel) { + statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); + bloomFilter.insertHash(bloomFilter.hash(value)); + } + + void write(long value, int repetitionLevel, int definitionLevel) { + statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); + bloomFilter.insertHash(bloomFilter.hash(value)); + } + + void write(float value, int repetitionLevel, int definitionLevel) { + statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); + bloomFilter.insertHash(bloomFilter.hash(value)); + } + + void write(double value, int repetitionLevel, int definitionLevel) { + statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel); + bloomFilter.insertHash(bloomFilter.hash(value)); + } + + void write(Binary value, int repetitionLevel, int definitionLevel) { + statistics.updateStats(value); + sizeStatisticsBuilder.add(repetitionLevel, definitionLevel, value); + bloomFilter.insertHash(bloomFilter.hash(value)); + } + + void initBloomFilter(BloomFilterWriter bloomFilterWriter, ParquetProperties props) { + this.bloomFilterWriter = bloomFilterWriter; + if (bloomFilterWriter == null) { + this.bloomFilter = new BloomFilter() { + @Override + public void writeTo(OutputStream out) throws IOException {} + + @Override + public void insertHash(long hash) {} + + @Override + public boolean findHash(long hash) { + return false; + } + + @Override + public int getBitsetSize() { + return 0; + } + + @Override + public long hash(int value) { + return 0; + } + + @Override + public long 
hash(long value) { + return 0; + } + + @Override + public long hash(double value) { + return 0; + } + + @Override + public long hash(float value) { + return 0; + } + + @Override + public long hash(Binary value) { + return 0; + } + + @Override + public long hash(Object value) { + return 0; + } + + @Override + public HashStrategy getHashStrategy() { + return null; + } + + @Override + public Algorithm getAlgorithm() { + return null; + } + + @Override + public Compression getCompression() { + return null; + } + }; + return; + } + + int maxBloomFilterSize = props.getMaxBloomFilterBytes(); + OptionalLong ndv = props.getBloomFilterNDV(path); + OptionalDouble fpp = props.getBloomFilterFPP(path); + // If user specify the column NDV, we construct Bloom filter from it. + if (ndv.isPresent()) { + int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble()); + this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize); + } else if (props.getAdaptiveBloomFilterEnabled(path)) { + int numCandidates = props.getBloomFilterCandidatesCount(path); + this.bloomFilter = + new AdaptiveBlockSplitBloomFilter(maxBloomFilterSize, numCandidates, fpp.getAsDouble(), path); + } else { + this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize); + } + } + + void finalizeColumnChunk() { + if (bloomFilterWriter != null) { + bloomFilterWriter.writeBloomFilter(bloomFilter); + } + } + + Statistics getStatistics() { + return statistics; + } + + SizeStatistics getSizeStatistics() { + return sizeStatisticsBuilder.build(); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index c099cb7e2a..e0d0e1a19a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -19,18 +19,14 @@ package org.apache.parquet.column.impl; import java.io.IOException; -import java.util.OptionalDouble; -import java.util.OptionalLong; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriter; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; -import org.apache.parquet.column.values.bloomfilter.AdaptiveBlockSplitBloomFilter; -import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; -import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.io.api.Binary; @@ -54,12 +50,10 @@ abstract class ColumnWriterBase implements ColumnWriter { private ValuesWriter dataColumn; private int valueCount; - private Statistics statistics; private long rowsWrittenSoFar = 0; private int pageRowCount; - private final BloomFilterWriter bloomFilterWriter; - private final BloomFilter bloomFilter; + private final ColumnValueCollector collector; ColumnWriterBase(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) { this(path, pageWriter, null, props); @@ -72,34 +66,12 @@ abstract class ColumnWriterBase implements ColumnWriter { 
ParquetProperties props) { this.path = path; this.pageWriter = pageWriter; - resetStatistics(); this.repetitionLevelColumn = createRLWriter(props, path); this.definitionLevelColumn = createDLWriter(props, path); this.dataColumn = props.newValuesWriter(path); - this.bloomFilterWriter = bloomFilterWriter; - if (bloomFilterWriter == null) { - this.bloomFilter = null; - return; - } - int maxBloomFilterSize = props.getMaxBloomFilterBytes(); - - OptionalLong ndv = props.getBloomFilterNDV(path); - OptionalDouble fpp = props.getBloomFilterFPP(path); - // If user specify the column NDV, we construct Bloom filter from it. - if (ndv.isPresent()) { - int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble()); - this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize); - } else { - if (props.getAdaptiveBloomFilterEnabled(path)) { - int numCandidates = props.getBloomFilterCandidatesCount(path); - this.bloomFilter = - new AdaptiveBlockSplitBloomFilter(maxBloomFilterSize, numCandidates, fpp.getAsDouble(), path); - } else { - this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize); - } - } + this.collector = new ColumnValueCollector(path, bloomFilterWriter, props); } abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path); @@ -110,10 +82,6 @@ private void log(Object value, int r, int d) { LOG.debug("{} {} r:{} d:{}", path, value, r, d); } - private void resetStatistics() { - this.statistics = Statistics.createStats(path.getPrimitiveType()); - } - private void definitionLevel(int definitionLevel) { definitionLevelColumn.writeInteger(definitionLevel); } @@ -137,7 +105,7 @@ public void writeNull(int repetitionLevel, int definitionLevel) { if (DEBUG) log(null, repetitionLevel, definitionLevel); repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); - statistics.incrementNumNulls(); + collector.writeNull(repetitionLevel, definitionLevel); ++valueCount; } @@ -157,36 +125,6 @@ public long getBufferedSizeInMemory() { + pageWriter.getMemSize(); } - private void updateBloomFilter(int value) { - if (bloomFilter != null) { - bloomFilter.insertHash(bloomFilter.hash(value)); - } - } - - private void updateBloomFilter(long value) { - if (bloomFilter != null) { - bloomFilter.insertHash(bloomFilter.hash(value)); - } - } - - private void updateBloomFilter(double value) { - if (bloomFilter != null) { - bloomFilter.insertHash(bloomFilter.hash(value)); - } - } - - private void updateBloomFilter(float value) { - if (bloomFilter != null) { - bloomFilter.insertHash(bloomFilter.hash(value)); - } - } - - private void updateBloomFilter(Binary value) { - if (bloomFilter != null) { - bloomFilter.insertHash(bloomFilter.hash(value)); - } - } - /** * Writes the current value * @@ -200,8 +138,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) { repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); dataColumn.writeDouble(value); - statistics.updateStats(value); - updateBloomFilter(value); + collector.write(value, repetitionLevel, definitionLevel); ++valueCount; } @@ -218,8 +155,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) { repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); dataColumn.writeFloat(value); - statistics.updateStats(value); - updateBloomFilter(value); + collector.write(value, repetitionLevel, definitionLevel); ++valueCount; } @@ -236,8 +172,7 @@ public void write(Binary value, int repetitionLevel, 
int definitionLevel) { repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); dataColumn.writeBytes(value); - statistics.updateStats(value); - updateBloomFilter(value); + collector.write(value, repetitionLevel, definitionLevel); ++valueCount; } @@ -254,7 +189,7 @@ public void write(boolean value, int repetitionLevel, int definitionLevel) { repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); dataColumn.writeBoolean(value); - statistics.updateStats(value); + collector.write(value, repetitionLevel, definitionLevel); ++valueCount; } @@ -271,8 +206,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) { repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); dataColumn.writeInteger(value); - statistics.updateStats(value); - updateBloomFilter(value); + collector.write(value, repetitionLevel, definitionLevel); ++valueCount; } @@ -289,8 +223,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) { repetitionLevel(repetitionLevel); definitionLevel(definitionLevel); dataColumn.writeLong(value); - statistics.updateStats(value); - updateBloomFilter(value); + collector.write(value, repetitionLevel, definitionLevel); ++valueCount; } @@ -310,9 +243,7 @@ void finalizeColumnChunk() { dataColumn.resetDictionary(); } - if (bloomFilterWriter != null && bloomFilter != null) { - bloomFilterWriter.writeBloomFilter(bloomFilter); - } + collector.finalizeColumnChunk(); } /** @@ -389,7 +320,14 @@ void writePage() { this.rowsWrittenSoFar += pageRowCount; if (DEBUG) LOG.debug("write page"); try { - writePage(pageRowCount, valueCount, statistics, repetitionLevelColumn, definitionLevelColumn, dataColumn); + writePage( + pageRowCount, + valueCount, + collector.getStatistics(), + collector.getSizeStatistics(), + repetitionLevelColumn, + definitionLevelColumn, + dataColumn); } catch (IOException e) { throw new ParquetEncodingException("could not write page for " + path, e); } @@ -397,7 +335,7 @@ void writePage() { definitionLevelColumn.reset(); dataColumn.reset(); valueCount = 0; - resetStatistics(); + collector.resetPageStatistics(); pageRowCount = 0; } @@ -405,6 +343,7 @@ abstract void writePage( int rowCount, int valueCount, Statistics statistics, + SizeStatistics sizeStatistics, ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java index c63e425e3d..e15f9ecb34 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java @@ -24,6 +24,7 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; @@ -60,6 +61,7 @@ void writePage( int rowCount, int valueCount, Statistics statistics, + SizeStatistics sizeStatistics, ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) @@ -69,6 +71,7 @@ void writePage( valueCount, rowCount, statistics, + sizeStatistics, repetitionLevels.getEncoding(), definitionLevels.getEncoding(), values.getEncoding()); diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java index d098623667..b66749e093 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java @@ -24,6 +24,7 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter; @@ -86,6 +87,7 @@ void writePage( int rowCount, int valueCount, Statistics statistics, + SizeStatistics sizeStatistics, ValuesWriter repetitionLevels, ValuesWriter definitionLevels, ValuesWriter values) @@ -102,6 +104,7 @@ void writePage( definitionLevels.getBytes(), encoding, bytes, - statistics); + statistics, + sizeStatistics); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java index e0016123ab..97d5ca68c1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; /** @@ -73,6 +74,31 @@ void writePage( Encoding valuesEncoding) throws IOException; + /** + * writes a single page + * @param bytesInput the bytes for the page + * @param valueCount the number of values in that page + * @param rowCount the number of rows in that page + * @param statistics the statistics for that page + * @param sizeStatistics the size statistics for that page + * @param rlEncoding repetition level encoding + * @param dlEncoding definition level encoding + * @param valuesEncoding values encoding + * @throws IOException + */ + default void writePage( + BytesInput bytesInput, + int valueCount, + int rowCount, + Statistics statistics, + SizeStatistics sizeStatistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) + throws IOException { + throw new UnsupportedOperationException("writePage with SizeStatistics is not implemented"); + } + /** * writes a single page in the new format * @@ -97,6 +123,33 @@ void writePageV2( Statistics statistics) throws IOException; + /** + * writes a single page in the new format + * @param rowCount the number of rows in this page + * @param nullCount the number of null values (out of valueCount) + * @param valueCount the number of values in that page (there could be multiple values per row for repeated fields) + * @param repetitionLevels the repetition levels encoded in RLE without any size header + * @param definitionLevels the definition levels encoded in RLE without any size header + * @param dataEncoding the encoding for the data + * @param data the data encoded with dataEncoding + * @param statistics optional stats for this page + * @param sizeStatistics optional size stats for this page + * @throws IOException if there is an exception while writing page data + */ + default void writePageV2( + int rowCount, + int 
nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput data, + Statistics statistics, + SizeStatistics sizeStatistics) + throws IOException { + throw new UnsupportedOperationException("writePageV2 with SizeStatistics is not implemented"); + } + /** * @return the current size used in the memory buffer for that column chunk */ diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/SizeStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/SizeStatistics.java new file mode 100644 index 0000000000..4241f23c34 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/SizeStatistics.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics; + +import it.unimi.dsi.fastutil.longs.LongArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.apache.parquet.Preconditions; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; + +/** + * A structure for capturing metadata for estimating the unencoded, + * uncompressed size of data written. This is useful for readers to estimate + * how much memory is needed to reconstruct data in their memory model and for + * fine-grained filter push down on nested structures (the histograms contained + * in this structure can help determine the number of nulls at a particular + * nesting level and maximum length of lists). + */ +public class SizeStatistics { + + private final PrimitiveType type; + private long unencodedByteArrayDataBytes; + private final List repetitionLevelHistogram; + private final List definitionLevelHistogram; + + /** + * Whether the statistics has valid value. + * + * It is true by default. Only set to false while it fails to merge statistics. + */ + private boolean valid = true; + + /** + * Builder to create a SizeStatistics. + */ + public static class Builder { + private final PrimitiveType type; + private long unencodedByteArrayDataBytes; + private final long[] repetitionLevelHistogram; + private final long[] definitionLevelHistogram; + + /** + * Create a builder to create a SizeStatistics. 
+     *
+     * @param type physical type of the column associated with this statistics
+     * @param maxRepetitionLevel maximum repetition level of the column
+     * @param maxDefinitionLevel maximum definition level of the column
+     */
+    private Builder(PrimitiveType type, int maxRepetitionLevel, int maxDefinitionLevel) {
+      this.type = type;
+      this.unencodedByteArrayDataBytes = 0L;
+      repetitionLevelHistogram = new long[maxRepetitionLevel + 1];
+      definitionLevelHistogram = new long[maxDefinitionLevel + 1];
+    }
+
+    /**
+     * Add repetition and definition level of a value to the statistics.
+     * It is called when the value is null, or the column is not of BYTE_ARRAY type.
+     *
+     * @param repetitionLevel repetition level of the value
+     * @param definitionLevel definition level of the value
+     */
+    public void add(int repetitionLevel, int definitionLevel) {
+      repetitionLevelHistogram[repetitionLevel]++;
+      definitionLevelHistogram[definitionLevel]++;
+    }
+
+    /**
+     * Add repetition and definition level of a value to the statistics.
+     * It is called when the column is of BYTE_ARRAY type.
+     *
+     * @param repetitionLevel repetition level of the value
+     * @param definitionLevel definition level of the value
+     * @param value value to be added
+     */
+    public void add(int repetitionLevel, int definitionLevel, Binary value) {
+      add(repetitionLevel, definitionLevel);
+      if (type.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY && value != null) {
+        unencodedByteArrayDataBytes += value.length();
+      }
+    }
+
+    /**
+     * Build a SizeStatistics from the builder.
+     */
+    public SizeStatistics build() {
+      return new SizeStatistics(
+          type,
+          unencodedByteArrayDataBytes,
+          new LongArrayList(repetitionLevelHistogram),
+          new LongArrayList(definitionLevelHistogram));
+    }
+  }
+
+  /**
+   * Create a builder to create a SizeStatistics.
+   *
+   * @param type physical type of the column associated with this statistics
+   * @param maxRepetitionLevel maximum repetition level of the column
+   * @param maxDefinitionLevel maximum definition level of the column
+   */
+  public static Builder newBuilder(PrimitiveType type, int maxRepetitionLevel, int maxDefinitionLevel) {
+    return new Builder(type, maxRepetitionLevel, maxDefinitionLevel);
+  }
+
+  /**
+   * Create a SizeStatistics.
+   *
+   * @param type physical type of the column associated with this statistics
+   * @param unencodedByteArrayDataBytes number of physical bytes stored for BYTE_ARRAY data values assuming no encoding
+   * @param repetitionLevelHistogram histogram for all repetition levels if non-empty
+   * @param definitionLevelHistogram histogram for all definition levels if non-empty
+   */
+  public SizeStatistics(
+      PrimitiveType type,
+      long unencodedByteArrayDataBytes,
+      List<Long> repetitionLevelHistogram,
+      List<Long> definitionLevelHistogram) {
+    this.type = type;
+    this.unencodedByteArrayDataBytes = unencodedByteArrayDataBytes;
+    this.repetitionLevelHistogram = repetitionLevelHistogram;
+    this.definitionLevelHistogram = definitionLevelHistogram;
+  }
+
+  /**
+   * Merge two SizeStatistics of the same column.
+   * It is used to merge size statistics from all pages of the same column chunk.
+   */
+  public void mergeStatistics(SizeStatistics other) {
+    if (!valid) {
+      return;
+    }
+
+    // Stop merge if other is invalid.
+ if (other == null || !other.isValid()) { + valid = false; + unencodedByteArrayDataBytes = 0L; + repetitionLevelHistogram.clear(); + definitionLevelHistogram.clear(); + return; + } + + Preconditions.checkArgument(type.equals(other.type), "Cannot merge SizeStatistics of different types"); + unencodedByteArrayDataBytes = Math.addExact(unencodedByteArrayDataBytes, other.unencodedByteArrayDataBytes); + for (int i = 0; i < repetitionLevelHistogram.size(); i++) { + repetitionLevelHistogram.set( + i, Math.addExact(repetitionLevelHistogram.get(i), other.repetitionLevelHistogram.get(i))); + } + for (int i = 0; i < definitionLevelHistogram.size(); i++) { + definitionLevelHistogram.set( + i, + Math.addExact( + definitionLevelHistogram.get(i), + other.getDefinitionLevelHistogram().get(i))); + } + } + + public PrimitiveType getType() { + return type; + } + + /** + * The number of physical bytes stored for BYTE_ARRAY data values assuming + * no encoding. This is exclusive of the bytes needed to store the length of + * each byte array. In other words, this field is equivalent to the `(size + * of PLAIN-ENCODING the byte array values) - (4 bytes * number of values + * written)`. To determine unencoded sizes of other types readers can use + * schema information multiplied by the number of non-null and null values. + * The number of null/non-null values can be inferred from the histograms + * below. + * + * For example, if a column chunk is dictionary-encoded with dictionary + * ["a", "bc", "cde"], and a data page contains the indices [0, 0, 1, 2], + * then this value for that data page should be 7 (1 + 1 + 2 + 3). + * + * This field should only be set for types that use BYTE_ARRAY as their + * physical type. + * + * It represents the field `unencoded_byte_array_data_bytes` in the + * {@link org.apache.parquet.format.SizeStatistics} + * + * @return unencoded and uncompressed byte size of the BYTE_ARRAY column, + * or empty for other types. + */ + public Optional getUnencodedByteArrayDataBytes() { + if (type.getPrimitiveTypeName() != PrimitiveType.PrimitiveTypeName.BINARY) { + return Optional.empty(); + } + return Optional.of(unencodedByteArrayDataBytes); + } + + /** + * When present, there is expected to be one element corresponding to each + * repetition (i.e. size=max repetition_level+1) where each element + * represents the number of times the repetition level was observed in the + * data. + * + * This field may be omitted if max_repetition_level is 0 without loss + * of information. + * + * It represents the field `repetition_level_histogram` in the + * {@link org.apache.parquet.format.SizeStatistics} + * + * @return repetition level histogram of all levels if not empty. + */ + public List getRepetitionLevelHistogram() { + return Collections.unmodifiableList(repetitionLevelHistogram); + } + + /** + * Same as repetition_level_histogram except for definition levels. + * + * This field may be omitted if max_definition_level is 0 or 1 without + * loss of information. + * + * It represents the field `definition_level_histogram` in the + * {@link org.apache.parquet.format.SizeStatistics} + * + * @return definition level histogram of all levels if not empty. + */ + public List getDefinitionLevelHistogram() { + return Collections.unmodifiableList(definitionLevelHistogram); + } + + /** + * @return a new independent statistics instance of this class. 
+ */ + public SizeStatistics copy() { + return new SizeStatistics( + type, + unencodedByteArrayDataBytes, + new LongArrayList(repetitionLevelHistogram), + new LongArrayList(definitionLevelHistogram)); + } + + /** + * @return whether the statistics has valid value. + */ + public boolean isValid() { + return valid; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java index a1ece18e3f..86099717df 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java @@ -55,4 +55,20 @@ public interface ColumnIndex extends Visitor { * @return the list of the max values as {@link ByteBuffer}s; used for converting to the related thrift object */ public List getMaxValues(); + + /** + * @return the unmodifiable list of the repetition level histograms for each page concatenated together; used for + * converting to the related thrift object + */ + default List getRepetitionLevelHistogram() { + throw new UnsupportedOperationException("Repetition level histogram is not implemented"); + } + + /** + * @return the unmodifiable list of the definition level histograms for each page concatenated together; used for + * converting to the related thrift object + */ + default List getDefinitionLevelHistogram() { + throw new UnsupportedOperationException("Definition level histogram is not implemented"); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index 30b3f6fda1..bc5c809e08 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.function.IntPredicate; import org.apache.parquet.column.MinMax; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Eq; @@ -99,6 +100,10 @@ int translate(int arrayIndex) { private int[] pageIndexes; // might be null private long[] nullCounts; + // might be null + private long[] repLevelHistogram; + // might be null + private long[] defLevelHistogram; static String truncate(String str) { if (str.length() <= MAX_VALUE_LENGTH_FOR_TOSTRING) { @@ -160,6 +165,22 @@ public List getMaxValues() { return list; } + @Override + public List getRepetitionLevelHistogram() { + if (repLevelHistogram == null) { + return LongList.of(); + } + return LongLists.unmodifiable(LongArrayList.wrap(repLevelHistogram)); + } + + @Override + public List getDefinitionLevelHistogram() { + if (defLevelHistogram == null) { + return LongList.of(); + } + return LongLists.unmodifiable(LongArrayList.wrap(defLevelHistogram)); + } + @Override public String toString() { try (Formatter formatter = new Formatter()) { @@ -418,6 +439,9 @@ public ColumnIndex build() { @Override public void add(Statistics stats) {} + @Override + public void add(Statistics stats, SizeStatistics sizeStats) {} + @Override void addMinMax(Object min, Object max) {} @@ -458,6 +482,8 @@ public long 
getMinMaxSize() { private final LongList nullCounts = new LongArrayList(); private final IntList pageIndexes = new IntArrayList(); private int nextPageIndex; + private LongList repLevelHistogram = new LongArrayList(); + private LongList defLevelHistogram = new LongArrayList(); /** * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at @@ -516,10 +542,42 @@ public static ColumnIndex build( List nullCounts, List minValues, List maxValues) { + return build(type, boundaryOrder, nullPages, nullCounts, minValues, maxValues, null, null); + } + + /** + * @param type + * the primitive type + * @param boundaryOrder + * the boundary order of the min/max values + * @param nullPages + * the null pages (one boolean value for each page that signifies whether the page consists of nulls + * entirely) + * @param nullCounts + * the number of null values for each page + * @param minValues + * the min values for each page + * @param maxValues + * the max values for each page + * @param repLevelHistogram + * the repetition level histogram for all levels of each page + * @param defLevelHistogram + * the definition level histogram for all levels of each page + * @return the newly created {@link ColumnIndex} object based on the specified arguments + */ + public static ColumnIndex build( + PrimitiveType type, + BoundaryOrder boundaryOrder, + List nullPages, + List nullCounts, + List minValues, + List maxValues, + List repLevelHistogram, + List defLevelHistogram) { ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE); - builder.fill(nullPages, nullCounts, minValues, maxValues); + builder.fill(nullPages, nullCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram); ColumnIndexBase columnIndex = builder.build(type); columnIndex.boundaryOrder = requireNonNull(boundaryOrder); return columnIndex; @@ -535,6 +593,18 @@ public static ColumnIndex build( * @param stats the statistics to be added */ public void add(Statistics stats) { + add(stats, null); + } + + /** + * Adds the data from the specified statistics to this builder + * + * @param stats + * the statistics to be added + * @param sizeStats + * the size statistics to be added + */ + public void add(Statistics stats, SizeStatistics sizeStats) { if (stats.hasNonNullValue()) { nullPages.add(false); Object min = stats.genericGetMin(); @@ -545,6 +615,16 @@ public void add(Statistics stats) { nullPages.add(true); } nullCounts.add(stats.getNumNulls()); + + // Collect repetition and definition level histograms only when all pages are valid. 
+    if (sizeStats != null && sizeStats.isValid() && repLevelHistogram != null && defLevelHistogram != null) {
+      repLevelHistogram.addAll(sizeStats.getRepetitionLevelHistogram());
+      defLevelHistogram.addAll(sizeStats.getDefinitionLevelHistogram());
+    } else {
+      repLevelHistogram = null;
+      defLevelHistogram = null;
+    }
+
     ++nextPageIndex;
   }

@@ -553,7 +633,12 @@ public void add(Statistics stats) {
   abstract void addMinMax(Object min, Object max);

   private void fill(
-      List<Boolean> nullPages, List<Long> nullCounts, List<ByteBuffer> minValues, List<ByteBuffer> maxValues) {
+      List<Boolean> nullPages,
+      List<Long> nullCounts,
+      List<ByteBuffer> minValues,
+      List<ByteBuffer> maxValues,
+      List<Long> repLevelHistogram,
+      List<Long> defLevelHistogram) {
     clear();
     int pageCount = nullPages.size();
     if ((nullCounts != null && nullCounts.size() != pageCount)
@@ -566,6 +651,18 @@ private void fill(
           minValues.size(), maxValues.size()));
     }
+    if (repLevelHistogram != null && repLevelHistogram.size() % pageCount != 0) {
+      /// FIXME: it is unfortunate that we don't know the max repetition level here.
+      throw new IllegalArgumentException(String.format(
+          "Size of repLevelHistogram:%d is not a multiple of pageCount:%d",
+          repLevelHistogram.size(), pageCount));
+    }
+    if (defLevelHistogram != null && defLevelHistogram.size() % pageCount != 0) {
+      /// FIXME: it is unfortunate that we don't know the max definition level here.
+      throw new IllegalArgumentException(String.format(
+          "Size of defLevelHistogram:%d is not a multiple of pageCount:%d",
+          defLevelHistogram.size(), pageCount));
+    }
     this.nullPages.addAll(nullPages);
     // Nullcounts is optional in the format
     if (nullCounts != null) {
@@ -580,6 +677,14 @@ private void fill(
         pageIndexes.add(i);
       }
     }
+
+    // Repetition and definition level histograms are optional in the format
+    if (repLevelHistogram != null) {
+      this.repLevelHistogram.addAll(repLevelHistogram);
+    }
+    if (defLevelHistogram != null) {
+      this.defLevelHistogram.addAll(defLevelHistogram);
+    }
   }

   /**
@@ -609,6 +714,13 @@ private ColumnIndexBase build(PrimitiveType type) {
       columnIndex.nullCounts = nullCounts.toLongArray();
     }
     columnIndex.pageIndexes = pageIndexes.toIntArray();
+    // Repetition and definition level histograms are optional so keep them null if the builder has no values
+    if (repLevelHistogram != null && !repLevelHistogram.isEmpty()) {
+      columnIndex.repLevelHistogram = repLevelHistogram.toLongArray();
+    }
+    if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) {
+      columnIndex.defLevelHistogram = defLevelHistogram.toLongArray();
+    }
     return columnIndex;
   }

@@ -653,6 +765,8 @@ private void clear() {
     clearMinMax();
     nextPageIndex = 0;
     pageIndexes.clear();
+    repLevelHistogram.clear();
+    defLevelHistogram.clear();
   }

   abstract void clearMinMax();
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
index 74575be1bc..2f9bd8ce74 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.internal.column.columnindex;

+import java.util.Optional;
+
 /**
  * Offset index containing the offset and size of the page and the index of the first row in the page.
  *
@@ -64,4 +66,14 @@ public default long getLastRowIndex(int pageIndex, long rowGroupRowCount) {
     int nextPageIndex = pageIndex + 1;
     return (nextPageIndex >= getPageCount() ?
rowGroupRowCount : getFirstRowIndex(nextPageIndex)) - 1; } + + /** + * @param pageIndex + * the index of the page + * @return unencoded/uncompressed size for BYTE_ARRAY types; or empty for other types. + * Please note that even for BYTE_ARRAY types, this value might not have been written. + */ + default Optional getUnencodedByteArrayDataBytes(int pageIndex) { + throw new UnsupportedOperationException("Un-encoded byte array data bytes is not implemented"); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java index 7842c18c0f..b56f58d6fc 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java @@ -23,6 +23,7 @@ import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; import java.util.Formatter; +import java.util.Optional; /** * Builder implementation to create {@link OffsetIndex} objects during writing a parquet file. @@ -33,6 +34,7 @@ private static class OffsetIndexImpl implements OffsetIndex { private long[] offsets; private int[] compressedPageSizes; private long[] firstRowIndexes; + private long[] unencodedByteArrayDataBytes; @Override public String toString() { @@ -70,6 +72,14 @@ public long getFirstRowIndex(int pageIndex) { public int getPageOrdinal(int pageIndex) { return pageIndex; } + + @Override + public Optional getUnencodedByteArrayDataBytes(int pageIndex) { + if (unencodedByteArrayDataBytes == null || unencodedByteArrayDataBytes.length == 0) { + return Optional.empty(); + } + return Optional.of(unencodedByteArrayDataBytes[pageIndex]); + } } private static final OffsetIndexBuilder NO_OP_BUILDER = new OffsetIndexBuilder() { @@ -78,11 +88,22 @@ public void add(int compressedPageSize, long rowCount) {} @Override public void add(long offset, int compressedPageSize, long rowCount) {} + + @Override + public OffsetIndex build() { + return null; + } + + @Override + public OffsetIndex build(long shift) { + return null; + } }; private final LongList offsets = new LongArrayList(); private final IntList compressedPageSizes = new IntArrayList(); private final LongList firstRowIndexes = new LongArrayList(); + private final LongList unencodedDataBytes = new LongArrayList(); private long previousOffset; private int previousPageSize; private long previousRowIndex; @@ -112,7 +133,26 @@ private OffsetIndexBuilder() {} * @param rowCount the number of rows in the page */ public void add(int compressedPageSize, long rowCount) { - add(previousOffset + previousPageSize, compressedPageSize, previousRowIndex + previousRowCount); + add(compressedPageSize, rowCount, Optional.empty()); + } + + /** + * Adds the specified parameters to this builder. Used by the writers to building up {@link OffsetIndex} objects to be + * written to the Parquet file. 
+ * + * @param compressedPageSize + * the size of the page (including header) + * @param rowCount + * the number of rows in the page + * @param unencodedDataBytes + * the number of bytes of unencoded data of BYTE_ARRAY type + */ + public void add(int compressedPageSize, long rowCount, Optional unencodedDataBytes) { + add( + previousOffset + previousPageSize, + compressedPageSize, + previousRowIndex + previousRowCount, + unencodedDataBytes); previousRowCount = rowCount; } @@ -125,12 +165,32 @@ public void add(int compressedPageSize, long rowCount) { * @param firstRowIndex the index of the first row in the page (within the row group) */ public void add(long offset, int compressedPageSize, long firstRowIndex) { + add(offset, compressedPageSize, firstRowIndex, Optional.empty()); + } + + /** + * Adds the specified parameters to this builder. Used by the metadata converter to building up {@link OffsetIndex} + * objects read from the Parquet file. + * + * @param offset + * the offset of the page in the file + * @param compressedPageSize + * the size of the page (including header) + * @param firstRowIndex + * the index of the first row in the page (within the row group) + * @param unencodedDataBytes + * the number of bytes of unencoded data of BYTE_ARRAY type + */ + public void add(long offset, int compressedPageSize, long firstRowIndex, Optional unencodedDataBytes) { previousOffset = offset; offsets.add(offset); previousPageSize = compressedPageSize; compressedPageSizes.add(compressedPageSize); previousRowIndex = firstRowIndex; firstRowIndexes.add(firstRowIndex); + if (unencodedDataBytes.isPresent()) { + this.unencodedDataBytes.add(unencodedDataBytes.get()); + } } /** @@ -149,6 +209,9 @@ public OffsetIndexBuilder fromOffsetIndex(OffsetIndex offsetIndex) { this.offsets.addAll(new LongArrayList(offsetIndexImpl.offsets)); this.compressedPageSizes.addAll(new IntArrayList(offsetIndexImpl.compressedPageSizes)); this.firstRowIndexes.addAll(new LongArrayList(offsetIndexImpl.firstRowIndexes)); + if (offsetIndexImpl.unencodedByteArrayDataBytes != null) { + this.unencodedDataBytes.addAll(new LongArrayList(offsetIndexImpl.unencodedByteArrayDataBytes)); + } this.previousOffset = 0; this.previousPageSize = 0; this.previousRowIndex = 0; @@ -178,6 +241,12 @@ public OffsetIndex build(long shift) { offsetIndex.offsets = offsets; offsetIndex.compressedPageSizes = compressedPageSizes.toIntArray(); offsetIndex.firstRowIndexes = firstRowIndexes.toLongArray(); + if (!unencodedDataBytes.isEmpty()) { + if (unencodedDataBytes.size() != this.offsets.size()) { + throw new IllegalStateException("unencodedDataBytes does not have the same size as offsets"); + } + offsetIndex.unencodedByteArrayDataBytes = unencodedDataBytes.toLongArray(); + } return offsetIndex; } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java index f10b32f0fb..1ce30006d3 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java @@ -231,8 +231,12 @@ public void testPageSize() { for (int i = 0; i < 123; ++i) { // Writing 10 values per record for (int j = 0; j < 10; ++j) { - binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2); - int32ColWriter.write(42, j == 0 ? 0 : 2, 2); + binaryColWriter.write( + Binary.fromString("aaaaaaaaaaaa"), + j == 0 ? 
0 : binaryCol.getMaxRepetitionLevel(), + binaryCol.getMaxDefinitionLevel()); + int32ColWriter.write( + 42, j == 0 ? 0 : int32Col.getMaxRepetitionLevel(), int32Col.getMaxDefinitionLevel()); } writeStore.endRecord(); } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java index 3def6cf466..4826987227 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java @@ -30,6 +30,7 @@ import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.io.ParquetEncodingException; import org.slf4j.Logger; @@ -81,6 +82,20 @@ public void writePage( writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding); } + @Override + public void writePage( + BytesInput bytesInput, + int valueCount, + int rowCount, + Statistics statistics, + SizeStatistics sizeStatistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) + throws IOException { + writePage(bytesInput, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding); + } + @Override public void writePageV2( int rowCount, @@ -110,6 +125,22 @@ public void writePageV2( LOG.debug("page written for {} bytes and {} records", size, valueCount); } + @Override + public void writePageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput data, + Statistics statistics, + SizeStatistics sizeStatistics) + throws IOException { + writePageV2( + rowCount, nullCount, valueCount, repetitionLevels, definitionLevels, dataEncoding, data, statistics); + } + @Override public long getMemSize() { return memSize; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestSizeStatistics.java b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestSizeStatistics.java new file mode 100644 index 0000000000..6c166b0e7f --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestSizeStatistics.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.statistics; + +import java.util.Arrays; +import java.util.Optional; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestSizeStatistics { + + @Test + public void testAddBinaryType() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named("a"); + final int maxRepetitionLevel = 2; + final int maxDefinitionLevel = 2; + SizeStatistics.Builder builder = SizeStatistics.newBuilder(type, maxRepetitionLevel, maxDefinitionLevel); + builder.add(0, 2, Binary.fromString("a")); + builder.add(1, 2, Binary.fromString("")); + builder.add(2, 2, Binary.fromString("bb")); + builder.add(0, 0); + builder.add(0, 1); + builder.add(1, 0); + builder.add(1, 1); + SizeStatistics statistics = builder.build(); + Assert.assertEquals(Optional.of(3L), statistics.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(3L, 3L, 1L), statistics.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(2L, 2L, 3L), statistics.getDefinitionLevelHistogram()); + } + + @Test + public void testAddNonBinaryType() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(2) + .named("a"); + final int maxRepetitionLevel = 1; + final int maxDefinitionLevel = 1; + SizeStatistics.Builder builder = SizeStatistics.newBuilder(type, maxRepetitionLevel, maxDefinitionLevel); + builder.add(0, 1, Binary.fromString("aa")); + builder.add(0, 1, Binary.fromString("aa")); + builder.add(1, 1, Binary.fromString("aa")); + builder.add(1, 0); + builder.add(1, 0); + builder.add(1, 0); + SizeStatistics statistics = builder.build(); + Assert.assertEquals(Optional.empty(), statistics.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(2L, 4L), statistics.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(3L, 3L), statistics.getDefinitionLevelHistogram()); + } + + @Test + public void testMergeStatistics() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named("a"); + final int maxRepetitionLevel = 2; + final int maxDefinitionLevel = 2; + SizeStatistics.Builder builder1 = SizeStatistics.newBuilder(type, maxRepetitionLevel, maxDefinitionLevel); + builder1.add(0, 0, Binary.fromString("a")); + builder1.add(1, 1, Binary.fromString("b")); + builder1.add(2, 2, Binary.fromString("c")); + SizeStatistics statistics1 = builder1.build(); + SizeStatistics.Builder builder2 = SizeStatistics.newBuilder(type, maxRepetitionLevel, maxDefinitionLevel); + builder2.add(0, 1, Binary.fromString("d")); + builder2.add(0, 1, Binary.fromString("e")); + SizeStatistics statistics2 = builder2.build(); + statistics1.mergeStatistics(statistics2); + Assert.assertEquals(Optional.of(5L), statistics1.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(3L, 1L, 1L), statistics1.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(1L, 3L, 1L), statistics1.getDefinitionLevelHistogram()); + } + + @Test + public void testMergeThrowException() { + // Merge different types. 
+ PrimitiveType type1 = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named("a"); + PrimitiveType type2 = + Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("a"); + SizeStatistics.Builder builder1 = SizeStatistics.newBuilder(type1, 1, 1); + SizeStatistics.Builder builder2 = SizeStatistics.newBuilder(type2, 1, 1); + SizeStatistics statistics1 = builder1.build(); + SizeStatistics statistics2 = builder2.build(); + Assert.assertThrows(IllegalArgumentException.class, () -> statistics1.mergeStatistics(statistics2)); + } + + @Test + public void testCopyStatistics() { + PrimitiveType type = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named("a"); + final int maxRepetitionLevel = 2; + final int maxDefinitionLevel = 2; + SizeStatistics.Builder builder = SizeStatistics.newBuilder(type, maxRepetitionLevel, maxDefinitionLevel); + builder.add(0, 0, Binary.fromString("a")); + builder.add(1, 1, Binary.fromString("b")); + builder.add(2, 2, Binary.fromString("c")); + SizeStatistics statistics = builder.build(); + SizeStatistics copy = statistics.copy(); + Assert.assertEquals(Optional.of(3L), copy.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(1L, 1L, 1L), copy.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(1L, 1L, 1L), copy.getDefinitionLevelHistogram()); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index e2e109a871..e752b4ceea 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -101,6 +101,7 @@ import org.apache.parquet.format.PageType; import org.apache.parquet.format.RowGroup; import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.format.SizeStatistics; import org.apache.parquet.format.SplitBlockAlgorithm; import org.apache.parquet.format.Statistics; import org.apache.parquet.format.StringType; @@ -575,6 +576,10 @@ private void addRowGroup( if (columnMetaData.getEncodingStats() != null) { metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats())); } + if (columnMetaData.getSizeStatistics() != null + && columnMetaData.getSizeStatistics().isValid()) { + metaData.setSize_statistics(toParquetSizeStatistics(columnMetaData.getSizeStatistics())); + } if (!encryptMetaData) { columnChunk.setMeta_data(metaData); @@ -1601,7 +1606,8 @@ public ColumnChunkMetaData buildColumnChunkMetaData( metaData.dictionary_page_offset, metaData.num_values, metaData.total_compressed_size, - metaData.total_uncompressed_size); + metaData.total_uncompressed_size, + fromParquetSizeStatistics(metaData.size_statistics, type)); } public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException { @@ -2256,6 +2262,14 @@ public static ColumnIndex toParquetColumnIndex( columnIndex.getMaxValues(), toParquetBoundaryOrder(columnIndex.getBoundaryOrder())); parquetColumnIndex.setNull_counts(columnIndex.getNullCounts()); + List repLevelHistogram = columnIndex.getRepetitionLevelHistogram(); + if (repLevelHistogram != null && !repLevelHistogram.isEmpty()) { + parquetColumnIndex.setRepetition_level_histograms(repLevelHistogram); + } + List defLevelHistogram = 
columnIndex.getDefinitionLevelHistogram(); + if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) { + parquetColumnIndex.setDefinition_level_histograms(defLevelHistogram); + } return parquetColumnIndex; } @@ -2270,27 +2284,47 @@ public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromPar parquetColumnIndex.getNull_pages(), parquetColumnIndex.getNull_counts(), parquetColumnIndex.getMin_values(), - parquetColumnIndex.getMax_values()); + parquetColumnIndex.getMax_values(), + parquetColumnIndex.getRepetition_level_histograms(), + parquetColumnIndex.getDefinition_level_histograms()); } public static OffsetIndex toParquetOffsetIndex( org.apache.parquet.internal.column.columnindex.OffsetIndex offsetIndex) { List pageLocations = new ArrayList<>(offsetIndex.getPageCount()); + List unencodedByteArrayDataBytes = new ArrayList<>(offsetIndex.getPageCount()); for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) { pageLocations.add(new PageLocation( offsetIndex.getOffset(i), offsetIndex.getCompressedPageSize(i), offsetIndex.getFirstRowIndex(i))); + Optional unencodedByteArrayDataType = offsetIndex.getUnencodedByteArrayDataBytes(i); + if (unencodedByteArrayDataType.isPresent() && unencodedByteArrayDataBytes.size() == i) { + unencodedByteArrayDataBytes.add(unencodedByteArrayDataType.get()); + } + } + OffsetIndex parquetOffsetIndex = new OffsetIndex(pageLocations); + if (unencodedByteArrayDataBytes.size() == pageLocations.size()) { + // Do not add the field if we are missing that from any page. + parquetOffsetIndex.setUnencoded_byte_array_data_bytes(unencodedByteArrayDataBytes); } - return new OffsetIndex(pageLocations); + return parquetOffsetIndex; } public static org.apache.parquet.internal.column.columnindex.OffsetIndex fromParquetOffsetIndex( OffsetIndex parquetOffsetIndex) { + boolean hasUnencodedByteArrayDataBytes = parquetOffsetIndex.isSetUnencoded_byte_array_data_bytes() + && parquetOffsetIndex.unencoded_byte_array_data_bytes.size() + == parquetOffsetIndex.page_locations.size(); OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); - for (PageLocation pageLocation : parquetOffsetIndex.getPage_locations()) { + for (int i = 0; i < parquetOffsetIndex.page_locations.size(); ++i) { + PageLocation pageLocation = parquetOffsetIndex.page_locations.get(i); + Optional unencodedByteArrayDataBytes = hasUnencodedByteArrayDataBytes + ? 
Optional.of(parquetOffsetIndex.unencoded_byte_array_data_bytes.get(i)) + : Optional.empty(); builder.add( pageLocation.getOffset(), pageLocation.getCompressed_page_size(), - pageLocation.getFirst_row_index()); + pageLocation.getFirst_row_index(), + unencodedByteArrayDataBytes); } return builder.build(); } @@ -2322,4 +2356,30 @@ public static BloomFilterHeader toBloomFilterHeader( bloomFilter.getAlgorithm(), bloomFilter.getHashStrategy(), bloomFilter.getCompression())); } } + + public static org.apache.parquet.column.statistics.SizeStatistics fromParquetSizeStatistics( + SizeStatistics statistics, PrimitiveType type) { + if (statistics == null) { + return null; + } + return new org.apache.parquet.column.statistics.SizeStatistics( + type, + statistics.getUnencoded_byte_array_data_bytes(), + statistics.getRepetition_level_histogram(), + statistics.getDefinition_level_histogram()); + } + + public static SizeStatistics toParquetSizeStatistics(org.apache.parquet.column.statistics.SizeStatistics stats) { + if (stats == null) { + return null; + } + SizeStatistics formatStats = new SizeStatistics(); + if (stats.getUnencodedByteArrayDataBytes().isPresent()) { + formatStats.setUnencoded_byte_array_data_bytes( + stats.getUnencodedByteArrayDataBytes().get()); + } + formatStats.setRepetition_level_histogram(stats.getRepetitionLevelHistogram()); + formatStats.setDefinition_level_histogram(stats.getDefinitionLevelHistogram()); + return formatStats; + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index cb8ca25cf8..5599d2509b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.zip.CRC32; import org.apache.parquet.bytes.ByteBufferAllocator; @@ -37,6 +38,7 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.page.PageWriter; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; @@ -88,6 +90,7 @@ private static final class ColumnChunkPageWriter implements PageWriter, BloomFil private ColumnIndexBuilder columnIndexBuilder; private OffsetIndexBuilder offsetIndexBuilder; private Statistics totalStatistics; + private final SizeStatistics totalSizeStatistics; private final ByteBufferReleaser releaser; private final CRC32 crc; @@ -119,9 +122,11 @@ private ColumnChunkPageWriter( this.buf = new ConcatenatingByteBufferCollector(allocator); this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + this.totalSizeStatistics = SizeStatistics.newBuilder( + path.getPrimitiveType(), path.getMaxRepetitionLevel(), path.getMaxDefinitionLevel()) + .build(); this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; this.crc = pageWriteChecksumEnabled ? 
new CRC32() : null; - this.headerBlockEncryptor = headerBlockEncryptor; this.pageBlockEncryptor = pageBlockEncryptor; this.fileAAD = fileAAD; @@ -164,7 +169,21 @@ public void writePage( BytesInput bytes, int valueCount, int rowCount, - Statistics statistics, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) + throws IOException { + writePage(bytes, valueCount, rowCount, statistics, null, rlEncoding, dlEncoding, valuesEncoding); + } + + @Override + public void writePage( + BytesInput bytes, + int valueCount, + int rowCount, + Statistics statistics, + SizeStatistics sizeStatistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) @@ -221,8 +240,11 @@ public void writePage( this.totalValueCount += valueCount; this.pageCount += 1; - mergeColumnStatistics(statistics); - offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount); + mergeColumnStatistics(statistics, sizeStatistics); + offsetIndexBuilder.add( + toIntWithCheck(tempOutputStream.size() + compressedSize), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); // by concatenating before collecting instead of collecting twice, // we only allocate one buffer to copy into instead of multiple. @@ -243,6 +265,30 @@ public void writePageV2( BytesInput data, Statistics statistics) throws IOException { + writePageV2( + rowCount, + nullCount, + valueCount, + repetitionLevels, + definitionLevels, + dataEncoding, + data, + statistics, + /*size_statistics=*/ null); + } + + @Override + public void writePageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput data, + Statistics statistics, + SizeStatistics sizeStatistics) + throws IOException { pageOrdinal++; int rlByteLength = toIntWithCheck(repetitionLevels.size()); @@ -303,8 +349,11 @@ public void writePageV2( this.totalValueCount += valueCount; this.pageCount += 1; - mergeColumnStatistics(statistics); - offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount); + mergeColumnStatistics(statistics, sizeStatistics); + offsetIndexBuilder.add( + toIntWithCheck((long) tempOutputStream.size() + compressedSize), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); // by concatenating before collecting instead of collecting twice, // we only allocate one buffer to copy into instead of multiple. @@ -321,7 +370,13 @@ private int toIntWithCheck(long size) { return (int) size; } - private void mergeColumnStatistics(Statistics statistics) { + private void mergeColumnStatistics(Statistics statistics, SizeStatistics sizeStatistics) { + totalSizeStatistics.mergeStatistics(sizeStatistics); + if (!totalSizeStatistics.isValid()) { + // Set page size statistics to null to clear state in the ColumnIndexBuilder. 
+ sizeStatistics = null; + } + if (totalStatistics != null && totalStatistics.isEmpty()) { return; } @@ -335,10 +390,10 @@ private void mergeColumnStatistics(Statistics statistics) { } else if (totalStatistics == null) { // Copying the statistics if it is not initialized yet, so we have the correct typed one totalStatistics = statistics.copy(); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } else { totalStatistics.mergeStatistics(statistics); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } } @@ -358,6 +413,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { uncompressedLength, compressedLength, totalStatistics, + totalSizeStatistics, columnIndexBuilder, offsetIndexBuilder, bloomFilter, @@ -374,6 +430,7 @@ public void writeToFileWriter(ParquetFileWriter writer) throws IOException { uncompressedLength, compressedLength, totalStatistics, + totalSizeStatistics, columnIndexBuilder, offsetIndexBuilder, bloomFilter, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java index 4e3674b556..4fb57ee407 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Formatter; import java.util.List; +import java.util.Optional; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.RowRanges; @@ -100,6 +101,11 @@ public long getLastRowIndex(int pageIndex, long totalRowCount) { - 1; } + @Override + public Optional getUnencodedByteArrayDataBytes(int pageIndex) { + return offsetIndex.getUnencodedByteArrayDataBytes(indexMap[pageIndex]); + } + @Override public String toString() { try (Formatter formatter = new Formatter()) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 5c7612652b..5344aa315c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; @@ -53,6 +54,7 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.crypto.AesCipher; @@ -154,6 +156,7 @@ public static enum Mode { private long uncompressedLength; private long compressedLength; private Statistics currentStatistics; // accumulated in writePage(s) + private SizeStatistics currentSizeStatistics; // accumulated in writePage(s) private ColumnIndexBuilder columnIndexBuilder; private OffsetIndexBuilder offsetIndexBuilder; @@ -615,6 +618,11 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio uncompressedLength = 
0; // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one currentStatistics = null; + currentSizeStatistics = SizeStatistics.newBuilder( + descriptor.getPrimitiveType(), + descriptor.getMaxRepetitionLevel(), + descriptor.getMaxDefinitionLevel()) + .build(); columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); @@ -747,6 +755,7 @@ public void writeDataPage( dlEncoding, valuesEncoding, null, + null, null); } @@ -783,6 +792,47 @@ public void writeDataPage( dlEncoding, valuesEncoding, null, + null, + null); + } + + /** + * Writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics the statistics of the page + * @param rowCount the number of rows in the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + long rowCount, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD) + throws IOException { + writeDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rowCount, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + pageHeaderAAD, null); } @@ -798,7 +848,8 @@ public void writeDataPage( * @param dlEncoding encoding of the definition level * @param valuesEncoding encoding of values * @param metadataBlockEncryptor encryptor for block data - * @param pageHeaderAAD pageHeader AAD + * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page * @throws IOException if any I/O error occurs during writing the file */ public void writeDataPage( @@ -811,7 +862,8 @@ public void writeDataPage( Encoding dlEncoding, Encoding valuesEncoding, BlockCipher.Encryptor metadataBlockEncryptor, - byte[] pageHeaderAAD) + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) throws IOException { long beforeHeader = out.getPos(); innerWriteDataPage( @@ -823,8 +875,12 @@ public void writeDataPage( dlEncoding, valuesEncoding, metadataBlockEncryptor, - pageHeaderAAD); - offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); + pageHeaderAAD, + sizeStatistics); + offsetIndexBuilder.add( + (int) (out.getPos() - beforeHeader), + rowCount, + sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); } private void innerWriteDataPage( @@ -836,7 +892,8 @@ private void innerWriteDataPage( Encoding dlEncoding, Encoding valuesEncoding, BlockCipher.Encryptor metadataBlockEncryptor, - byte[] pageHeaderAAD) + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) throws IOException { writeDataPage( valueCount, @@ -847,7 +904,8 @@ private void innerWriteDataPage( dlEncoding, valuesEncoding, metadataBlockEncryptor, - pageHeaderAAD); + pageHeaderAAD, + sizeStatistics); } /** @@ -875,6 +933,45 @@ public void writeDataPage( BlockCipher.Encryptor metadataBlockEncryptor, byte[] pageHeaderAAD) throws IOException { + writeDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + pageHeaderAAD, + null); + } + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics statistics for the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page + * @throws IOException if there is an error while writing + */ + public void writeDataPage( + int valueCount, + int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) + throws IOException { state = state.write(); long beforeHeader = out.getPos(); if (currentChunkFirstDataPage < 0) { @@ -914,7 +1011,7 @@ public void writeDataPage( LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); bytes.writeAllTo(out); - mergeColumnStatistics(statistics); + mergeColumnStatistics(statistics, sizeStatistics); encodingStatsBuilder.addDataEncoding(valuesEncoding); currentEncodings.add(rlEncoding); @@ -968,6 +1065,7 @@ public void writeDataPageV2( uncompressedDataSize, statistics, null, + null, null); } @@ -1000,6 +1098,52 @@ public void writeDataPageV2( BlockCipher.Encryptor metadataBlockEncryptor, byte[] pageHeaderAAD) throws IOException { + writeDataPageV2( + rowCount, + nullCount, + valueCount, + repetitionLevels, + definitionLevels, + dataEncoding, + compressedData, + uncompressedDataSize, + statistics, + metadataBlockEncryptor, + pageHeaderAAD, + null); + } + + /** + * Writes a single v2 data page + * + * @param rowCount count of rows + * @param nullCount count of nulls + * @param valueCount count of values + * @param repetitionLevels repetition level bytes + * @param definitionLevels definition level bytes + * @param dataEncoding encoding for data + * @param compressedData compressed data bytes + * @param uncompressedDataSize the size of uncompressed data + * @param statistics the statistics of the page + * @param metadataBlockEncryptor encryptor for block data + * @param pageHeaderAAD pageHeader AAD + * @param sizeStatistics size statistics for the page + * @throws IOException if any I/O error occurs during writing the file + */ + public void writeDataPageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding 
dataEncoding, + BytesInput compressedData, + int uncompressedDataSize, + Statistics statistics, + BlockCipher.Encryptor metadataBlockEncryptor, + byte[] pageHeaderAAD, + SizeStatistics sizeStatistics) + throws IOException { state = state.write(); int rlByteLength = toIntWithCheck(repetitionLevels.size()); int dlByteLength = toIntWithCheck(definitionLevels.size()); @@ -1056,14 +1200,17 @@ public void writeDataPageV2( this.uncompressedLength += uncompressedSize + headersSize; this.compressedLength += compressedSize + headersSize; - mergeColumnStatistics(statistics); + mergeColumnStatistics(statistics, sizeStatistics); currentEncodings.add(dataEncoding); encodingStatsBuilder.addDataEncoding(dataEncoding); BytesInput.concat(repetitionLevels, definitionLevels, compressedData).writeAllTo(out); - offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); + offsetIndexBuilder.add( + (int) (out.getPos() - beforeHeader), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); } private void crcUpdate(BytesInput bytes) { @@ -1100,6 +1247,7 @@ void writeColumnChunk( long uncompressedTotalPageSize, long compressedTotalPageSize, Statistics totalStats, + SizeStatistics totalSizeStats, ColumnIndexBuilder columnIndexBuilder, OffsetIndexBuilder offsetIndexBuilder, BloomFilter bloomFilter, @@ -1116,6 +1264,7 @@ void writeColumnChunk( uncompressedTotalPageSize, compressedTotalPageSize, totalStats, + totalSizeStats, columnIndexBuilder, offsetIndexBuilder, bloomFilter, @@ -1137,6 +1286,7 @@ void writeColumnChunk( long uncompressedTotalPageSize, long compressedTotalPageSize, Statistics totalStats, + SizeStatistics totalSizeStats, ColumnIndexBuilder columnIndexBuilder, OffsetIndexBuilder offsetIndexBuilder, BloomFilter bloomFilter, @@ -1193,6 +1343,7 @@ void writeColumnChunk( currentEncodings.addAll(dlEncodings); currentEncodings.addAll(dataEncodings); currentStatistics = totalStats; + currentSizeStatistics = totalSizeStats; this.columnIndexBuilder = columnIndexBuilder; this.offsetIndexBuilder = offsetIndexBuilder; @@ -1238,7 +1389,8 @@ public void endColumn() throws IOException { currentChunkDictionaryPageOffset, currentChunkValueCount, compressedLength, - uncompressedLength)); + uncompressedLength, + currentSizeStatistics)); this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); this.uncompressedLength = 0; this.compressedLength = 0; @@ -1560,7 +1712,15 @@ private int toIntWithCheck(long size) { return (int) size; } - private void mergeColumnStatistics(Statistics statistics) { + private void mergeColumnStatistics(Statistics statistics, SizeStatistics sizeStatistics) { + Preconditions.checkState(currentSizeStatistics != null, "Aggregate size statistics should not be null"); + currentSizeStatistics.mergeStatistics(sizeStatistics); + if (!currentSizeStatistics.isValid()) { + // Set page size statistics to null to clear state in the ColumnIndexBuilder. + sizeStatistics = null; + } + + // Do not merge statistics and build column index if any page statistics is invalid. 
if (currentStatistics != null && currentStatistics.isEmpty()) { return; } @@ -1574,10 +1734,10 @@ private void mergeColumnStatistics(Statistics statistics) { } else if (currentStatistics == null) { // Copying the statistics if it is not initialized yet, so we have the correct typed one currentStatistics = statistics.copy(); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } else { currentStatistics.mergeStatistics(statistics); - columnIndexBuilder.add(statistics); + columnIndexBuilder.add(statistics, sizeStatistics); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 62564c7974..3dac15ba7c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -28,6 +28,7 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.statistics.BooleanStatistics; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.InternalColumnDecryptionSetup; @@ -155,6 +156,49 @@ public static ColumnChunkMetaData get( long valueCount, long totalSize, long totalUncompressedSize) { + return get( + path, + type, + codec, + encodingStats, + encodings, + statistics, + firstDataPage, + dictionaryPageOffset, + valueCount, + totalSize, + totalUncompressedSize, + null); + } + + /** + * @param path the path of this column in the write schema + * @param type primitive type for this column + * @param codec the compression codec used to compress + * @param encodingStats EncodingStats for the encodings used in this column + * @param encodings a set of encoding used in this column + * @param statistics statistics for the data in this column + * @param firstDataPage offset of the first non-dictionary page + * @param dictionaryPageOffset offset of the dictionary page + * @param valueCount number of values + * @param totalSize total compressed size + * @param totalUncompressedSize uncompressed data size + * @param sizeStatistics size statistics for the data in this column + * @return a column chunk metadata instance + */ + public static ColumnChunkMetaData get( + ColumnPath path, + PrimitiveType type, + CompressionCodecName codec, + EncodingStats encodingStats, + Set encodings, + Statistics statistics, + long firstDataPage, + long dictionaryPageOffset, + long valueCount, + long totalSize, + long totalUncompressedSize, + SizeStatistics sizeStatistics) { // to save space we store those always positive longs in ints when they fit. 
if (positiveLongFitsInAnInt(firstDataPage) @@ -173,7 +217,8 @@ && positiveLongFitsInAnInt(totalUncompressedSize)) { dictionaryPageOffset, valueCount, totalSize, - totalUncompressedSize); + totalUncompressedSize, + sizeStatistics); } else { return new LongColumnChunkMetaData( path, @@ -186,7 +231,8 @@ && positiveLongFitsInAnInt(totalUncompressedSize)) { dictionaryPageOffset, valueCount, totalSize, - totalUncompressedSize); + totalUncompressedSize, + sizeStatistics); } } @@ -336,6 +382,15 @@ public PrimitiveType getPrimitiveType() { */ public abstract Statistics getStatistics(); + /** + * Method should be considered private + * + * @return the size stats for this column + */ + public SizeStatistics getSizeStatistics() { + throw new UnsupportedOperationException("SizeStatistics is not implemented"); + } + /** * Method should be considered private * @@ -455,6 +510,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { private final int totalSize; private final int totalUncompressedSize; private final Statistics statistics; + private final SizeStatistics sizeStatistics; /** * @param path column identifier @@ -467,6 +523,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { * @param valueCount * @param totalSize * @param totalUncompressedSize + * @param sizeStatistics */ IntColumnChunkMetaData( ColumnPath path, @@ -479,7 +536,8 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { long dictionaryPageOffset, long valueCount, long totalSize, - long totalUncompressedSize) { + long totalUncompressedSize, + SizeStatistics sizeStatistics) { super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings)); this.firstDataPage = positiveLongToInt(firstDataPage); this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset); @@ -487,6 +545,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { this.totalSize = positiveLongToInt(totalSize); this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize); this.statistics = statistics; + this.sizeStatistics = sizeStatistics; } /** @@ -553,6 +612,14 @@ public long getTotalSize() { public Statistics getStatistics() { return statistics; } + + /** + * @return the size stats for this column + */ + @Override + public SizeStatistics getSizeStatistics() { + return sizeStatistics; + } } class LongColumnChunkMetaData extends ColumnChunkMetaData { @@ -563,6 +630,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { private final long totalSize; private final long totalUncompressedSize; private final Statistics statistics; + private final SizeStatistics sizeStatistics; /** * @param path column identifier @@ -575,6 +643,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { * @param valueCount * @param totalSize * @param totalUncompressedSize + * @param sizeStatistics */ LongColumnChunkMetaData( ColumnPath path, @@ -587,7 +656,8 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { long dictionaryPageOffset, long valueCount, long totalSize, - long totalUncompressedSize) { + long totalUncompressedSize, + SizeStatistics sizeStatistics) { super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings)); this.firstDataPageOffset = firstDataPageOffset; this.dictionaryPageOffset = dictionaryPageOffset; @@ -595,6 +665,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { this.totalSize = totalSize; this.totalUncompressedSize = totalUncompressedSize; this.statistics = statistics; + this.sizeStatistics = sizeStatistics; } /** @@ -638,6 +709,14 @@ public long 
getTotalSize() { public Statistics getStatistics() { return statistics; } + + /** + * @return the size stats for this column + */ + @Override + public SizeStatistics getSizeStatistics() { + return sizeStatistics; + } } class EncryptedColumnChunkMetaData extends ColumnChunkMetaData { @@ -754,6 +833,12 @@ public Statistics getStatistics() { return shadowColumnChunkMetaData.getStatistics(); } + @Override + public SizeStatistics getSizeStatistics() { + decryptIfNeeded(); + return shadowColumnChunkMetaData.getSizeStatistics(); + } + /** * @return whether or not this column is encrypted */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index d03b55172b..4dcede624f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -52,6 +52,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import it.unimi.dsi.fastutil.longs.LongArrayList; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -65,6 +66,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.TreeSet; @@ -83,6 +85,7 @@ import org.apache.parquet.column.statistics.FloatStatistics; import org.apache.parquet.column.statistics.IntStatistics; import org.apache.parquet.column.statistics.LongStatistics; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.example.Paper; import org.apache.parquet.example.data.Group; @@ -1333,69 +1336,94 @@ public void testColumnOrders() throws IOException { @Test public void testOffsetIndexConversion() { - OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); - builder.add(1000, 10000, 0); - builder.add(22000, 12000, 100); - OffsetIndex offsetIndex = ParquetMetadataConverter.fromParquetOffsetIndex( - ParquetMetadataConverter.toParquetOffsetIndex(builder.build(100000))); - assertEquals(2, offsetIndex.getPageCount()); - assertEquals(101000, offsetIndex.getOffset(0)); - assertEquals(10000, offsetIndex.getCompressedPageSize(0)); - assertEquals(0, offsetIndex.getFirstRowIndex(0)); - assertEquals(122000, offsetIndex.getOffset(1)); - assertEquals(12000, offsetIndex.getCompressedPageSize(1)); - assertEquals(100, offsetIndex.getFirstRowIndex(1)); + for (boolean withSizeStats : new boolean[] {false, true}) { + OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); + builder.add(1000, 10000, 0, withSizeStats ? Optional.of(11L) : Optional.empty()); + builder.add(22000, 12000, 100, withSizeStats ? 
Optional.of(22L) : Optional.empty()); + OffsetIndex offsetIndex = ParquetMetadataConverter.fromParquetOffsetIndex( + ParquetMetadataConverter.toParquetOffsetIndex(builder.build(100000))); + assertEquals(2, offsetIndex.getPageCount()); + assertEquals(101000, offsetIndex.getOffset(0)); + assertEquals(10000, offsetIndex.getCompressedPageSize(0)); + assertEquals(0, offsetIndex.getFirstRowIndex(0)); + assertEquals(122000, offsetIndex.getOffset(1)); + assertEquals(12000, offsetIndex.getCompressedPageSize(1)); + assertEquals(100, offsetIndex.getFirstRowIndex(1)); + if (withSizeStats) { + assertEquals(Optional.of(11L), offsetIndex.getUnencodedByteArrayDataBytes(0)); + assertEquals(Optional.of(22L), offsetIndex.getUnencodedByteArrayDataBytes(1)); + } else { + assertFalse(offsetIndex.getUnencodedByteArrayDataBytes(0).isPresent()); + assertFalse(offsetIndex.getUnencodedByteArrayDataBytes(1).isPresent()); + } + } } @Test public void testColumnIndexConversion() { - PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); - Statistics stats = Statistics.createStats(type); - stats.incrementNumNulls(16); - stats.updateStats(-100l); - stats.updateStats(100l); - builder.add(stats); - stats = Statistics.createStats(type); - stats.incrementNumNulls(111); - builder.add(stats); - stats = Statistics.createStats(type); - stats.updateStats(200l); - stats.updateStats(500l); - builder.add(stats); - org.apache.parquet.format.ColumnIndex parquetColumnIndex = - ParquetMetadataConverter.toParquetColumnIndex(type, builder.build()); - ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex); - assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); - assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages())); - assertTrue(Arrays.asList(16l, 111l, 0l).equals(columnIndex.getNullCounts())); - assertTrue(Arrays.asList( - ByteBuffer.wrap(BytesUtils.longToBytes(-100l)), - ByteBuffer.allocate(0), - ByteBuffer.wrap(BytesUtils.longToBytes(200l))) - .equals(columnIndex.getMinValues())); - assertTrue(Arrays.asList( - ByteBuffer.wrap(BytesUtils.longToBytes(100l)), - ByteBuffer.allocate(0), - ByteBuffer.wrap(BytesUtils.longToBytes(500l))) - .equals(columnIndex.getMaxValues())); - - assertNull( - "Should handle null column index", - ParquetMetadataConverter.toParquetColumnIndex( - Types.required(PrimitiveTypeName.INT32).named("test_int32"), null)); - assertNull( - "Should ignore unsupported types", - ParquetMetadataConverter.toParquetColumnIndex( - Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex)); - assertNull( - "Should ignore unsupported types", - ParquetMetadataConverter.fromParquetColumnIndex( - Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) - .length(12) - .as(OriginalType.INTERVAL) - .named("test_interval"), - parquetColumnIndex)); + for (boolean withSizeStats : new boolean[] {false, true}) { + PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64"); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); + Statistics stats = Statistics.createStats(type); + stats.incrementNumNulls(16); + stats.updateStats(-100l); + stats.updateStats(100l); + builder.add( + stats, + withSizeStats ? 
new SizeStatistics(type, 0, LongArrayList.of(1, 2), LongArrayList.of(6, 5)) : null); + stats = Statistics.createStats(type); + stats.incrementNumNulls(111); + builder.add( + stats, + withSizeStats ? new SizeStatistics(type, 0, LongArrayList.of(3, 4), LongArrayList.of(4, 3)) : null); + stats = Statistics.createStats(type); + stats.updateStats(200l); + stats.updateStats(500l); + builder.add( + stats, + withSizeStats ? new SizeStatistics(type, 0, LongArrayList.of(5, 6), LongArrayList.of(2, 1)) : null); + org.apache.parquet.format.ColumnIndex parquetColumnIndex = + ParquetMetadataConverter.toParquetColumnIndex(type, builder.build()); + ColumnIndex columnIndex = ParquetMetadataConverter.fromParquetColumnIndex(type, parquetColumnIndex); + assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); + assertTrue(Arrays.asList(false, true, false).equals(columnIndex.getNullPages())); + assertTrue(Arrays.asList(16l, 111l, 0l).equals(columnIndex.getNullCounts())); + assertTrue(Arrays.asList( + ByteBuffer.wrap(BytesUtils.longToBytes(-100l)), + ByteBuffer.allocate(0), + ByteBuffer.wrap(BytesUtils.longToBytes(200l))) + .equals(columnIndex.getMinValues())); + assertTrue(Arrays.asList( + ByteBuffer.wrap(BytesUtils.longToBytes(100l)), + ByteBuffer.allocate(0), + ByteBuffer.wrap(BytesUtils.longToBytes(500l))) + .equals(columnIndex.getMaxValues())); + + assertNull( + "Should handle null column index", + ParquetMetadataConverter.toParquetColumnIndex( + Types.required(PrimitiveTypeName.INT32).named("test_int32"), null)); + assertNull( + "Should ignore unsupported types", + ParquetMetadataConverter.toParquetColumnIndex( + Types.required(PrimitiveTypeName.INT96).named("test_int96"), columnIndex)); + assertNull( + "Should ignore unsupported types", + ParquetMetadataConverter.fromParquetColumnIndex( + Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .length(12) + .as(OriginalType.INTERVAL) + .named("test_interval"), + parquetColumnIndex)); + + if (withSizeStats) { + assertEquals(LongArrayList.of(1, 2, 3, 4, 5, 6), columnIndex.getRepetitionLevelHistogram()); + assertEquals(LongArrayList.of(6, 5, 4, 3, 2, 1), columnIndex.getDefinitionLevelHistogram()); + } else { + assertEquals(LongArrayList.of(), columnIndex.getRepetitionLevelHistogram()); + assertEquals(LongArrayList.of(), columnIndex.getDefinitionLevelHistogram()); + } + } } @Test @@ -1537,4 +1565,19 @@ private void verifyMapMessageType(final MessageType messageType, final String ke } } } + + @Test + public void testSizeStatisticsConversion() { + PrimitiveType type = Types.required(PrimitiveTypeName.BINARY).named("test"); + List repLevelHistogram = Arrays.asList(1L, 2L, 3L, 4L, 5L); + List defLevelHistogram = Arrays.asList(6L, 7L, 8L, 9L, 10L); + SizeStatistics sizeStatistics = ParquetMetadataConverter.fromParquetSizeStatistics( + ParquetMetadataConverter.toParquetSizeStatistics( + new SizeStatistics(type, 1024, repLevelHistogram, defLevelHistogram)), + type); + assertEquals(type, sizeStatistics.getType()); + assertEquals(Optional.of(1024L), sizeStatistics.getUnencodedByteArrayDataBytes()); + assertEquals(repLevelHistogram, sizeStatistics.getRepetitionLevelHistogram()); + assertEquals(defLevelHistogram, sizeStatistics.getDefinitionLevelHistogram()); + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index 4efe4324c7..7079d499c1 100644 --- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -305,6 +305,7 @@ public void testColumnOrderV1() throws IOException { eq(fakeData.size()), eq(fakeData.size()), eq(fakeStats), + any(), same(ColumnIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no column index same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index any(), diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java new file mode 100644 index 0000000000..59e4aff2de --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestSizeStatisticsRoundTrip.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.statistics.SizeStatistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestSizeStatisticsRoundTrip { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testBinaryColumnSizeStatistics() throws IOException { + MessageType schema = Types.buildMessage() + .optional(PrimitiveType.PrimitiveTypeName.BINARY) + .as(LogicalTypeAnnotation.stringType()) + .named("name") + .named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + GroupFactory factory = new 
SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(2) + .withMinRowCountForPageSizeCheck(1) + .withConf(conf) + .build()) { + writer.write(factory.newGroup().append("name", "a")); + writer.write(factory.newGroup().append("name", "b")); + writer.write(factory.newGroup().append("name", "c")); + writer.write(factory.newGroup().append("name", "d")); + } + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) { + ParquetMetadata footer = reader.getFooter(); + ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0); + + SizeStatistics sizeStatistics = column.getSizeStatistics(); + Assert.assertEquals(Optional.of(4L), sizeStatistics.getUnencodedByteArrayDataBytes()); + Assert.assertEquals(Arrays.asList(4L), sizeStatistics.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(0L, 4L), sizeStatistics.getDefinitionLevelHistogram()); + + ColumnIndex columnIndex = reader.readColumnIndex(column); + Assert.assertEquals(Arrays.asList(2L, 2L), columnIndex.getRepetitionLevelHistogram()); + Assert.assertEquals(Arrays.asList(0L, 2L, 0L, 2L), columnIndex.getDefinitionLevelHistogram()); + + OffsetIndex offsetIndex = reader.readOffsetIndex(column); + Assert.assertEquals(2, offsetIndex.getPageCount()); + Assert.assertEquals(Optional.of(2L), offsetIndex.getUnencodedByteArrayDataBytes(0)); + Assert.assertEquals(Optional.of(2L), offsetIndex.getUnencodedByteArrayDataBytes(1)); + } + } + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return new Path(file.getAbsolutePath()); + } +}
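
For reference only (not part of the patch): a minimal sketch of how a downstream reader might consume the size statistics written by this change, using only APIs added or exercised above (ColumnChunkMetaData.getSizeStatistics, OffsetIndex.getUnencodedByteArrayDataBytes, ParquetFileReader.readOffsetIndex). The input path is a placeholder; files written before this change simply have no size statistics, so the null/absent cases are handled explicitly.

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

public class PrintSizeStatistics {
  public static void main(String[] args) throws IOException {
    // Placeholder path; point this at any Parquet file written with this change.
    Path path = new Path("/tmp/example.parquet");
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      for (ColumnChunkMetaData column :
          reader.getFooter().getBlocks().get(0).getColumns()) {
        // Chunk-level statistics accumulated by the writer; may be null or invalid for old files.
        SizeStatistics stats = column.getSizeStatistics();
        if (stats != null && stats.isValid()) {
          Optional<Long> unencoded = stats.getUnencodedByteArrayDataBytes();
          System.out.println(column.getPath() + " unencoded BYTE_ARRAY bytes: " + unencoded);
          System.out.println("  rep-level histogram: " + stats.getRepetitionLevelHistogram());
          System.out.println("  def-level histogram: " + stats.getDefinitionLevelHistogram());
        }
        // Page-level unencoded byte counts are exposed through the offset index.
        OffsetIndex offsetIndex = reader.readOffsetIndex(column);
        if (offsetIndex != null) {
          for (int i = 0; i < offsetIndex.getPageCount(); i++) {
            System.out.println("  page " + i + ": "
                + offsetIndex.getUnencodedByteArrayDataBytes(i));
          }
        }
      }
    }
  }
}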