Skip to content

Commit

Permalink
perf: Introduce MemReader to file buffer in Parquet reader (#17712)
Browse files: browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Jul 19, 2024
1 parent 83d225b commit a212ce9
Show file tree
Hide file tree
Showing 28 changed files with 411 additions and 168 deletions.
3 changes: 2 additions & 1 deletion crates/polars-io/src/parquet/read/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
use polars_parquet::parquet::read::MemReader;
use polars_parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
Expand Down Expand Up @@ -73,7 +74,7 @@ pub(super) fn to_deserializer<'a>(
.into_iter()
.map(|(column_meta, chunk)| {
let pages = PageReader::new(
std::io::Cursor::new(chunk),
MemReader::from_slice(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl DecodedState for (FixedSizeBinary, MutableBitmap) {

impl<'a> Decoder<'a> for BinaryDecoder {
type Translation = StateTranslation<'a>;
type Dict = Vec<u8>;
type Dict = &'a [u8];
type DecodedState = (FixedSizeBinary, MutableBitmap);

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
Expand All @@ -156,8 +156,8 @@ impl<'a> Decoder<'a> for BinaryDecoder {
)
}

fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
page.buffer.clone()
fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dict {
page.buffer.as_ref()
}
}

Expand Down Expand Up @@ -207,7 +207,7 @@ impl<I: PagesIter> Iterator for Iter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.dict,
&mut self.dict.as_deref(),
&mut self.remaining,
self.chunk_size,
&BinaryDecoder { size: self.size },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn read_dict(data_type: ArrowDataType, dict: &DictPage) -> Box<dyn Array> {

let values = dict.buffer.clone();

FixedSizeBinaryArray::try_new(data_type, values.into(), None)
FixedSizeBinaryArray::try_new(data_type, values.to_vec().into(), None)
.unwrap()
.boxed()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct BinaryDecoder {

impl<'a> NestedDecoder<'a> for BinaryDecoder {
type State = State<'a>;
type Dictionary = Vec<u8>;
type Dictionary = &'a [u8];
type DecodedState = (FixedSizeBinary, MutableBitmap);

fn build_state(
Expand All @@ -71,7 +71,7 @@ impl<'a> NestedDecoder<'a> for BinaryDecoder {
let values = values.chunks_exact(self.size);
StateTranslation::Unit(values)
},
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(&dict), false) => {
let values = dict_indices_decoder(page)?;
StateTranslation::Dictionary { values, dict }
},
Expand Down Expand Up @@ -133,8 +133,8 @@ impl<'a> NestedDecoder<'a> for BinaryDecoder {
validity.extend_constant(n, false);
}

fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary {
page.buffer.clone()
fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dictionary {
page.buffer.as_ref()
}
}

Expand Down Expand Up @@ -179,7 +179,7 @@ impl<I: PagesIter> Iterator for NestedIter<I> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.dict,
&mut self.dict.as_deref(),
&mut self.remaining,
&self.init,
self.chunk_size,
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ use simple::page_iter_to_arrays;
pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState};
pub use self::struct_::StructIterator;
use super::*;
use crate::parquet::read::get_page_iterator as _get_page_iterator;
use crate::parquet::read::{get_page_iterator as _get_page_iterator, MemReader};
use crate::parquet::schema::types::PrimitiveType;

/// Creates a new iterator of compressed pages.
pub fn get_page_iterator<R: Read + Seek>(
pub fn get_page_iterator(
column_metadata: &ColumnChunkMetaData,
reader: R,
reader: MemReader,
pages_filter: Option<PageFilter>,
buffer: Vec<u8>,
max_header_size: usize,
) -> PolarsResult<PageReader<R>> {
) -> PolarsResult<PageReader> {
Ok(_get_page_iterator(
column_metadata,
reader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub(super) trait NestedDecoder<'a> {
) -> ParquetResult<()>;
fn push_n_nulls(&self, decoded: &mut Self::DecodedState, n: usize);

fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary;
fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dictionary;
}

/// The initial info of nested data types.
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/null/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod tests {
use crate::parquet::fallible_streaming_iterator;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::{PhysicalType, PrimitiveType};

#[test]
Expand All @@ -78,7 +79,7 @@ mod tests {
repetition_level_encoding: Encoding::Plain.into(),
statistics: None,
}),
vec![],
CowBuffer::Owned(vec![]),
Descriptor {
primitive_type: PrimitiveType::from_physical(
"a".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ where
match (self, page_validity) {
(Self::Unit(page), None) => {
values.extend(
page.by_ref()
.map(|v| decoder.decoder.decode(P::from_le_bytes(*v)))
page.map(|v| decoder.decoder.decode(P::from_le_bytes(*v)))
.take(additional),
);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ pub(super) trait Decoder<'a>: Sized {
fn with_capacity(&self, capacity: usize) -> Self::DecodedState;

/// Deserializes a [`DictPage`] into [`Self::Dict`].
fn deserialize_dict(&self, page: &DictPage) -> Self::Dict;
fn deserialize_dict(&self, page: &'a DictPage) -> Self::Dict;
}

pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
Expand Down
8 changes: 5 additions & 3 deletions crates/polars-parquet/src/arrow/read/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use super::{ArrayIter, RowGroupMetaData};
use crate::arrow::read::column_iter_to_arrays;
use crate::parquet::indexes::FilteredPage;
use crate::parquet::metadata::ColumnChunkMetaData;
use crate::parquet::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader};
use crate::parquet::read::{
BasicDecompressor, IndexedPageReader, MemReader, PageMetaData, PageReader,
};

/// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into
/// an iterator of [`RecordBatchT`].
Expand Down Expand Up @@ -164,7 +166,7 @@ pub fn to_deserializer<'a>(
.for_each(|page| page.start -= meta.column_start);
meta.column_start = 0;
let pages = IndexedPageReader::new_with_page_meta(
std::io::Cursor::new(chunk),
MemReader::from_vec(chunk),
meta,
pages,
vec![],
Expand All @@ -185,7 +187,7 @@ pub fn to_deserializer<'a>(
.map(|(column_meta, chunk)| {
let len = chunk.len();
let pages = PageReader::new(
std::io::Cursor::new(chunk),
MemReader::from_vec(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
Expand Down
31 changes: 25 additions & 6 deletions crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::arrow::write::{slice_nested_leaf, utils};
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DictPage, Page};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;
use crate::write::DynIter;
Expand Down Expand Up @@ -202,7 +203,10 @@ macro_rules! dyn_prim {
} else {
None
};
(DictPage::new(buffer, values.len(), false), stats)
(
DictPage::new(CowBuffer::Owned(buffer), values.len(), false),
stats,
)
}};
}

Expand Down Expand Up @@ -254,7 +258,10 @@ pub fn array_to_pages<K: DictionaryKey>(
} else {
None
};
(DictPage::new(buffer, array.len(), false), stats)
(
DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
stats,
)
},
ArrowDataType::BinaryView => {
let array = array
Expand All @@ -274,7 +281,10 @@ pub fn array_to_pages<K: DictionaryKey>(
} else {
None
};
(DictPage::new(buffer, array.len(), false), stats)
(
DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
stats,
)
},
ArrowDataType::Utf8View => {
let array = array
Expand All @@ -295,7 +305,10 @@ pub fn array_to_pages<K: DictionaryKey>(
} else {
None
};
(DictPage::new(buffer, array.len(), false), stats)
(
DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
stats,
)
},
ArrowDataType::LargeBinary => {
let values = array.values().as_any().downcast_ref().unwrap();
Expand All @@ -311,7 +324,10 @@ pub fn array_to_pages<K: DictionaryKey>(
} else {
None
};
(DictPage::new(buffer, values.len(), false), stats)
(
DictPage::new(CowBuffer::Owned(buffer), values.len(), false),
stats,
)
},
ArrowDataType::FixedSizeBinary(_) => {
let mut buffer = vec![];
Expand All @@ -327,7 +343,10 @@ pub fn array_to_pages<K: DictionaryKey>(
} else {
None
};
(DictPage::new(buffer, array.len(), false), stats)
(
DictPage::new(CowBuffer::Owned(buffer), array.len(), false),
stats,
)
},
other => {
polars_bail!(nyi =
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-parquet/src/arrow/write/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;

Expand Down Expand Up @@ -89,7 +90,7 @@ pub fn build_plain_page(
};
Ok(DataPage::new(
header,
buffer,
CowBuffer::Owned(buffer),
Descriptor {
primitive_type: type_,
max_def_level: 0,
Expand Down
Loading

0 comments on commit a212ce9

Please sign in to comment.