From aac191fd9c573bf2a9c6c8bdc650cc1716892d0b Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 23 Feb 2024 18:02:05 -0600 Subject: [PATCH] Added support to read mixed encoded parquet files (#5176) --- .../parquet/base/ColumnChunkReader.java | 5 +- .../parquet/base/ColumnChunkReaderImpl.java | 124 ++++++++---------- .../parquet/base/ColumnPageReader.java | 9 +- .../parquet/base/ColumnPageReaderImpl.java | 89 +++---------- .../parquet/base/ParquetFileReader.java | 2 +- .../parquet/table/ParquetTableWriter.java | 13 +- .../OffsetIndexBasedColumnChunkPageStore.java | 2 +- .../topage/ToPageWithDictionary.java | 2 +- .../table/ParquetTableReadWriteTest.java | 20 +++ ...taWithMixedEncodingWithOffsetIndex.parquet | 3 + ...ithMixedEncodingWithoutOffsetIndex.parquet | 3 + .../deephaven/extensions/s3/BufferPool.java | 2 - 12 files changed, 118 insertions(+), 156 deletions(-) create mode 100644 extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithOffsetIndex.parquet create mode 100644 extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithoutOffsetIndex.parquet diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java index a3967dc24ea..2f13ec64802 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReader.java @@ -61,8 +61,11 @@ interface ColumnPageReaderIterator { interface ColumnPageDirectAccessor { /** * Directly access a page reader for a given page number. + * + * @param pageNum The page number to access. + * @param channelContext The channel context to use for constructing the reader */ - ColumnPageReader getPageReader(int pageNum); + ColumnPageReader getPageReader(int pageNum, SeekableChannelContext channelContext); } /** diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index ee77e129db1..eb01e2f0b79 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -98,18 +98,13 @@ public int getMaxRl() { return path.getMaxRepetitionLevel(); } - public final OffsetIndex getOffsetIndex() { + public OffsetIndex getOffsetIndex() { return offsetIndex; } @Override public ColumnPageReaderIterator getPageIterator() { - final long dataPageOffset = columnChunk.meta_data.getData_page_offset(); - if (offsetIndex == null) { - return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values()); - } else { - return new ColumnPageReaderIteratorIndexImpl(); - } + return new ColumnPageReaderIteratorImpl(); } @Override @@ -230,9 +225,9 @@ private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIter private long nextHeaderOffset; private long remainingValues; - ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) { - this.remainingValues = numValues; - this.nextHeaderOffset = startOffset; + ColumnPageReaderIteratorImpl() { + this.remainingValues = columnChunk.meta_data.getNum_values(); + this.nextHeaderOffset = columnChunk.meta_data.getData_page_offset(); } @Override @@ -251,37 +246,41 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContex final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) { ch.position(headerOffset); - final PageHeader pageHeader; - try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) { - pageHeader = Util.readPageHeader(in); - } + final PageHeader pageHeader = readPageHeader(ch); // relying on exact position of ch final long dataOffset = ch.position(); nextHeaderOffset = dataOffset + pageHeader.getCompressed_page_size(); - if (pageHeader.isSetDictionary_page_header()) { - // Dictionary page; skip it + final PageType pageType = pageHeader.type; + if (pageType == PageType.DICTIONARY_PAGE && headerOffset == columnChunk.meta_data.getData_page_offset() + && columnChunk.meta_data.getDictionary_page_offset() == 0) { + // https://stackoverflow.com/questions/55225108/why-is-dictionary-page-offset-0-for-plain-dictionary-encoding + // Skip the dictionary page and jump to the data page return next(holder.get()); } - if (!pageHeader.isSetData_page_header() && !pageHeader.isSetData_page_header_v2()) { - throw new IllegalStateException( - "Expected data page, but neither v1 nor v2 data page header is set in file " - + ch + " at offset " + headerOffset); + if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) { + throw new IllegalStateException("Expected data page, but got " + pageType + " at offset " + + headerOffset + " for file " + getURI()); } - remainingValues -= getNumValues(pageHeader); - final org.apache.parquet.format.Encoding encoding = getEncoding(pageHeader); + final int numValuesInPage = getNumValues(pageHeader); + remainingValues -= numValuesInPage; final Function pageDictionarySupplier = - (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) - ? dictionarySupplier - : (SeekableChannelContext context) -> NULL_DICTIONARY; - return new ColumnPageReaderImpl(channelsProvider, decompressor, - pageDictionarySupplier, nullMaterializerFactory, path, getURI(), fieldTypes, - dataOffset, pageHeader, ColumnPageReaderImpl.NULL_NUM_VALUES); + getPageDictionarySupplier(pageHeader); + return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier, + nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage); } catch (IOException e) { - throw new UncheckedDeephavenException("Error reading page header", e); + throw new UncheckedDeephavenException("Error reading page header at offset " + headerOffset + " for " + + "file " + getURI(), e); } } } + private Function getPageDictionarySupplier(final PageHeader pageHeader) { + final org.apache.parquet.format.Encoding encoding = getEncoding(pageHeader); + return (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY) + ? dictionarySupplier + : (SeekableChannelContext context) -> NULL_DICTIONARY; + } + private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHeader) { switch (pageHeader.type) { case DATA_PAGE: @@ -294,58 +293,51 @@ private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHea } } + private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException { + try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) { + return Util.readPageHeader(in); + } + } + private static int getNumValues(PageHeader pageHeader) { return pageHeader.isSetData_page_header() ? pageHeader.getData_page_header().getNum_values() : pageHeader.getData_page_header_v2().getNum_values(); } - private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator { - private int pos; - - ColumnPageReaderIteratorIndexImpl() { - pos = 0; - } - - @Override - public boolean hasNext() { - return offsetIndex.getPageCount() > pos; - } - - @Override - public ColumnPageReader next(@NotNull final SeekableChannelContext channelContext) { - if (!hasNext()) { - throw new NoSuchElementException("No next element"); - } - // Following logic assumes that offsetIndex will store the number of values for a page instead of number - // of rows (which can be different for array and vector columns). This behavior is because of a bug on - // parquet writing side which got fixed in deephaven-core/pull/4844 and is only kept to support reading - // parquet files written before deephaven-core/pull/4844. - final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values()) - - offsetIndex.getFirstRowIndex(pos) + 1); - final ColumnPageReader columnPageReader = - new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, - nullMaterializerFactory, path, getURI(), fieldTypes, offsetIndex.getOffset(pos), null, - numValues); - pos++; - return columnPageReader; - } - } - private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor { ColumnPageDirectAccessorImpl() {} @Override - public ColumnPageReader getPageReader(final int pageNum) { + public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) { if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) { throw new IndexOutOfBoundsException( "pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount()); } - // Page header and number of values will be populated later when we read the page header from the file - return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory, - path, getURI(), fieldTypes, offsetIndex.getOffset(pageNum), null, - ColumnPageReaderImpl.NULL_NUM_VALUES); + + // Read the page header to determine whether we need to use dictionary for this page + final long headerOffset = offsetIndex.getOffset(pageNum); + try ( + final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); + final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) { + ch.position(headerOffset); + final PageHeader pageHeader = readPageHeader(ch); + final long dataOffset = ch.position(); + final PageType pageType = pageHeader.type; + if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) { + throw new IllegalStateException("Expected data page, but got " + pageType + " for page number " + + pageNum + " at offset " + headerOffset + " for file " + getURI()); + } + final Function pageDictionarySupplier = + getPageDictionarySupplier(pageHeader); + return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier, + nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, + getNumValues(pageHeader)); + } catch (final IOException e) { + throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum + + " at offset " + headerOffset + " for file " + getURI(), e); + } } } } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java index 58b66e6abdd..b3192fda9bc 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReader.java @@ -18,10 +18,10 @@ public interface ColumnPageReader extends AutoCloseable { /** * @param channelContext The channel context to use for reading the parquet file - * @return The number of rows in this ColumnChunk, or -1 if it's unknown. + * @return The number of rows in this page, or -1 if it's unknown. */ default long numRows(final SeekableChannelContext channelContext) throws IOException { - return numValues(channelContext); + return numValues(); } /** @@ -47,10 +47,9 @@ IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder, SeekableChannelContext channelContext) throws IOException; /** - * @param channelContext The channel context to use for reading the parquet file - * @return The value stored under number DataPageHeader.num_values + * @return The number of values in this page */ - int numValues(SeekableChannelContext channelContext) throws IOException; + int numValues(); /** * @param channelContext The channel context to use for reading the parquet file diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 743926ea86c..8bbbe661945 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -4,7 +4,7 @@ package io.deephaven.parquet.base; import io.deephaven.base.Pair; -import io.deephaven.base.verify.Assert; +import io.deephaven.base.verify.Require; import io.deephaven.parquet.compress.CompressorAdapter; import io.deephaven.util.channel.SeekableChannelContext; import io.deephaven.util.channel.SeekableChannelsProvider; @@ -22,7 +22,6 @@ import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.PageType; -import org.apache.parquet.format.Util; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.Type; import org.jetbrains.annotations.NotNull; @@ -41,8 +40,7 @@ import static org.apache.parquet.column.ValuesType.VALUES; final class ColumnPageReaderImpl implements ColumnPageReader { - public static final int NULL_OFFSET = -1; - static final int NULL_NUM_VALUES = -1; + private static final int NULL_OFFSET = -1; private final SeekableChannelsProvider channelsProvider; private final CompressorAdapter compressorAdapter; @@ -53,10 +51,9 @@ final class ColumnPageReaderImpl implements ColumnPageReader { private final List fieldTypes; /** - * Stores the offset from where the next byte should be read. Can be the offset of page header if - * {@link #pageHeader} is {@code null}, else will be the offset of data. + * The offset for data following the page header in the file. */ - private long offset; + private final long dataOffset; private PageHeader pageHeader; private int numValues; private int rowCount = -1; @@ -72,11 +69,9 @@ final class ColumnPageReaderImpl implements ColumnPageReader { * @param path The path of the column. * @param uri The uri of the parquet file. * @param fieldTypes The types of the fields in the column. - * @param offset The offset for page header if supplied {@code pageHeader} is {@code null}. Else, the offset of data - * following the header in the page. - * @param pageHeader The page header if it is already read from the file. Else, {@code null}. - * @param numValues The number of values in the page if it is already read from the file. Else, - * {@value #NULL_NUM_VALUES} + * @param dataOffset The offset for data following the page header in the file. + * @param pageHeader The page header, should not be {@code null}. + * @param numValues The number of values in the page. */ ColumnPageReaderImpl(SeekableChannelsProvider channelsProvider, CompressorAdapter compressorAdapter, @@ -85,7 +80,7 @@ final class ColumnPageReaderImpl implements ColumnPageReader { ColumnDescriptor path, URI uri, List fieldTypes, - long offset, + long dataOffset, PageHeader pageHeader, int numValues) { this.channelsProvider = channelsProvider; @@ -95,9 +90,9 @@ final class ColumnPageReaderImpl implements ColumnPageReader { this.path = path; this.uri = uri; this.fieldTypes = fieldTypes; - this.offset = offset; - this.pageHeader = pageHeader; - this.numValues = numValues; + this.dataOffset = dataOffset; + this.pageHeader = Require.neqNull(pageHeader, "pageHeader"); + this.numValues = Require.geqZero(numValues, "numValues"); } @Override @@ -106,7 +101,7 @@ public Object materialize(@NotNull final Object nullValue, try ( final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); + ch.position(dataOffset); return readDataPage(nullValue, ch, holder.get()); } } @@ -115,7 +110,7 @@ public int readRowCount(@NotNull final SeekableChannelContext channelContext) th try ( final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); + ch.position(dataOffset); return readRowCountFromDataPage(ch); } } @@ -126,51 +121,11 @@ public IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder, try ( final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); + ch.position(dataOffset); return readKeysFromDataPage(keyDest, nullPlaceholder, ch, holder.get()); } } - /** - * If {@link #pageHeader} is {@code null}, read it from the channel, and increment the {@link #offset} by the length - * of page header. Channel position would be set to the end of page header or beginning of data before returning. - */ - private void ensurePageHeader(SeekableChannelsProvider provider, SeekableByteChannel ch) throws IOException { - // Set this channel's position to appropriate offset for reading. If pageHeader is null, this offset would be - // the offset of page header, else it would be the offset of data. - ch.position(offset); - synchronized (this) { - if (pageHeader == null) { - try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(provider, ch)) { - pageHeader = Util.readPageHeader(in); - } - offset = ch.position(); - if (numValues >= 0) { - final int numValuesFromHeader = readNumValuesFromPageHeader(pageHeader); - if (numValues != numValuesFromHeader) { - throw new IllegalStateException( - "numValues = " + numValues + " different from number of values " + - "read from the page header = " + numValuesFromHeader + " for column " + path); - } - } - } - if (numValues == NULL_NUM_VALUES) { - numValues = readNumValuesFromPageHeader(pageHeader); - } - } - } - - private static int readNumValuesFromPageHeader(@NotNull final PageHeader header) throws IOException { - switch (header.type) { - case DATA_PAGE: - return header.getData_page_header().getNum_values(); - case DATA_PAGE_V2: - return header.getData_page_header_v2().getNum_values(); - default: - throw new IOException(String.format("Unexpected page of type {%s}", header.getType())); - } - } - /** * Callers must ensure resulting data page does not outlive the input stream. */ @@ -588,18 +543,8 @@ private ValuesReader getDataReader(Encoding dataEncoding, ByteBuffer in, int val } @Override - public int numValues(@NotNull final SeekableChannelContext channelContext) throws IOException { - if (numValues >= 0) { - return numValues; - } - try ( - final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext); - final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), uri)) { - ensurePageHeader(channelsProvider, ch); - // Above will block till it populates numValues - Assert.geqZero(numValues, "numValues"); - return numValues; - } + public int numValues() { + return numValues; } @NotNull @@ -617,7 +562,7 @@ public void close() throws Exception { public long numRows(@NotNull final SeekableChannelContext channelContext) throws IOException { if (rowCount == -1) { if (path.getMaxRepetitionLevel() == 0) { - rowCount = numValues(channelContext); + rowCount = numValues(); } else { rowCount = readRowCount(channelContext); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 8c8ed1112fe..f11f7ad21fa 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -123,7 +123,7 @@ public Set getColumnsWithDictionaryUsedOnEveryDataPage() { * True only if we are certain every data page in this column chunk uses dictionary encoding; note false also covers * the "we can't tell" case. */ - private boolean columnChunkUsesDictionaryOnEveryPage(final ColumnChunk columnChunk) { + private static boolean columnChunkUsesDictionaryOnEveryPage(final ColumnChunk columnChunk) { final ColumnMetaData columnMeta = columnChunk.getMeta_data(); if (columnMeta.encoding_stats == null) { return false; // this is false as "don't know". diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index ad733d6abbf..34b6983c727 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -60,25 +60,24 @@ public class ParquetTableWriter { /** * Helper struct used to pass information about where to write the grouping files for each grouping column */ - public static class GroupingColumnWritingInfo { + static class GroupingColumnWritingInfo { /** * Parquet name of this grouping column */ - public final String parquetColumnName; + final String parquetColumnName; /** * File path to be added in the grouping metadata of main parquet file */ - public final File metadataFilePath; + final File metadataFilePath; /** * Destination path for writing the grouping file. The two filenames can differ because we write grouping files * to shadow file paths first and then place them at the final path once the write is complete. But the metadata * should always hold the accurate path. */ - public final File destFile; + final File destFile; - public GroupingColumnWritingInfo(final String parquetColumnName, final File metadataFilePath, - final File destFile) { + GroupingColumnWritingInfo(final String parquetColumnName, final File metadataFilePath, final File destFile) { this.parquetColumnName = parquetColumnName; this.metadataFilePath = metadataFilePath; this.destFile = destFile; @@ -97,7 +96,7 @@ public GroupingColumnWritingInfo(final String parquetColumnName, final File meta * unsupported types) * @throws IOException For file writing related errors */ - public static void write( + static void write( @NotNull final Table t, @NotNull final TableDefinition definition, @NotNull final ParquetInstructions writeInstructions, diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java index 2634c65b0b9..807837d13aa 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/OffsetIndexBasedColumnChunkPageStore.java @@ -127,9 +127,9 @@ private ChunkPage getPage(@Nullable final FillContext fillContext, final i } private ChunkPage getPageImpl(@Nullable FillContext fillContext, int pageNum) { - final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum); // Use the latest context while reading the page, or create (and close) new one try (final ContextHolder holder = ensureContext(fillContext)) { + final ColumnPageReader reader = columnPageDirectAccessor.getPageReader(pageNum, holder.get()); return toPage(offsetIndex.getFirstRowIndex(pageNum), reader, holder.get()); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java index d733e8a198f..c947b281c82 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToPageWithDictionary.java @@ -58,7 +58,7 @@ public final Object getResult(@NotNull final ColumnPageReader columnPageReader, return ToPage.super.getResult(columnPageReader, channelContext); } - final int[] keys = new int[columnPageReader.numValues(channelContext)]; + final int[] keys = new int[columnPageReader.numValues()]; final IntBuffer offsets = columnPageReader.readKeyValues(IntBuffer.wrap(keys), NULL_INT, channelContext); return offsets == null ? keys : new DataWithOffsets(offsets, keys); diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index e2c6638d5ce..1c10cad04c8 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -93,6 +93,7 @@ import static io.deephaven.engine.util.TableTools.byteCol; import static io.deephaven.engine.util.TableTools.charCol; import static io.deephaven.engine.util.TableTools.doubleCol; +import static io.deephaven.engine.util.TableTools.emptyTable; import static io.deephaven.engine.util.TableTools.floatCol; import static io.deephaven.engine.util.TableTools.instantCol; import static io.deephaven.engine.util.TableTools.intCol; @@ -1427,6 +1428,25 @@ public void dictionaryEncodingTest() { assertTrue(thirdColumnMetadata.contains("someIntColumn") && !thirdColumnMetadata.contains("RLE_DICTIONARY")); } + @Test + public void mixedDictionaryEncodingTest() { + // Test the behavior of writing parquet files with some pages dictionary encoded and some not + String path = ParquetTableReadWriteTest.class + .getResource("/ParquetDataWithMixedEncodingWithoutOffsetIndex.parquet").getFile(); + Table fromDisk = readParquetFileFromGitLFS(new File(path)).select(); + Table expected = + emptyTable(2_000_000).update("Broken=String.format(`%015d`, ii < 1200000 ? (ii % 30000) : ii)"); + assertTableEquals(expected, fromDisk); + + path = ParquetTableReadWriteTest.class.getResource("/ParquetDataWithMixedEncodingWithOffsetIndex.parquet") + .getFile(); + fromDisk = readParquetFileFromGitLFS(new File(path)).select(); + final Collection columns = new ArrayList<>(Arrays.asList("shortStringColumn = `Some data`")); + final int numRows = 20; + expected = TableTools.emptyTable(numRows).select(Selectable.from(columns)); + assertTableEquals(expected, fromDisk); + } + @Test public void overflowingStringsTest() { // Test the behavior of writing parquet files if entries exceed the page size limit diff --git a/extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithOffsetIndex.parquet b/extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithOffsetIndex.parquet new file mode 100644 index 00000000000..1fbae5c0ac3 --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithOffsetIndex.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:22f5fb1bc52f71684db340c3c1f364e24eab49d7609095a891bfaf86d06f20df +size 434 diff --git a/extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithoutOffsetIndex.parquet b/extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithoutOffsetIndex.parquet new file mode 100644 index 00000000000..4237591668e --- /dev/null +++ b/extensions/parquet/table/src/test/resources/ParquetDataWithMixedEncodingWithoutOffsetIndex.parquet @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:b42f72b400404a9f7252cc05086fa38cc2cfeabf967284879b1f2adffbc03fbb +size 6581890 diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java index b528fbf8ef5..38d4ba03d08 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/BufferPool.java @@ -1,9 +1,7 @@ package io.deephaven.extensions.s3; import io.deephaven.base.reference.PooledObjectReference; -import io.deephaven.base.reference.SimpleReference; import io.deephaven.util.datastructures.SegmentedSoftPool; -import io.deephaven.util.referencecounting.ReferenceCounted; import org.jetbrains.annotations.NotNull; import java.nio.ByteBuffer;