From e8cbe81896d712ca01465be5a8c8260ab9279893 Mon Sep 17 00:00:00 2001
From: Gijs Burghoorn
Date: Fri, 23 Aug 2024 11:43:50 +0200
Subject: [PATCH] perf: Lazy decompress Parquet pages (#18326)

---
 .../arrow/read/deserialize/nested_utils.rs    | 10 ++++-
 .../src/arrow/read/deserialize/null.rs        | 16 ++++----
 .../src/arrow/read/deserialize/utils/mod.rs   | 11 +++---
 .../src/parquet/read/compression.rs           | 39 ++++++++++++++-----
 crates/polars/tests/it/io/parquet/read/mod.rs |  1 +
 5 files changed, 53 insertions(+), 24 deletions(-)

diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
index fd135a9b63ace..fa32c504a417f 100644
--- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
+++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
@@ -705,6 +705,7 @@ impl PageNestedDecoder {
                     break;
                 };
                 let page = page?;
+                let page = page.decompress(&mut self.iter)?;
 
                 let mut state =
                     utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?;
@@ -743,9 +744,11 @@ impl PageNestedDecoder {
                     break;
                 };
                 let page = page?;
+                // We cannot lazily decompress because we don't have the number of leaf values
+                // at this point. This is encoded within the `definition level` values. *sigh*.
+                // In general, lazy decompression is quite difficult with nested values.
+                let page = page.decompress(&mut self.iter)?;
 
-                let mut state =
-                    utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?;
                 let (def_iter, rep_iter) = level_iters(&page)?;
 
                 let mut count = ZeroCount::default();
@@ -762,6 +765,9 @@ impl PageNestedDecoder {
                     None
                 };
 
+                let mut state =
+                    utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?;
+
                 let start_length = nested_state.len();
                 // @TODO: move this to outside the loop.
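
Note on the second hunk above: the nested path cannot defer decompression because the number of
physical leaf values in a page is only known after the definition levels are decoded, and those
levels live inside the compressed page itself. The following standalone sketch (not part of the
patch; the function name and the u16 level type are illustrative only) shows that dependency: the
leaf-value count is derived from the decoded definition levels.

    // Illustrative sketch: values whose definition level equals the maximum
    // definition level are the ones physically stored in the data page.
    fn count_leaf_values(def_levels: &[u16], max_def_level: u16) -> usize {
        def_levels
            .iter()
            .filter(|&&level| level == max_def_level)
            .count()
    }

    fn main() {
        // For an optional leaf inside an optional group (max definition level = 2),
        // only definition levels equal to 2 carry a physical value.
        let def_levels = [2u16, 0, 2, 1, 2];
        assert_eq!(count_leaf_values(&def_levels, 2), 3);
    }
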
diff --git a/crates/polars-parquet/src/arrow/read/deserialize/null.rs b/crates/polars-parquet/src/arrow/read/deserialize/null.rs
index 8c28a7fc66bb4..3958f176432ad 100644
--- a/crates/polars-parquet/src/arrow/read/deserialize/null.rs
+++ b/crates/polars-parquet/src/arrow/read/deserialize/null.rs
@@ -136,18 +136,20 @@ pub fn iter_to_arrays(
         };
         let page = page?;
 
-        let rows = page.num_values();
-        let page_filter;
-        (page_filter, filter) = Filter::opt_split_at(&filter, rows);
+        let state_filter;
+        (state_filter, filter) = Filter::opt_split_at(&filter, page.num_values());
 
-        let num_rows = match page_filter {
-            None => rows,
+        // Skip the whole page if we don't need any rows from it
+        if state_filter.as_ref().is_some_and(|f| f.num_rows() == 0) {
+            continue;
+        }
+
+        let num_rows = match state_filter {
+            None => page.num_values(),
             Some(filter) => filter.num_rows(),
         };
 
         len = (len + num_rows).min(num_rows);
-
-        iter.reuse_page_buffer(page);
     }
 
     Ok(Box::new(NullArray::new(data_type, len)))
diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs
index b96f7b6a429c2..2fdf0f9501cc7 100644
--- a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs
+++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs
@@ -653,21 +653,22 @@ impl PageDecoder {
 
         while num_rows_remaining > 0 {
             let Some(page) = self.iter.next() else {
-                return self.decoder.finalize(self.data_type, self.dict, target);
+                break;
             };
             let page = page?;
 
-            let mut state = State::new(&self.decoder, &page, self.dict.as_ref())?;
-            let state_len = state.len();
-
             let state_filter;
-            (state_filter, filter) = Filter::opt_split_at(&filter, state_len);
+            (state_filter, filter) = Filter::opt_split_at(&filter, page.num_values());
 
             // Skip the whole page if we don't need any rows from it
            if state_filter.as_ref().is_some_and(|f| f.num_rows() == 0) {
                 continue;
             }
 
+            let page = page.decompress(&mut self.iter)?;
+
+            let mut state = State::new(&self.decoder, &page, self.dict.as_ref())?;
+
             let start_length = target.len();
             state.extend_from_state(&mut self.decoder, &mut target, state_filter)?;
             let end_length = target.len();
diff --git a/crates/polars-parquet/src/parquet/read/compression.rs b/crates/polars-parquet/src/parquet/read/compression.rs
index 0996093b31f0a..d9e14d9a6e5d3 100644
--- a/crates/polars-parquet/src/parquet/read/compression.rs
+++ b/crates/polars-parquet/src/parquet/read/compression.rs
@@ -3,7 +3,9 @@ use parquet_format_safe::DataPageHeaderV2;
 use super::PageReader;
 use crate::parquet::compression::{self, Compression};
 use crate::parquet::error::{ParquetError, ParquetResult};
-use crate::parquet::page::{CompressedPage, DataPage, DataPageHeader, DictPage, Page};
+use crate::parquet::page::{
+    CompressedDataPage, CompressedPage, DataPage, DataPageHeader, DictPage, Page,
+};
 use crate::parquet::CowBuffer;
 
 fn decompress_v1(
@@ -205,8 +207,27 @@ impl BasicDecompressor {
     }
 }
 
+pub struct DataPageItem {
+    page: CompressedDataPage,
+}
+
+impl DataPageItem {
+    pub fn num_values(&self) -> usize {
+        self.page.num_values()
+    }
+
+    pub fn decompress(self, decompressor: &mut BasicDecompressor) -> ParquetResult<DataPage> {
+        let p = decompress(CompressedPage::Data(self.page), &mut decompressor.buffer)?;
+        let Page::Data(p) = p else {
+            panic!("Decompressing a data page should result in a data page");
+        };
+
+        Ok(p)
+    }
+}
+
 impl Iterator for BasicDecompressor {
-    type Item = ParquetResult<DataPage>;
+    type Item = ParquetResult<DataPageItem>;
 
     fn next(&mut self) -> Option<Self::Item> {
         let page = match self.reader.next() {
@@ -215,15 +236,13 @@ impl Iterator for BasicDecompressor {
             Some(Ok(p)) => p,
         };
 
-        Some(decompress(page, &mut self.buffer).and_then(|p| {
-            let Page::Data(p) = p else {
-                return Err(ParquetError::oos(
-                    "Found dictionary page beyond the first page of a column chunk",
-                ));
-            };
+        let CompressedPage::Data(page) = page else {
+            return Some(Err(ParquetError::oos(
+                "Found dictionary page beyond the first page of a column chunk",
+            )));
+        };
 
-            Ok(p)
-        }))
+        Some(Ok(DataPageItem { page }))
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/crates/polars/tests/it/io/parquet/read/mod.rs b/crates/polars/tests/it/io/parquet/read/mod.rs
index f9e16619556c7..ae9bb763f2c51 100644
--- a/crates/polars/tests/it/io/parquet/read/mod.rs
+++ b/crates/polars/tests/it/io/parquet/read/mod.rs
@@ -159,6 +159,7 @@ where
         .map(|dict| dictionary::deserialize(&dict, column.physical_type()))
         .transpose()?;
     while let Some(page) = iterator.next().transpose()? {
+        let page = page.decompress(&mut iterator)?;
        if !has_filled {
             struct_::extend_validity(&mut validity, &page)?;
         }
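
Taken together, the patch makes BasicDecompressor's iterator yield a compressed page handle
(DataPageItem) whose num_values() is available without decompression, so callers can split the row
filter and skip whole pages before paying the decompression cost; decompress() is invoked only
once rows from the page are actually needed. A minimal standalone sketch of that control flow
follows, using simplified stand-in types (LazyPage and its methods are illustrative, not the real
polars-parquet API):

    // `LazyPage` stands in for `DataPageItem`: metadata is cheap, decompression is deferred.
    struct LazyPage {
        num_values: usize,
        compressed: Vec<u8>,
    }

    impl LazyPage {
        fn num_values(&self) -> usize {
            self.num_values
        }

        // Stand-in for `DataPageItem::decompress`: the cost is only paid here.
        fn decompress(self) -> Vec<u8> {
            // A real implementation would run the page's codec (snappy, zstd, ...).
            self.compressed
        }
    }

    fn main() {
        let pages = vec![
            LazyPage { num_values: 1000, compressed: vec![0u8; 8] },
            LazyPage { num_values: 1000, compressed: vec![1u8; 8] },
        ];

        // Pretend a row filter needs nothing from the first 1000 rows.
        let mut rows_to_skip = 1000usize;

        for page in pages {
            if rows_to_skip >= page.num_values() {
                // Whole page filtered out: skipped without ever decompressing it.
                rows_to_skip -= page.num_values();
                continue;
            }
            rows_to_skip = 0;
            let data = page.decompress();
            // ... decode `data` into the output array ...
            let _ = data;
        }
    }
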