From a212ce9c82d853accb772b2a54a2e844e482ae18 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Fri, 19 Jul 2024 08:10:46 +0200 Subject: [PATCH] perf: Introduce `MemReader` to file buffer in Parquet reader (#17712) --- crates/polars-io/src/parquet/read/mmap.rs | 3 +- .../deserialize/fixed_size_binary/basic.rs | 8 +- .../fixed_size_binary/dictionary.rs | 2 +- .../deserialize/fixed_size_binary/nested.rs | 10 +- .../src/arrow/read/deserialize/mod.rs | 8 +- .../arrow/read/deserialize/nested_utils.rs | 2 +- .../src/arrow/read/deserialize/null/mod.rs | 3 +- .../arrow/read/deserialize/primitive/basic.rs | 3 +- .../src/arrow/read/deserialize/utils/mod.rs | 2 +- .../src/arrow/read/row_group.rs | 8 +- .../src/arrow/write/dictionary.rs | 31 ++- .../polars-parquet/src/arrow/write/utils.rs | 3 +- crates/polars-parquet/src/parquet/page/mod.rs | 47 +++-- .../src/parquet/read/column/mod.rs | 22 +- .../src/parquet/read/compression.rs | 15 +- crates/polars-parquet/src/parquet/read/mod.rs | 13 +- .../src/parquet/read/page/indexed_reader.rs | 63 +++--- .../src/parquet/read/page/memreader.rs | 191 ++++++++++++++++++ .../src/parquet/read/page/mod.rs | 2 + .../src/parquet/read/page/reader.rs | 82 ++++---- .../src/parquet/read/page/stream.rs | 3 +- .../src/parquet/write/compression.rs | 23 ++- .../polars-parquet/src/parquet/write/page.rs | 5 +- crates/polars/tests/it/io/parquet/read/mod.rs | 15 +- .../tests/it/io/parquet/write/binary.rs | 3 +- .../tests/it/io/parquet/write/indexes.rs | 5 +- .../polars/tests/it/io/parquet/write/mod.rs | 4 +- .../tests/it/io/parquet/write/primitive.rs | 3 +- 28 files changed, 411 insertions(+), 168 deletions(-) create mode 100644 crates/polars-parquet/src/parquet/read/page/memreader.rs diff --git a/crates/polars-io/src/parquet/read/mmap.rs b/crates/polars-io/src/parquet/read/mmap.rs index 0085dfd6fabf..8ac959069f76 100644 --- a/crates/polars-io/src/parquet/read/mmap.rs +++ b/crates/polars-io/src/parquet/read/mmap.rs @@ -4,6 +4,7 @@ use bytes::Bytes; #[cfg(feature = "async")] use polars_core::datatypes::PlHashMap; use polars_error::PolarsResult; +use polars_parquet::parquet::read::MemReader; use polars_parquet::read::{ column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData, PageReader, @@ -73,7 +74,7 @@ pub(super) fn to_deserializer<'a>( .into_iter() .map(|(column_meta, chunk)| { let pages = PageReader::new( - std::io::Cursor::new(chunk), + MemReader::from_slice(chunk), column_meta, std::sync::Arc::new(|_, _| true), vec![], diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs index 175ec8250663..70c0d925636e 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs @@ -146,7 +146,7 @@ impl DecodedState for (FixedSizeBinary, MutableBitmap) { impl<'a> Decoder<'a> for BinaryDecoder { type Translation = StateTranslation<'a>; - type Dict = Vec; + type Dict = &'a [u8]; type DecodedState = (FixedSizeBinary, MutableBitmap); fn with_capacity(&self, capacity: usize) -> Self::DecodedState { @@ -156,8 +156,8 @@ impl<'a> Decoder<'a> for BinaryDecoder { ) } - fn deserialize_dict(&self, page: &DictPage) -> Self::Dict { - page.buffer.clone() + fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dict { + page.buffer.as_ref() } } @@ -207,7 +207,7 @@ impl Iterator for Iter { let maybe_state = next( &mut self.iter, &mut self.items, - 
&mut self.dict, + &mut self.dict.as_deref(), &mut self.remaining, self.chunk_size, &BinaryDecoder { size: self.size }, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/dictionary.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/dictionary.rs index 417abd5bf795..6401e7503d8c 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/dictionary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/dictionary.rs @@ -56,7 +56,7 @@ fn read_dict(data_type: ArrowDataType, dict: &DictPage) -> Box { let values = dict.buffer.clone(); - FixedSizeBinaryArray::try_new(data_type, values.into(), None) + FixedSizeBinaryArray::try_new(data_type, values.to_vec().into(), None) .unwrap() .boxed() } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/nested.rs index 5cb589a24b35..b43cb266c960 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/nested.rs @@ -52,7 +52,7 @@ struct BinaryDecoder { impl<'a> NestedDecoder<'a> for BinaryDecoder { type State = State<'a>; - type Dictionary = Vec; + type Dictionary = &'a [u8]; type DecodedState = (FixedSizeBinary, MutableBitmap); fn build_state( @@ -71,7 +71,7 @@ impl<'a> NestedDecoder<'a> for BinaryDecoder { let values = values.chunks_exact(self.size); StateTranslation::Unit(values) }, - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(&dict), false) => { let values = dict_indices_decoder(page)?; StateTranslation::Dictionary { values, dict } }, @@ -133,8 +133,8 @@ impl<'a> NestedDecoder<'a> for BinaryDecoder { validity.extend_constant(n, false); } - fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary { - page.buffer.clone() + fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dictionary { + page.buffer.as_ref() } } @@ -179,7 +179,7 @@ impl Iterator for NestedIter { let maybe_state = next( &mut self.iter, &mut self.items, - &mut self.dict, + &mut self.dict.as_deref(), &mut self.remaining, &self.init, self.chunk_size, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs index 8f5a86129470..007f7cfea8ac 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs @@ -36,17 +36,17 @@ use simple::page_iter_to_arrays; pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState}; pub use self::struct_::StructIterator; use super::*; -use crate::parquet::read::get_page_iterator as _get_page_iterator; +use crate::parquet::read::{get_page_iterator as _get_page_iterator, MemReader}; use crate::parquet::schema::types::PrimitiveType; /// Creates a new iterator of compressed pages. 
-pub fn get_page_iterator( +pub fn get_page_iterator( column_metadata: &ColumnChunkMetaData, - reader: R, + reader: MemReader, pages_filter: Option, buffer: Vec, max_header_size: usize, -) -> PolarsResult> { +) -> PolarsResult { Ok(_get_page_iterator( column_metadata, reader, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index 796d830286db..629d45a845af 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -229,7 +229,7 @@ pub(super) trait NestedDecoder<'a> { ) -> ParquetResult<()>; fn push_n_nulls(&self, decoded: &mut Self::DecodedState, n: usize); - fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary; + fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dictionary; } /// The initial info of nested data types. diff --git a/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs index 35a2efe94755..5d0adfe69bc1 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs @@ -65,6 +65,7 @@ mod tests { use crate::parquet::fallible_streaming_iterator; use crate::parquet::metadata::Descriptor; use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page}; + use crate::parquet::read::CowBuffer; use crate::parquet::schema::types::{PhysicalType, PrimitiveType}; #[test] @@ -78,7 +79,7 @@ mod tests { repetition_level_encoding: Encoding::Plain.into(), statistics: None, }), - vec![], + CowBuffer::Owned(vec![]), Descriptor { primitive_type: PrimitiveType::from_physical( "a".to_string(), diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs index 4ab61ffb1a44..d0ceef2812eb 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs @@ -215,8 +215,7 @@ where match (self, page_validity) { (Self::Unit(page), None) => { values.extend( - page.by_ref() - .map(|v| decoder.decoder.decode(P::from_le_bytes(*v))) + page.map(|v| decoder.decoder.decode(P::from_le_bytes(*v))) .take(additional), ); }, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs index cdeb0a945326..f0bae82ec594 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs @@ -484,7 +484,7 @@ pub(super) trait Decoder<'a>: Sized { fn with_capacity(&self, capacity: usize) -> Self::DecodedState; /// Deserializes a [`DictPage`] into [`Self::Dict`]. 
- fn deserialize_dict(&self, page: &DictPage) -> Self::Dict; + fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dict; } pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( diff --git a/crates/polars-parquet/src/arrow/read/row_group.rs b/crates/polars-parquet/src/arrow/read/row_group.rs index 6bda582cb05b..2ff368d77c99 100644 --- a/crates/polars-parquet/src/arrow/read/row_group.rs +++ b/crates/polars-parquet/src/arrow/read/row_group.rs @@ -9,7 +9,9 @@ use super::{ArrayIter, RowGroupMetaData}; use crate::arrow::read::column_iter_to_arrays; use crate::parquet::indexes::FilteredPage; use crate::parquet::metadata::ColumnChunkMetaData; -use crate::parquet::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader}; +use crate::parquet::read::{ + BasicDecompressor, IndexedPageReader, MemReader, PageMetaData, PageReader, +}; /// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into /// an iterator of [`RecordBatchT`]. @@ -164,7 +166,7 @@ pub fn to_deserializer<'a>( .for_each(|page| page.start -= meta.column_start); meta.column_start = 0; let pages = IndexedPageReader::new_with_page_meta( - std::io::Cursor::new(chunk), + MemReader::from_vec(chunk), meta, pages, vec![], @@ -185,7 +187,7 @@ pub fn to_deserializer<'a>( .map(|(column_meta, chunk)| { let len = chunk.len(); let pages = PageReader::new( - std::io::Cursor::new(chunk), + MemReader::from_vec(chunk), column_meta, std::sync::Arc::new(|_, _| true), vec![], diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 20676f9ccf54..316d734b63e0 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -19,6 +19,7 @@ use crate::arrow::write::{slice_nested_leaf, utils}; use crate::parquet::encoding::hybrid_rle::encode; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DictPage, Page}; +use crate::parquet::read::CowBuffer; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::ParquetStatistics; use crate::write::DynIter; @@ -202,7 +203,10 @@ macro_rules! 
dyn_prim { } else { None }; - (DictPage::new(buffer, values.len(), false), stats) + ( + DictPage::new(CowBuffer::Owned(buffer), values.len(), false), + stats, + ) }}; } @@ -254,7 +258,10 @@ pub fn array_to_pages( } else { None }; - (DictPage::new(buffer, array.len(), false), stats) + ( + DictPage::new(CowBuffer::Owned(buffer), array.len(), false), + stats, + ) }, ArrowDataType::BinaryView => { let array = array @@ -274,7 +281,10 @@ pub fn array_to_pages( } else { None }; - (DictPage::new(buffer, array.len(), false), stats) + ( + DictPage::new(CowBuffer::Owned(buffer), array.len(), false), + stats, + ) }, ArrowDataType::Utf8View => { let array = array @@ -295,7 +305,10 @@ pub fn array_to_pages( } else { None }; - (DictPage::new(buffer, array.len(), false), stats) + ( + DictPage::new(CowBuffer::Owned(buffer), array.len(), false), + stats, + ) }, ArrowDataType::LargeBinary => { let values = array.values().as_any().downcast_ref().unwrap(); @@ -311,7 +324,10 @@ pub fn array_to_pages( } else { None }; - (DictPage::new(buffer, values.len(), false), stats) + ( + DictPage::new(CowBuffer::Owned(buffer), values.len(), false), + stats, + ) }, ArrowDataType::FixedSizeBinary(_) => { let mut buffer = vec![]; @@ -327,7 +343,10 @@ pub fn array_to_pages( } else { None }; - (DictPage::new(buffer, array.len(), false), stats) + ( + DictPage::new(CowBuffer::Owned(buffer), array.len(), false), + stats, + ) }, other => { polars_bail!(nyi = diff --git a/crates/polars-parquet/src/arrow/write/utils.rs b/crates/polars-parquet/src/arrow/write/utils.rs index 0ba9f4289bab..0c9f8bec8cdb 100644 --- a/crates/polars-parquet/src/arrow/write/utils.rs +++ b/crates/polars-parquet/src/arrow/write/utils.rs @@ -8,6 +8,7 @@ use crate::parquet::encoding::hybrid_rle::encode; use crate::parquet::encoding::Encoding; use crate::parquet::metadata::Descriptor; use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}; +use crate::parquet::read::CowBuffer; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::ParquetStatistics; @@ -89,7 +90,7 @@ pub fn build_plain_page( }; Ok(DataPage::new( header, - buffer, + CowBuffer::Owned(buffer), Descriptor { primitive_type: type_, max_def_level: 0, diff --git a/crates/polars-parquet/src/parquet/page/mod.rs b/crates/polars-parquet/src/parquet/page/mod.rs index 8c0e1b050e50..77b5da526fd8 100644 --- a/crates/polars-parquet/src/parquet/page/mod.rs +++ b/crates/polars-parquet/src/parquet/page/mod.rs @@ -1,3 +1,4 @@ +use super::read::CowBuffer; use crate::parquet::compression::Compression; use crate::parquet::encoding::{get_length, Encoding}; use crate::parquet::error::{ParquetError, ParquetResult}; @@ -19,7 +20,7 @@ pub enum PageResult { #[derive(Debug)] pub struct CompressedDataPage { pub(crate) header: DataPageHeader, - pub(crate) buffer: Vec, + pub(crate) buffer: CowBuffer, pub(crate) compression: Compression, uncompressed_page_size: usize, pub(crate) descriptor: Descriptor, @@ -32,7 +33,7 @@ impl CompressedDataPage { /// Returns a new [`CompressedDataPage`]. pub fn new( header: DataPageHeader, - buffer: Vec, + buffer: CowBuffer, compression: Compression, uncompressed_page_size: usize, descriptor: Descriptor, @@ -51,7 +52,7 @@ impl CompressedDataPage { /// Returns a new [`CompressedDataPage`]. 
pub(crate) fn new_read( header: DataPageHeader, - buffer: Vec, + buffer: CowBuffer, compression: Compression, uncompressed_page_size: usize, descriptor: Descriptor, @@ -114,6 +115,10 @@ impl CompressedDataPage { pub fn select_rows(&mut self, selected_rows: Vec) { self.selected_rows = Some(selected_rows); } + + pub fn slice_mut(&mut self) -> &mut CowBuffer { + &mut self.buffer + } } #[derive(Debug, Clone)] @@ -136,7 +141,7 @@ impl DataPageHeader { #[derive(Debug, Clone)] pub struct DataPage { pub(super) header: DataPageHeader, - pub(super) buffer: Vec, + pub(super) buffer: CowBuffer, pub descriptor: Descriptor, pub selected_rows: Option>, } @@ -144,7 +149,7 @@ pub struct DataPage { impl DataPage { pub fn new( header: DataPageHeader, - buffer: Vec, + buffer: CowBuffer, descriptor: Descriptor, rows: Option, ) -> Self { @@ -158,7 +163,7 @@ impl DataPage { pub(crate) fn new_read( header: DataPageHeader, - buffer: Vec, + buffer: CowBuffer, descriptor: Descriptor, selected_rows: Option>, ) -> Self { @@ -187,7 +192,7 @@ impl DataPage { /// Returns a mutable reference to the internal buffer. /// Useful to recover the buffer after the page has been decoded. pub fn buffer_mut(&mut self) -> &mut Vec { - &mut self.buffer + self.buffer.to_mut() } pub fn num_values(&self) -> usize { @@ -242,12 +247,13 @@ pub enum Page { } impl Page { - pub(crate) fn buffer(&mut self) -> &mut Vec { + pub(crate) fn buffer_mut(&mut self) -> &mut Vec { match self { - Self::Data(page) => &mut page.buffer, - Self::Dict(page) => &mut page.buffer, + Self::Data(page) => page.buffer.to_mut(), + Self::Dict(page) => page.buffer.to_mut(), } } + pub(crate) fn unwrap_data(self) -> DataPage { match self { Self::Data(page) => page, @@ -266,10 +272,17 @@ pub enum CompressedPage { } impl CompressedPage { - pub(crate) fn buffer(&mut self) -> &mut Vec { + pub(crate) fn buffer(&self) -> &[u8] { + match self { + CompressedPage::Data(page) => &page.buffer, + CompressedPage::Dict(page) => &page.buffer, + } + } + + pub(crate) fn buffer_mut(&mut self) -> &mut Vec { match self { - CompressedPage::Data(page) => &mut page.buffer, - CompressedPage::Dict(page) => &mut page.buffer, + CompressedPage::Data(page) => page.buffer.to_mut(), + CompressedPage::Dict(page) => page.buffer.to_mut(), } } @@ -305,13 +318,13 @@ impl CompressedPage { /// An uncompressed, encoded dictionary page. #[derive(Debug)] pub struct DictPage { - pub buffer: Vec, + pub buffer: CowBuffer, pub num_values: usize, pub is_sorted: bool, } impl DictPage { - pub fn new(buffer: Vec, num_values: usize, is_sorted: bool) -> Self { + pub fn new(buffer: CowBuffer, num_values: usize, is_sorted: bool) -> Self { Self { buffer, num_values, @@ -323,7 +336,7 @@ impl DictPage { /// A compressed, encoded dictionary page. 
#[derive(Debug)] pub struct CompressedDictPage { - pub(crate) buffer: Vec, + pub(crate) buffer: CowBuffer, compression: Compression, pub(crate) num_values: usize, pub(crate) uncompressed_page_size: usize, @@ -332,7 +345,7 @@ pub struct CompressedDictPage { impl CompressedDictPage { pub fn new( - buffer: Vec, + buffer: CowBuffer, compression: Compression, uncompressed_page_size: usize, num_values: usize, diff --git a/crates/polars-parquet/src/parquet/read/column/mod.rs b/crates/polars-parquet/src/parquet/read/column/mod.rs index d5783173d6d4..5ad518a3d6e5 100644 --- a/crates/polars-parquet/src/parquet/read/column/mod.rs +++ b/crates/polars-parquet/src/parquet/read/column/mod.rs @@ -1,7 +1,7 @@ use std::io::{Read, Seek}; use std::vec::IntoIter; -use super::{get_field_columns, get_page_iterator, PageFilter, PageReader}; +use super::{get_field_columns, get_page_iterator, MemReader, PageFilter, PageReader}; use crate::parquet::error::ParquetError; use crate::parquet::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use crate::parquet::page::CompressedPage; @@ -18,14 +18,14 @@ mod stream; /// For primitive fields (e.g. `i64`), [`ColumnIterator`] yields exactly one column. /// For complex fields, it yields multiple columns. /// `max_page_size` is the maximum number of bytes allowed. -pub fn get_column_iterator( - reader: R, +pub fn get_column_iterator( + reader: MemReader, row_group: &RowGroupMetaData, field_name: &str, page_filter: Option, scratch: Vec, max_page_size: usize, -) -> ColumnIterator { +) -> ColumnIterator { let columns = get_field_columns(row_group.columns(), field_name) .cloned() .collect::>(); @@ -53,20 +53,20 @@ pub trait MutStreamingIterator: Sized { /// A [`MutStreamingIterator`] that reads column chunks one by one, /// returning a [`PageReader`] per column. 
-pub struct ColumnIterator { - reader: Option, +pub struct ColumnIterator { + reader: Option, columns: Vec, page_filter: Option, - current: Option<(PageReader, ColumnChunkMetaData)>, + current: Option<(PageReader, ColumnChunkMetaData)>, scratch: Vec, max_page_size: usize, } -impl ColumnIterator { +impl ColumnIterator { /// Returns a new [`ColumnIterator`] /// `max_page_size` is the maximum allowed page size pub fn new( - reader: R, + reader: MemReader, mut columns: Vec, page_filter: Option, scratch: Vec, @@ -84,8 +84,8 @@ impl ColumnIterator { } } -impl MutStreamingIterator for ColumnIterator { - type Item = (PageReader, ColumnChunkMetaData); +impl MutStreamingIterator for ColumnIterator { + type Item = (PageReader, ColumnChunkMetaData); type Error = ParquetError; fn advance(mut self) -> Result, ParquetError> { diff --git a/crates/polars-parquet/src/parquet/read/compression.rs b/crates/polars-parquet/src/parquet/read/compression.rs index d7443eaa55b5..700db3b28604 100644 --- a/crates/polars-parquet/src/parquet/read/compression.rs +++ b/crates/polars-parquet/src/parquet/read/compression.rs @@ -1,6 +1,7 @@ use parquet_format_safe::DataPageHeaderV2; use super::page::PageIterator; +use super::CowBuffer; use crate::parquet::compression::{self, Compression}; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{CompressedPage, DataPage, DataPageHeader, DictPage, Page}; @@ -92,7 +93,7 @@ pub fn decompress_buffer( } else { // page.buffer is already decompressed => swap it with `buffer`, making `page.buffer` the // decompression buffer and `buffer` the decompressed buffer - std::mem::swap(compressed_page.buffer(), buffer); + std::mem::swap(&mut compressed_page.buffer().to_vec(), buffer); Ok(false) } } @@ -101,12 +102,12 @@ fn create_page(compressed_page: CompressedPage, buffer: Vec) -> Page { match compressed_page { CompressedPage::Data(page) => Page::Data(DataPage::new_read( page.header, - buffer, + CowBuffer::Owned(buffer), page.descriptor, page.selected_rows, )), CompressedPage::Dict(page) => Page::Dict(DictPage { - buffer, + buffer: CowBuffer::Owned(buffer), num_values: page.num_values, is_sorted: page.is_sorted, }), @@ -132,7 +133,7 @@ fn decompress_reuse( let was_decompressed = decompress_buffer(&mut compressed_page, buffer)?; if was_decompressed { - iterator.swap_buffer(compressed_page.buffer()) + iterator.swap_buffer(&mut compressed_page.buffer().to_vec()) }; let new_page = create_page(compressed_page, std::mem::take(buffer)); @@ -210,9 +211,9 @@ impl FallibleStreamingIterator for Decompressor

{ fn advance(&mut self) -> ParquetResult<()> { if let Some(page) = self.current.as_mut() { if self.was_decompressed { - self.buffer = std::mem::take(page.buffer()); + self.buffer = std::mem::take(page.buffer_mut()); } else { - self.iter.swap_buffer(page.buffer()); + self.iter.swap_buffer(page.buffer_mut()); } } @@ -255,7 +256,7 @@ impl streaming_decompression::Compressed for CompressedPage { impl streaming_decompression::Decompressed for Page { #[inline] fn buffer_mut(&mut self) -> &mut Vec { - self.buffer() + self.buffer_mut() } } diff --git a/crates/polars-parquet/src/parquet/read/mod.rs b/crates/polars-parquet/src/parquet/read/mod.rs index 861731e6a999..680ed676194b 100644 --- a/crates/polars-parquet/src/parquet/read/mod.rs +++ b/crates/polars-parquet/src/parquet/read/mod.rs @@ -7,7 +7,7 @@ mod page; #[cfg(feature = "async")] mod stream; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{Seek, SeekFrom}; use std::sync::Arc; pub use column::*; @@ -16,7 +16,10 @@ pub use indexes::{read_columns_indexes, read_pages_locations}; pub use metadata::{deserialize_metadata, read_metadata, read_metadata_with_size}; #[cfg(feature = "async")] pub use page::{get_page_stream, get_page_stream_from_column_start}; -pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader}; +pub use page::{ + CowBuffer, IndexedPageReader, MemReader, MemReaderSlice, PageFilter, PageIterator, + PageMetaData, PageReader, +}; #[cfg(feature = "async")] pub use stream::read_metadata as read_metadata_async; @@ -41,13 +44,13 @@ pub fn filter_row_groups( } /// Returns a new [`PageReader`] by seeking `reader` to the beginning of `column_chunk`. -pub fn get_page_iterator( +pub fn get_page_iterator( column_chunk: &ColumnChunkMetaData, - mut reader: R, + mut reader: MemReader, pages_filter: Option, scratch: Vec, max_page_size: usize, -) -> ParquetResult> { +) -> ParquetResult { let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true)); let (col_start, _) = column_chunk.byte_range(); diff --git a/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs b/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs index 1f513fbe2758..7f8bc34764c4 100644 --- a/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs +++ b/crates/polars-parquet/src/parquet/read/page/indexed_reader.rs @@ -1,7 +1,9 @@ use std::collections::VecDeque; -use std::io::{Cursor, Read, Seek, SeekFrom}; +use std::io::{Seek, SeekFrom}; +use super::memreader::MemReader; use super::reader::{finish_page, read_page_header, PageMetaData}; +use super::MemReaderSlice; use crate::parquet::error::ParquetError; use crate::parquet::indexes::{FilteredPage, Interval}; use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor}; @@ -17,9 +19,9 @@ enum State { /// A fallible [`Iterator`] of [`CompressedPage`]. This iterator leverages page indexes /// to skip pages that are not needed. 
Consequently, the pages from this /// iterator always have [`Some`] [`crate::parquet::page::CompressedDataPage::selected_rows()`] -pub struct IndexedPageReader { +pub struct IndexedPageReader { // The source - reader: R, + reader: MemReader, column_start: u64, compression: Compression, @@ -38,43 +40,34 @@ pub struct IndexedPageReader { state: State, } -fn read_page( - reader: &mut R, +fn read_page( + reader: &mut MemReader, start: u64, length: usize, - buffer: &mut Vec, - data: &mut Vec, -) -> Result { +) -> Result<(ParquetPageHeader, MemReaderSlice), ParquetError> { // seek to the page reader.seek(SeekFrom::Start(start))?; - // read [header][data] to buffer - buffer.clear(); - buffer.try_reserve(length)?; - reader.by_ref().take(length as u64).read_to_end(buffer)?; + let start_position = reader.position(); // deserialize [header] - let mut reader = Cursor::new(buffer); - let page_header = read_page_header(&mut reader, 1024 * 1024)?; - let header_size = reader.stream_position().unwrap() as usize; - let buffer = reader.into_inner(); + let page_header = read_page_header(reader, 1024 * 1024)?; + let header_size = reader.position() - start_position; // copy [data] - data.clear(); - data.extend_from_slice(&buffer[header_size..]); - Ok(page_header) + let data = reader.read_slice(length - header_size); + + Ok((page_header, data)) } -fn read_dict_page( - reader: &mut R, +fn read_dict_page( + reader: &mut MemReader, start: u64, length: usize, - buffer: &mut Vec, - data: &mut Vec, compression: Compression, descriptor: &Descriptor, ) -> Result { - let page_header = read_page(reader, start, length, buffer, data)?; + let (page_header, data) = read_page(reader, start, length)?; let page = finish_page(page_header, data, compression, descriptor, None)?; if let CompressedPage::Dict(page) = page { @@ -86,10 +79,10 @@ fn read_dict_page( } } -impl IndexedPageReader { +impl IndexedPageReader { /// Returns a new [`IndexedPageReader`]. pub fn new( - reader: R, + reader: MemReader, column: &ColumnChunkMetaData, pages: Vec, buffer: Vec, @@ -100,7 +93,7 @@ impl IndexedPageReader { /// Returns a new [`IndexedPageReader`] with [`PageMetaData`]. 
pub fn new_with_page_meta(
-        reader: R,
+        reader: MemReader,
         column: PageMetaData,
         pages: Vec<FilteredPage>,
         buffer: Vec<u8>,
@@ -120,7 +113,7 @@ impl<R: Read + Seek> IndexedPageReader<R> {
     }
 
     /// consumes self into the reader and the two internal buffers
-    pub fn into_inner(self) -> (R, Vec<u8>, Vec<u8>) {
+    pub fn into_inner(self) -> (MemReader, Vec<u8>, Vec<u8>) {
         (self.reader, self.buffer, self.data_buffer)
     }
 
@@ -130,14 +123,11 @@ impl<R: Read + Seek> IndexedPageReader<R> {
         length: usize,
         selected_rows: Vec<Interval>,
    ) -> Result<CompressedPage, ParquetError> {
-        // it will be read - take buffer
-        let mut data = std::mem::take(&mut self.data_buffer);
-
-        let page_header = read_page(&mut self.reader, start, length, &mut self.buffer, &mut data)?;
+        let (page_header, data) = read_page(&mut self.reader, start, length)?;
 
         finish_page(
             page_header,
-            &mut data,
+            data,
             self.compression,
             &self.descriptor,
             Some(selected_rows),
@@ -159,15 +149,10 @@ impl<R: Read + Seek> IndexedPageReader<R> {
             None => return None,
         };
 
-        // it will be read - take buffer
-        let mut data = std::mem::take(&mut self.data_buffer);
-
         let maybe_page = read_dict_page(
             &mut self.reader,
             start,
             length,
-            &mut self.buffer,
-            &mut data,
             self.compression,
             &self.descriptor,
         );
@@ -175,7 +160,7 @@ impl<R: Read + Seek> IndexedPageReader<R> {
     }
 }
 
-impl<R: Read + Seek> Iterator for IndexedPageReader<R> {
+impl Iterator for IndexedPageReader {
     type Item = Result<CompressedPage, ParquetError>;
 
     fn next(&mut self) -> Option<Self::Item> {
diff --git a/crates/polars-parquet/src/parquet/read/page/memreader.rs b/crates/polars-parquet/src/parquet/read/page/memreader.rs
new file mode 100644
index 000000000000..4c200e506474
--- /dev/null
+++ b/crates/polars-parquet/src/parquet/read/page/memreader.rs
@@ -0,0 +1,186 @@
+use std::io;
+use std::ops::Deref;
+use std::sync::Arc;
+
+/// A cursor over a segment of heap-allocated memory. This is used for the Parquet reader to avoid
+/// sequential allocations.
+#[derive(Debug, Clone)]
+pub struct MemReader {
+    data: Arc<[u8]>,
+    position: usize,
+}
+
+/// A reference to a slice of a memory reader.
+///
+/// This should not outlast the original [`MemReader`] because it still owns all the
+/// memory.
+#[derive(Debug, Clone)]
+pub struct MemReaderSlice {
+    data: Arc<[u8]>,
+    start: usize,
+    end: usize,
+}
+
+impl Default for MemReaderSlice {
+    fn default() -> Self {
+        let slice: &[u8] = &[];
+        Self {
+            data: Arc::from(slice),
+            start: 0,
+            end: 0,
+        }
+    }
+}
+
+impl Deref for MemReaderSlice {
+    type Target = [u8];
+
+    #[inline(always)]
+    fn deref(&self) -> &Self::Target {
+        &self.data[self.start..self.end]
+    }
+}
+
+#[derive(Debug, Clone)]
+pub enum CowBuffer {
+    Borrowed(MemReaderSlice),
+    Owned(Vec<u8>),
+}
+
+impl Deref for CowBuffer {
+    type Target = [u8];
+
+    #[inline(always)]
+    fn deref(&self) -> &Self::Target {
+        match self {
+            CowBuffer::Borrowed(v) => v.deref(),
+            CowBuffer::Owned(v) => v.deref(),
+        }
+    }
+}
+
+impl MemReader {
+    #[inline(always)]
+    pub fn new(data: Arc<[u8]>) -> Self {
+        Self { data, position: 0 }
+    }
+
+    #[inline(always)]
+    pub fn len(&self) -> usize {
+        self.data.len()
+    }
+
+    #[inline(always)]
+    pub fn remaining_len(&self) -> usize {
+        self.data.len() - self.position
+    }
+
+    #[inline(always)]
+    pub fn position(&self) -> usize {
+        self.position
+    }
+
+    #[inline(always)]
+    pub fn from_slice(data: &[u8]) -> Self {
+        let data = data.into();
+        Self { data, position: 0 }
+    }
+
+    #[inline(always)]
+    pub fn from_vec(data: Vec<u8>) -> Self {
+        let data = data.into_boxed_slice().into();
+        Self { data, position: 0 }
+    }
+
+    #[inline(always)]
+    pub fn from_reader<R: io::Read>(mut reader: R) -> io::Result<Self> {
+        let mut vec = Vec::new();
+        reader.read_to_end(&mut vec)?;
+        Ok(Self::from_vec(vec))
+    }
+
+    #[inline(always)]
+    pub fn read_slice(&mut self, n: usize) -> MemReaderSlice {
+        let start = self.position;
+        let end = usize::min(self.position + n, self.data.len());
+
+        self.position = end;
+
+        MemReaderSlice {
+            data: self.data.clone(),
+            start,
+            end,
+        }
+    }
+}
+
+impl io::Read for MemReader {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let n = usize::min(buf.len(), self.remaining_len());
+        buf[..n].copy_from_slice(&self.data[self.position..self.position + n]);
+        self.position += n;
+        Ok(n)
+    }
+}
+
+impl io::Seek for MemReader {
+    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
+        let position = match pos {
+            io::SeekFrom::Start(position) => usize::min(position as usize, self.len()),
+            io::SeekFrom::End(offset) => {
+                let Some(position) = self.len().checked_add_signed(offset as isize) else {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Other,
+                        "Seek before the start of the buffer",
+                    ));
+                };
+
+                position
+            },
+            io::SeekFrom::Current(offset) => {
+                let Some(position) = self.position.checked_add_signed(offset as isize) else {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Other,
+                        "Seek before the start of the buffer",
+                    ));
+                };
+
+                position
+            },
+        };
+
+        self.position = position;
+
+        Ok(position as u64)
+    }
+}
+
+impl MemReaderSlice {
+    #[inline(always)]
+    pub fn to_vec(self) -> Vec<u8> {
+        <[u8]>::to_vec(&self)
+    }
+
+    #[inline]
+    pub fn from_vec(v: Vec<u8>) -> Self {
+        let end = v.len();
+
+        Self {
+            data: v.into(),
+            start: 0,
+            end,
+        }
+    }
+}
+
+impl CowBuffer {
+    pub fn to_mut(&mut self) -> &mut Vec<u8> {
+        match self {
+            CowBuffer::Borrowed(v) => {
+                *self = Self::Owned(v.clone().to_vec());
+                self.to_mut()
+            },
+            CowBuffer::Owned(v) => v,
+        }
+    }
+}
diff --git a/crates/polars-parquet/src/parquet/read/page/mod.rs b/crates/polars-parquet/src/parquet/read/page/mod.rs
index a8f1396d37d6..350e49257e79 100644
--- a/crates/polars-parquet/src/parquet/read/page/mod.rs
+++ 
b/crates/polars-parquet/src/parquet/read/page/mod.rs @@ -1,9 +1,11 @@ mod indexed_reader; +mod memreader; mod reader; #[cfg(feature = "async")] mod stream; pub use indexed_reader::IndexedPageReader; +pub use memreader::{CowBuffer, MemReader, MemReaderSlice}; pub use reader::{PageFilter, PageMetaData, PageReader}; use crate::parquet::error::ParquetError; diff --git a/crates/polars-parquet/src/parquet/read/page/reader.rs b/crates/polars-parquet/src/parquet/read/page/reader.rs index 0549ecc26191..82b542ee1eb1 100644 --- a/crates/polars-parquet/src/parquet/read/page/reader.rs +++ b/crates/polars-parquet/src/parquet/read/page/reader.rs @@ -1,9 +1,9 @@ -use std::io::Read; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use parquet_format_safe::thrift::protocol::TCompactInputProtocol; -use super::PageIterator; +use super::memreader::MemReader; +use super::{MemReaderSlice, PageIterator}; use crate::parquet::compression::Compression; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::indexes::Interval; @@ -13,6 +13,7 @@ use crate::parquet::page::{ ParquetPageHeader, }; use crate::parquet::parquet_bridge::Encoding; +use crate::parquet::read::CowBuffer; /// This meta is a small part of [`ColumnChunkMetaData`]. #[derive(Debug, Clone, PartialEq, Eq)] @@ -63,9 +64,9 @@ pub type PageFilter = Arc bool + Send + /// The pages from this iterator always have [`None`] [`crate::parquet::page::CompressedDataPage::selected_rows()`] since /// filter pushdown is not supported without a /// pre-computed [page index](https://github.com/apache/parquet-format/blob/master/PageIndex.md). -pub struct PageReader { +pub struct PageReader { // The source - reader: R, + reader: MemReader, compression: Compression, @@ -86,13 +87,13 @@ pub struct PageReader { max_page_size: usize, } -impl PageReader { +impl PageReader { /// Returns a new [`PageReader`]. /// /// It assumes that the reader has been `sought` (`seek`) to the beginning of `column`. /// The parameter `max_header_size` pub fn new( - reader: R, + reader: MemReader, column: &ColumnChunkMetaData, pages_filter: PageFilter, scratch: Vec, @@ -105,7 +106,7 @@ impl PageReader { /// /// It assumes that the reader has been `sought` (`seek`) to the beginning of `column`. pub fn new_with_page_meta( - reader: R, + reader: MemReader, reader_meta: PageMetaData, pages_filter: PageFilter, scratch: Vec, @@ -124,23 +125,23 @@ impl PageReader { } /// Returns the reader and this Readers' interval buffer - pub fn into_inner(self) -> (R, Vec) { + pub fn into_inner(self) -> (MemReader, Vec) { (self.reader, self.scratch) } } -impl PageIterator for PageReader { +impl PageIterator for PageReader { fn swap_buffer(&mut self, scratch: &mut Vec) { std::mem::swap(&mut self.scratch, scratch) } } -impl Iterator for PageReader { +impl Iterator for PageReader { type Item = ParquetResult; fn next(&mut self) -> Option { let mut buffer = std::mem::take(&mut self.scratch); - let maybe_maybe_page = next_page(self, &mut buffer).transpose(); + let maybe_maybe_page = next_page(self).transpose(); if let Some(ref maybe_page) = maybe_maybe_page { if let Ok(CompressedPage::Data(page)) = maybe_page { // check if we should filter it (only valid for data pages) @@ -159,8 +160,8 @@ impl Iterator for PageReader { } /// Reads Page header from Thrift. 
-pub(super) fn read_page_header( - reader: &mut R, +pub(super) fn read_page_header( + reader: &mut MemReader, max_size: usize, ) -> ParquetResult { let mut prot = TCompactInputProtocol::new(reader, max_size); @@ -170,20 +171,14 @@ pub(super) fn read_page_header( /// This function is lightweight and executes a minimal amount of work so that it is IO bounded. // Any un-necessary CPU-intensive tasks SHOULD be executed on individual pages. -fn next_page( - reader: &mut PageReader, - buffer: &mut Vec, -) -> ParquetResult> { +fn next_page(reader: &mut PageReader) -> ParquetResult> { if reader.seen_num_values >= reader.total_num_values { return Ok(None); }; - build_page(reader, buffer) + build_page(reader) } -pub(super) fn build_page( - reader: &mut PageReader, - buffer: &mut Vec, -) -> ParquetResult> { +pub(super) fn build_page(reader: &mut PageReader) -> ParquetResult> { let page_header = read_page_header(&mut reader.reader, reader.max_page_size)?; reader.seen_num_values += get_page_header(&page_header)? @@ -196,15 +191,9 @@ pub(super) fn build_page( return Err(ParquetError::WouldOverAllocate); } - buffer.clear(); - buffer.try_reserve(read_size)?; - let bytes_read = reader - .reader - .by_ref() - .take(read_size as u64) - .read_to_end(buffer)?; + let buffer = reader.reader.read_slice(read_size); - if bytes_read != read_size { + if buffer.len() != read_size { return Err(ParquetError::oos( "The page header reported the wrong page size", )); @@ -222,13 +211,17 @@ pub(super) fn build_page( pub(super) fn finish_page( page_header: ParquetPageHeader, - data: &mut Vec, + data: MemReaderSlice, compression: Compression, descriptor: &Descriptor, selected_rows: Option>, ) -> ParquetResult { let type_ = page_header.type_.try_into()?; let uncompressed_page_size = page_header.uncompressed_page_size.try_into()?; + + static DO_VERBOSE: OnceLock = OnceLock::new(); + let do_verbose = *DO_VERBOSE.get_or_init(|| std::env::var("PARQUET_DO_VERBOSE").is_ok()); + match type_ { PageType::DictionaryPage => { let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| { @@ -236,11 +229,16 @@ pub(super) fn finish_page( "The page header type is a dictionary page but the dictionary header is empty", ) })?; + + if do_verbose { + println!("DictPage ( )"); + } + let is_sorted = dict_header.is_sorted.unwrap_or(false); // move the buffer to `dict_page` let page = CompressedDictPage::new( - std::mem::take(data), + CowBuffer::Borrowed(data), compression, uncompressed_page_size, dict_header.num_values.try_into()?, @@ -256,9 +254,16 @@ pub(super) fn finish_page( ) })?; + if do_verbose { + println!( + "DataPageV1 ( num_values: {}, datatype: {:?}, encoding: {:?} )", + header.num_values, descriptor.primitive_type, header.encoding + ); + } + Ok(CompressedPage::Data(CompressedDataPage::new_read( DataPageHeader::V1(header), - std::mem::take(data), + CowBuffer::Borrowed(data), compression, uncompressed_page_size, descriptor.clone(), @@ -272,9 +277,16 @@ pub(super) fn finish_page( ) })?; + if do_verbose { + println!( + "DataPageV2 ( num_values: {}, datatype: {:?}, encoding: {:?} )", + header.num_values, descriptor.primitive_type, header.encoding + ); + } + Ok(CompressedPage::Data(CompressedDataPage::new_read( DataPageHeader::V2(header), - std::mem::take(data), + CowBuffer::Borrowed(data), compression, uncompressed_page_size, descriptor.clone(), diff --git a/crates/polars-parquet/src/parquet/read/page/stream.rs b/crates/polars-parquet/src/parquet/read/page/stream.rs index ef4e3ac31e4f..ee4cc4f4cd2b 100644 --- 
a/crates/polars-parquet/src/parquet/read/page/stream.rs +++ b/crates/polars-parquet/src/parquet/read/page/stream.rs @@ -11,6 +11,7 @@ use crate::parquet::compression::Compression; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor}; use crate::parquet::page::{CompressedPage, ParquetPageHeader}; +use crate::parquet::read::MemReaderSlice; /// Returns a stream of compressed data pages pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>( @@ -118,7 +119,7 @@ fn _get_page_stream( yield finish_page( page_header, - &mut scratch, + MemReaderSlice::from_vec(std::mem::take(&mut scratch)), compression, &descriptor, None, diff --git a/crates/polars-parquet/src/parquet/write/compression.rs b/crates/polars-parquet/src/parquet/write/compression.rs index d2338a6a69ee..aa48d08c7b26 100644 --- a/crates/polars-parquet/src/parquet/write/compression.rs +++ b/crates/polars-parquet/src/parquet/write/compression.rs @@ -4,6 +4,7 @@ use crate::parquet::page::{ CompressedDataPage, CompressedDictPage, CompressedPage, DataPage, DataPageHeader, DictPage, Page, }; +use crate::parquet::read::CowBuffer; use crate::parquet::{compression, FallibleStreamingIterator}; /// Compresses a [`DataPage`] into a [`CompressedDataPage`]. @@ -37,11 +38,12 @@ fn compress_data( }, }; } else { - std::mem::swap(&mut buffer, &mut compressed_buffer); - }; + std::mem::swap(buffer.to_mut(), &mut compressed_buffer); + } + Ok(CompressedDataPage::new_read( header, - compressed_buffer, + CowBuffer::Owned(compressed_buffer), compression.into(), uncompressed_page_size, descriptor, @@ -55,16 +57,19 @@ fn compress_dict( compression: CompressionOptions, ) -> ParquetResult { let DictPage { - mut buffer, + buffer, num_values, is_sorted, } = page; + let uncompressed_page_size = buffer.len(); - if compression != CompressionOptions::Uncompressed { + let compressed_buffer = if compression != CompressionOptions::Uncompressed { compression::compress(compression, &buffer, &mut compressed_buffer)?; + CowBuffer::Owned(compressed_buffer) } else { - std::mem::swap(&mut buffer, &mut compressed_buffer); - } + buffer + }; + Ok(CompressedDictPage::new( compressed_buffer, compression.into(), @@ -124,7 +129,7 @@ impl>> Compressor { /// Deconstructs itself into its iterator and scratch buffer. 
pub fn into_inner(mut self) -> (I, Vec) { let mut buffer = if let Some(page) = self.current.as_mut() { - std::mem::take(page.buffer()) + std::mem::take(page.buffer_mut()) } else { std::mem::take(&mut self.buffer) }; @@ -139,7 +144,7 @@ impl>> FallibleStreamingIterator for Comp fn advance(&mut self) -> std::result::Result<(), Self::Error> { let mut compressed_buffer = if let Some(page) = self.current.as_mut() { - std::mem::take(page.buffer()) + std::mem::take(page.buffer_mut()) } else { std::mem::take(&mut self.buffer) }; diff --git a/crates/polars-parquet/src/parquet/write/page.rs b/crates/polars-parquet/src/parquet/write/page.rs index f604afa55b86..1f02b5ba570b 100644 --- a/crates/polars-parquet/src/parquet/write/page.rs +++ b/crates/polars-parquet/src/parquet/write/page.rs @@ -218,11 +218,12 @@ async fn write_page_header_async( #[cfg(test)] mod tests { use super::*; + use crate::parquet::read::CowBuffer; #[test] fn dict_too_large() { let page = CompressedDictPage::new( - vec![], + CowBuffer::Owned(vec![]), Compression::Uncompressed, i32::MAX as usize + 1, 100, @@ -234,7 +235,7 @@ mod tests { #[test] fn dict_too_many_values() { let page = CompressedDictPage::new( - vec![], + CowBuffer::Owned(vec![]), Compression::Uncompressed, 0, i32::MAX as usize + 1, diff --git a/crates/polars/tests/it/io/parquet/read/mod.rs b/crates/polars/tests/it/io/parquet/read/mod.rs index e12a42b8f3c0..3923ca8cf537 100644 --- a/crates/polars/tests/it/io/parquet/read/mod.rs +++ b/crates/polars/tests/it/io/parquet/read/mod.rs @@ -25,8 +25,8 @@ use polars_parquet::parquet::read::get_page_stream; #[cfg(feature = "async")] use polars_parquet::parquet::read::read_metadata_async; use polars_parquet::parquet::read::{ - get_column_iterator, get_field_columns, read_metadata, BasicDecompressor, MutStreamingIterator, - State, + get_column_iterator, get_field_columns, read_metadata, BasicDecompressor, MemReader, + MutStreamingIterator, State, }; use polars_parquet::parquet::schema::types::{GroupConvertedType, ParquetType}; use polars_parquet::parquet::schema::Repetition; @@ -209,12 +209,12 @@ where } } -pub fn read_column( - reader: &mut R, +pub fn read_column( + mut reader: MemReader, row_group: usize, field_name: &str, ) -> ParquetResult<(Array, Option)> { - let metadata = read_metadata(reader)?; + let metadata = read_metadata(&mut reader)?; let field = metadata .schema() @@ -278,8 +278,9 @@ pub async fn read_column_async< } fn get_column(path: &str, column: &str) -> ParquetResult<(Array, Option)> { - let mut file = File::open(path).unwrap(); - read_column(&mut file, 0, column) + let file = File::open(path).unwrap(); + let memreader = MemReader::from_reader(file).unwrap(); + read_column(memreader, 0, column) } fn test_column(column: &str) -> ParquetResult<()> { diff --git a/crates/polars/tests/it/io/parquet/write/binary.rs b/crates/polars/tests/it/io/parquet/write/binary.rs index db2057f2c825..c53afd0bbdbd 100644 --- a/crates/polars/tests/it/io/parquet/write/binary.rs +++ b/crates/polars/tests/it/io/parquet/write/binary.rs @@ -3,6 +3,7 @@ use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::metadata::Descriptor; use polars_parquet::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page}; +use polars_parquet::parquet::read::CowBuffer; use polars_parquet::parquet::statistics::BinaryStatistics; use polars_parquet::parquet::types::ord_binary; use polars_parquet::parquet::write::WriteOptions; @@ -80,7 +81,7 @@ pub fn array_to_page_v1( 
Ok(Page::Data(DataPage::new( DataPageHeader::V1(header), - buffer, + CowBuffer::Owned(buffer), descriptor.clone(), Some(array.len()), ))) diff --git a/crates/polars/tests/it/io/parquet/write/indexes.rs b/crates/polars/tests/it/io/parquet/write/indexes.rs index c4524b74376d..83e13dc05bc7 100644 --- a/crates/polars/tests/it/io/parquet/write/indexes.rs +++ b/crates/polars/tests/it/io/parquet/write/indexes.rs @@ -7,7 +7,8 @@ use polars_parquet::parquet::indexes::{ }; use polars_parquet::parquet::metadata::SchemaDescriptor; use polars_parquet::parquet::read::{ - read_columns_indexes, read_metadata, read_pages_locations, BasicDecompressor, IndexedPageReader, + read_columns_indexes, read_metadata, read_pages_locations, BasicDecompressor, + IndexedPageReader, MemReader, }; use polars_parquet::parquet::schema::types::{ParquetType, PhysicalType, PrimitiveType}; use polars_parquet::parquet::write::{ @@ -59,7 +60,7 @@ fn write_file() -> ParquetResult> { #[test] fn read_indexed_page() -> ParquetResult<()> { let data = write_file()?; - let mut reader = Cursor::new(data); + let mut reader = MemReader::from_vec(data); let metadata = read_metadata(&mut reader)?; diff --git a/crates/polars/tests/it/io/parquet/write/mod.rs b/crates/polars/tests/it/io/parquet/write/mod.rs index 2f198e90a5c8..3fe24a4801a0 100644 --- a/crates/polars/tests/it/io/parquet/write/mod.rs +++ b/crates/polars/tests/it/io/parquet/write/mod.rs @@ -14,6 +14,7 @@ use polars_parquet::parquet::compression::{BrotliLevel, CompressionOptions}; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::metadata::{Descriptor, SchemaDescriptor}; use polars_parquet::parquet::page::Page; +use polars_parquet::parquet::read::MemReader; use polars_parquet::parquet::schema::types::{ParquetType, PhysicalType}; use polars_parquet::parquet::statistics::Statistics; #[cfg(feature = "async")] @@ -44,7 +45,8 @@ pub fn array_to_page( } fn read_column(reader: &mut R) -> ParquetResult<(Array, Option)> { - let (a, statistics) = super::read::read_column(reader, 0, "col")?; + let memreader = MemReader::from_reader(reader)?; + let (a, statistics) = super::read::read_column(memreader, 0, "col")?; Ok((a, statistics)) } diff --git a/crates/polars/tests/it/io/parquet/write/primitive.rs b/crates/polars/tests/it/io/parquet/write/primitive.rs index 5bae1339fa8d..305cced89d8b 100644 --- a/crates/polars/tests/it/io/parquet/write/primitive.rs +++ b/crates/polars/tests/it/io/parquet/write/primitive.rs @@ -3,6 +3,7 @@ use polars_parquet::parquet::encoding::Encoding; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::metadata::Descriptor; use polars_parquet::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page}; +use polars_parquet::parquet::read::CowBuffer; use polars_parquet::parquet::statistics::PrimitiveStatistics; use polars_parquet::parquet::types::NativeType; use polars_parquet::parquet::write::WriteOptions; @@ -71,7 +72,7 @@ pub fn array_to_page_v1( Ok(Page::Data(DataPage::new( DataPageHeader::V1(header), - buffer, + CowBuffer::Owned(buffer), descriptor.clone(), Some(array.len()), )))
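
Usage sketch (not part of the patch): a minimal example of driving the new
MemReader-based page API end to end. The file path is a placeholder, and the
`metadata.row_groups[0].columns()[0]` accessors mirror this patch's test code,
so treat the exact field names as assumptions rather than a stable API.

use std::fs::File;
use std::sync::Arc;

use polars_parquet::parquet::read::{read_metadata, MemReader, PageReader};

fn read_first_column() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // placeholder path

    // Buffer the whole file once; every later "read" is a cheap slice into
    // this shared, reference-counted allocation.
    let mut reader = MemReader::from_reader(file)?;
    let metadata = read_metadata(&mut reader)?;
    let column_meta = &metadata.row_groups[0].columns()[0];

    let pages = PageReader::new(
        reader,
        column_meta,
        Arc::new(|_, _| true), // page filter: keep every page
        vec![],                // scratch buffer
        usize::MAX,            // maximum allowed page size
    );

    for page in pages {
        // Each `CompressedPage` now borrows its bytes from the shared buffer
        // (`CowBuffer::Borrowed`) instead of owning a freshly read `Vec<u8>`.
        let _page = page?;
    }

    Ok(())
}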
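
The copy-on-write contract of `CowBuffer` can be seen in a small sketch
(values are illustrative; `CowBuffer`, `MemReader`, and `read_slice` are all
public in this patch via `polars_parquet::parquet::read`):

use polars_parquet::parquet::read::{CowBuffer, MemReader};

fn cow_buffer_demo() {
    let mut reader = MemReader::from_vec(vec![1, 2, 3, 4, 5]);

    // Zero-copy: the slice only clones the Arc and records start/end offsets.
    let slice = reader.read_slice(3);
    let mut buffer = CowBuffer::Borrowed(slice);
    assert_eq!(&*buffer, [1, 2, 3].as_slice());

    // The first mutable access copies the bytes out; the buffer becomes
    // `CowBuffer::Owned` and later `to_mut` calls return the same `Vec`.
    buffer.to_mut().push(9);
    assert_eq!(&*buffer, [1, 2, 3, 9].as_slice());
}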