From b69c7877b53255722d9518688dc4ca8aa993a8e5 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Thu, 2 Feb 2023 16:59:53 +0530 Subject: [PATCH 1/3] Remove unnecessary ColumnChunkDescriptor class --- .../parquet/reader/ColumnChunkDescriptor.java | 39 ------------------- .../io/trino/parquet/reader/PageReader.java | 3 +- .../reader/ParquetColumnChunkIterator.java | 17 +++++--- 3 files changed, 13 insertions(+), 46 deletions(-) delete mode 100644 lib/trino-parquet/src/main/java/io/trino/parquet/reader/ColumnChunkDescriptor.java diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ColumnChunkDescriptor.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ColumnChunkDescriptor.java deleted file mode 100644 index cdfe4c9f7c02..000000000000 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ColumnChunkDescriptor.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.parquet.reader; - -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; - -public class ColumnChunkDescriptor -{ - private final ColumnDescriptor columnDescriptor; - private final ColumnChunkMetaData columnChunkMetaData; - - public ColumnChunkDescriptor(ColumnDescriptor columnDescriptor, ColumnChunkMetaData columnChunkMetaData) - { - this.columnDescriptor = columnDescriptor; - this.columnChunkMetaData = columnChunkMetaData; - } - - public ColumnDescriptor getColumnDescriptor() - { - return columnDescriptor; - } - - public ColumnChunkMetaData getColumnChunkMetaData() - { - return columnChunkMetaData; - } -} diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java index a6d900b336e4..36c20f6e6435 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java @@ -64,7 +64,8 @@ public static PageReader createPageReader( boolean hasOnlyDictionaryEncodedPages = isOnlyDictionaryEncodingPages(metadata); ParquetColumnChunkIterator compressedPages = new ParquetColumnChunkIterator( fileCreatedBy, - new ColumnChunkDescriptor(columnDescriptor, metadata), + columnDescriptor, + metadata, columnChunk, offsetIndex); return new PageReader(metadata.getCodec(), compressedPages, compressedPages.hasDictionaryPage(), hasOnlyDictionaryEncodedPages, hasNoNulls); diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java index 99eb21234f0c..859c90f9fc79 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java @@ -18,12 +18,14 @@ import 
io.trino.parquet.DictionaryPage; import io.trino.parquet.Page; import io.trino.parquet.ParquetCorruptionException; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.format.DataPageHeader; import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.DictionaryPageHeader; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import javax.annotation.Nullable; @@ -41,7 +43,8 @@ public final class ParquetColumnChunkIterator implements Iterator { private final Optional fileCreatedBy; - private final ColumnChunkDescriptor descriptor; + private final ColumnDescriptor descriptor; + private final ColumnChunkMetaData metadata; private final ChunkedInputStream input; private final OffsetIndex offsetIndex; @@ -51,19 +54,21 @@ public final class ParquetColumnChunkIterator public ParquetColumnChunkIterator( Optional fileCreatedBy, - ColumnChunkDescriptor descriptor, + ColumnDescriptor descriptor, + ColumnChunkMetaData metadata, ChunkedInputStream input, @Nullable OffsetIndex offsetIndex) { this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null"); this.descriptor = requireNonNull(descriptor, "descriptor is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); this.input = requireNonNull(input, "input is null"); this.offsetIndex = offsetIndex; } public boolean hasDictionaryPage() { - return descriptor.getColumnChunkMetaData().hasDictionaryPage(); + return metadata.hasDictionaryPage(); } @Override @@ -85,7 +90,7 @@ public Page next() switch (pageHeader.type) { case DICTIONARY_PAGE: if (dataPageCount != 0) { - throw new ParquetCorruptionException("%s has dictionary page at not first position in column chunk", descriptor.getColumnDescriptor()); + throw new ParquetCorruptionException("%s has 
dictionary page at not first position in column chunk", descriptor); } result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size()); dictionaryWasRead = true; @@ -118,7 +123,7 @@ private PageHeader readPageHeader() private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) { if (offsetIndex == null) { - return valuesCountReadSoFar < descriptor.getColumnChunkMetaData().getValueCount(); + return valuesCountReadSoFar < metadata.getValueCount(); } return dataPageCountReadSoFar < offsetIndex.getPageCount(); } @@ -176,7 +181,7 @@ private DataPageV2 readDataPageV2( MetadataReader.readStats( fileCreatedBy, Optional.ofNullable(dataHeaderV2.getStatistics()), - descriptor.getColumnDescriptor().getPrimitiveType()), + descriptor.getPrimitiveType()), dataHeaderV2.isIs_compressed()); } From a68b2b418493b4edae5c7ba746614b11c915c40a Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Thu, 2 Feb 2023 17:12:03 +0530 Subject: [PATCH 2/3] Convert DataPage into a sealed class --- lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java | 3 ++- .../src/main/java/io/trino/parquet/DataPageV1.java | 2 +- .../src/main/java/io/trino/parquet/DataPageV2.java | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java b/lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java index da4cf0318836..bbece17c9b7e 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java @@ -15,8 +15,9 @@ import java.util.OptionalLong; -public abstract class DataPage +public abstract sealed class DataPage extends Page + permits DataPageV1, DataPageV2 { protected final int valueCount; private final OptionalLong firstRowIndex; diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV1.java b/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV1.java 
index 3f6a68f7ebef..b0895445d813 100755 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV1.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV1.java @@ -20,7 +20,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; -public class DataPageV1 +public final class DataPageV1 extends DataPage { private final Slice slice; diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV2.java b/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV2.java index a9c00aea4f8b..b0cbfd9ed8fc 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV2.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV2.java @@ -21,7 +21,7 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; -public class DataPageV2 +public final class DataPageV2 extends DataPage { private final int rowCount; From 31b14d94ed40bb6d1bdf6f0e8486db82f26c454d Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Thu, 2 Feb 2023 16:24:53 +0530 Subject: [PATCH 3/3] Fix reading parquet column with unused dictionary A parquet file produced by Impala was found to have an empty dictionary which is not used in the encoding of data pages in the column. For such a case we cannot rely on ColumnChunkMetaData#hasDictionaryPage as that checks whether the data pages are also encoded using the dictionary. This change removes usage of hasDictionaryPage to fix query failures with such files. 
--- .../io/trino/parquet/reader/PageReader.java | 25 +++------ .../reader/ParquetColumnChunkIterator.java | 15 ++--- .../reader/AbstractColumnReaderBenchmark.java | 2 +- .../parquet/reader/TestColumnReader.java | 1 - .../parquet/reader/TestInt96Timestamp.java | 2 +- .../trino/parquet/reader/TestPageReader.java | 56 ++++++++++++++----- .../reader/flat/TestFlatColumnReader.java | 5 +- 7 files changed, 58 insertions(+), 48 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java index 36c20f6e6435..49935a2ccf8d 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java @@ -41,7 +41,6 @@ public final class PageReader { private final CompressionCodecName codec; - private final boolean hasDictionaryPage; private final boolean hasOnlyDictionaryEncodedPages; private final boolean hasNoNulls; private final PeekingIterator compressedPages; @@ -68,20 +67,18 @@ public static PageReader createPageReader( metadata, columnChunk, offsetIndex); - return new PageReader(metadata.getCodec(), compressedPages, compressedPages.hasDictionaryPage(), hasOnlyDictionaryEncodedPages, hasNoNulls); + return new PageReader(metadata.getCodec(), compressedPages, hasOnlyDictionaryEncodedPages, hasNoNulls); } @VisibleForTesting public PageReader( CompressionCodecName codec, Iterator compressedPages, - boolean hasDictionaryPage, boolean hasOnlyDictionaryEncodedPages, boolean hasNoNulls) { this.codec = codec; this.compressedPages = Iterators.peekingIterator(compressedPages); - this.hasDictionaryPage = hasDictionaryPage; this.hasOnlyDictionaryEncodedPages = hasOnlyDictionaryEncodedPages; this.hasNoNulls = hasNoNulls; } @@ -98,13 +95,11 @@ public boolean hasOnlyDictionaryEncodedPages() public DataPage readPage() { - if (hasDictionaryPage) { - checkState(dictionaryAlreadyRead, "Dictionary 
has to be read first"); - } if (!compressedPages.hasNext()) { return null; } Page compressedPage = compressedPages.next(); + checkState(compressedPage instanceof DataPage, "Found page %s instead of a DataPage", compressedPage); dataPageReadCount++; try { if (compressedPage instanceof DataPageV1 dataPageV1) { @@ -144,16 +139,14 @@ public DataPage readPage() public DictionaryPage readDictionaryPage() { - if (!hasDictionaryPage) { + checkState(!dictionaryAlreadyRead, "Dictionary was already read"); + checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already"); + dictionaryAlreadyRead = true; + if (!(compressedPages.peek() instanceof DictionaryPage)) { return null; } try { - checkState(!dictionaryAlreadyRead, "Dictionary was already read"); - checkState(dataPageReadCount == 0, "Dictionary has to be read first but " + dataPageReadCount + " was read already"); - dictionaryAlreadyRead = true; - Page firstPage = compressedPages.next(); - checkArgument(firstPage instanceof DictionaryPage, "DictionaryPage has to be the first page in the column chunk but got %s", firstPage); - DictionaryPage compressedDictionaryPage = (DictionaryPage) firstPage; + DictionaryPage compressedDictionaryPage = (DictionaryPage) compressedPages.next(); return new DictionaryPage( decompress(codec, compressedDictionaryPage.getSlice(), compressedDictionaryPage.getUncompressedSize()), compressedDictionaryPage.getDictionarySize(), @@ -189,8 +182,6 @@ public boolean arePagesCompressed() private void verifyDictionaryPageRead() { - if (hasDictionaryPage) { - checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first"); - } + checkArgument(dictionaryAlreadyRead, "Dictionary has to be read first"); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java index 859c90f9fc79..e7a9d3bc1f71 100644 --- 
a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java @@ -35,7 +35,7 @@ import java.util.Optional; import java.util.OptionalLong; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding; import static java.util.Objects.requireNonNull; @@ -50,7 +50,6 @@ public final class ParquetColumnChunkIterator private long valueCount; private int dataPageCount; - private boolean dictionaryWasRead; public ParquetColumnChunkIterator( Optional fileCreatedBy, @@ -66,21 +65,16 @@ public ParquetColumnChunkIterator( this.offsetIndex = offsetIndex; } - public boolean hasDictionaryPage() - { - return metadata.hasDictionaryPage(); - } - @Override public boolean hasNext() { - return hasMorePages(valueCount, dataPageCount) || (hasDictionaryPage() && !dictionaryWasRead); + return hasMorePages(valueCount, dataPageCount); } @Override public Page next() { - checkArgument(hasNext()); + checkState(hasNext(), "No more data left to read in column (%s), metadata (%s), valueCount %s, dataPageCount %s", descriptor, metadata, valueCount, dataPageCount); try { PageHeader pageHeader = readPageHeader(); @@ -90,10 +84,9 @@ public Page next() switch (pageHeader.type) { case DICTIONARY_PAGE: if (dataPageCount != 0) { - throw new ParquetCorruptionException("%s has dictionary page at not first position in column chunk", descriptor); + throw new ParquetCorruptionException("Column (%s) has a dictionary page after the first position in column chunk", descriptor); } result = readDictionaryPage(pageHeader, pageHeader.getUncompressed_page_size(), pageHeader.getCompressed_page_size()); - dictionaryWasRead = true; break; case DATA_PAGE: result = readDataPageV1(pageHeader, uncompressedPageSize, compressedPageSize, getFirstRowIndex(dataPageCount, 
offsetIndex)); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java index 39f3ab2c3326..4012531958c4 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/AbstractColumnReaderBenchmark.java @@ -102,7 +102,7 @@ public int read() throws IOException { ColumnReader columnReader = ColumnReaderFactory.create(field, UTC, newSimpleAggregatedMemoryContext(), true); - columnReader.setPageReader(new PageReader(UNCOMPRESSED, dataPages.iterator(), false, false, false), Optional.empty()); + columnReader.setPageReader(new PageReader(UNCOMPRESSED, dataPages.iterator(), false, false), Optional.empty()); int rowsRead = 0; while (rowsRead < dataPositions) { int remaining = dataPositions - rowsRead; diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java index 9369c8833002..5ca2212e7a89 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java @@ -378,7 +378,6 @@ else if (dictionaryEncoding == DictionaryEncoding.MIXED) { return new PageReader( UNCOMPRESSED, inputPages.iterator(), - dictionaryEncoding != DictionaryEncoding.NONE, dictionaryEncoding == DictionaryEncoding.ALL || (dictionaryEncoding == DictionaryEncoding.MIXED && testingPages.size() == 1), false); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java index 02c1c370223d..e5894aee421d 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java +++ 
b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java @@ -108,7 +108,7 @@ public void testVariousTimestamps(TimestampType type, BiFunction createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))).readDictionaryPage()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageStartingWith("DictionaryPage has to be the first page in the column chunk"); + // There is a dictionary, but it's there as the second page + PageReader pageReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))); + assertThat(pageReader.readDictionaryPage()).isNull(); + assertThat(pageReader.readPage()).isNotNull(); + assertThatThrownBy(pageReader::readPage) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("has a dictionary page after the first position"); + } + + @Test + public void unusedDictionaryPage() + throws Exception + { + // A parquet file produced by Impala was found to have an empty dictionary + // which is not used in the encoding of data pages in the column + CompressionCodecName compressionCodec = UNCOMPRESSED; + byte[] compressedDictionaryPage = TestPageReader.compress(compressionCodec, new byte[0], 0, 0); + PageHeader dictionaryPageHeader = new PageHeader(DICTIONARY_PAGE, 0, compressedDictionaryPage.length); + dictionaryPageHeader.setDictionary_page_header(new DictionaryPageHeader(0, Encoding.PLAIN)); + ByteArrayOutputStream out = new ByteArrayOutputStream(100); + Util.writePageHeader(dictionaryPageHeader, out); + out.write(compressedDictionaryPage); + + DataPageType dataPageType = V2; + byte[] compressedDataPage = DATA_PAGE; + + PageHeader pageHeader = new PageHeader(dataPageType.pageType(), DATA_PAGE.length, compressedDataPage.length); + int valueCount = 10; + dataPageType.setDataPageHeader(pageHeader, valueCount); + + Util.writePageHeader(pageHeader, out); + out.write(compressedDataPage); + byte[] bytes = out.toByteArray(); - 
// metadata says there is no dictionary, but it's there as second page - PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes))); - assertTrue(pageReader.hasNext()); - pageReader.skipNextPage(); - assertThatThrownBy(pageReader::readPage).isInstanceOf(RuntimeException.class).hasCauseInstanceOf(ParquetCorruptionException.class); + // The empty dictionary is the first page, followed by a single data page + PageReader pageReader = createPageReader(valueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes))); + assertThat(pageReader.readDictionaryPage()).isNotNull(); + assertThat(pageReader.readPage()).isNotNull(); + assertThat(pageReader.readPage()).isNull(); } private static void assertSinglePage(CompressionCodecName compressionCodec, int valueCount, PageHeader pageHeader, byte[] compressedDataPage, List slices) @@ -363,7 +392,6 @@ private static PageReader createPageReader(int valueCount, CompressionCodecName EncodingStats.Builder encodingStats = new EncodingStats.Builder(); if (hasDictionary) { encodingStats.addDictEncoding(PLAIN); - encodingStats.addDataEncoding(RLE_DICTIONARY); } ColumnChunkMetaData columnChunkMetaData = ColumnChunkMetaData.get( ColumnPath.get(""), diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java index 38bc15bc30a5..3683cc2f1179 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestFlatColumnReader.java @@ -728,7 +728,7 @@ private static PageReader getSimplePageReaderMock(ParquetEncoding encoding) encoding, encoding, PLAIN)); - return new PageReader(UNCOMPRESSED, pages.iterator(), false, false, false); + return new PageReader(UNCOMPRESSED, pages.iterator(), false, false); } private static PageReader 
getNullOnlyPageReaderMock() @@ -745,7 +745,7 @@ private static PageReader getNullOnlyPageReaderMock() RLE, RLE, PLAIN)); - return new PageReader(UNCOMPRESSED, pages.iterator(), false, false, false); + return new PageReader(UNCOMPRESSED, pages.iterator(), false, false); } private static PageReader getPageReaderMock(List dataPages, @Nullable DictionaryPage dictionaryPage) @@ -762,7 +762,6 @@ private static PageReader getPageReaderMock(List dataPages, @Nullable return new PageReader( UNCOMPRESSED, pagesBuilder.addAll(dataPages).build().iterator(), - dictionaryPage != null, dataPages.stream() .map(page -> { if (page instanceof DataPageV1 pageV1) {