Skip to content

Commit

Permalink
perf: Batch nested Parquet decoding (pola-rs#17542)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jul 10, 2024
1 parent daf2e49 commit a55d9bc
Show file tree
Hide file tree
Showing 19 changed files with 626 additions and 543 deletions.
64 changes: 64 additions & 0 deletions crates/polars-arrow/src/bitmap/utils/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use polars_utils::slice::load_padded_le_u64;

use super::get_bit_unchecked;
use crate::bitmap::MutableBitmap;
use crate::trusted_len::TrustedLen;

/// An iterator over bits according to the [LSB](https://en.wikipedia.org/wiki/Bit_numbering#Least_significant_bit),
Expand Down Expand Up @@ -131,6 +132,69 @@ impl<'a> BitmapIter<'a> {
pub fn num_remaining(&self) -> usize {
self.word_len + self.rest_len
}

/// Collect at most `n` elements from this iterator into `bitmap`
pub fn collect_n_into(&mut self, bitmap: &mut MutableBitmap, n: usize) {
fn collect_word(
word: &mut u64,
word_len: &mut usize,
bitmap: &mut MutableBitmap,
n: &mut usize,
) {
while *n > 0 && *word_len > 0 {
{
let trailing_ones = word.trailing_ones();
let shift = u32::min(usize::min(*n, u32::MAX as usize) as u32, trailing_ones);
*word = word.wrapping_shr(shift);
*word_len -= shift as usize;
*n -= shift as usize;

bitmap.extend_constant(shift as usize, true);
}

{
let trailing_zeros = u32::min(word.trailing_zeros(), *word_len as u32);
let shift = u32::min(usize::min(*n, u32::MAX as usize) as u32, trailing_zeros);
*word = word.wrapping_shr(shift);
*word_len -= shift as usize;
*n -= shift as usize;

bitmap.extend_constant(shift as usize, false);
}
}
}

let mut n = n;
bitmap.reserve(usize::min(n, self.num_remaining()));

collect_word(&mut self.word, &mut self.word_len, bitmap, &mut n);

if n == 0 {
return;
}

let num_words = n / 64;

if num_words > 0 {
bitmap.extend_from_slice(self.bytes, 0, num_words * 64);

self.bytes = unsafe { self.bytes.get_unchecked(num_words * 8..) };
self.rest_len -= num_words * 64;
n -= num_words * 64;
}

self.word_len = usize::min(self.rest_len, 64);
self.rest_len -= self.word_len;
unsafe {
let chunk = self.bytes.get_unchecked(..8).try_into().unwrap();
self.word = u64::from_le_bytes(chunk);
self.bytes = self.bytes.get_unchecked(8..);
}

collect_word(&mut self.word, &mut self.word_len, bitmap, &mut n);

debug_assert!(self.num_remaining() == 0 || n == 0);
}
}

impl<'a> Iterator for BitmapIter<'a> {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {

/// Construct an object_store `Path` from a string without any encoding/decoding.
pub fn object_path_from_string(path: String) -> PolarsResult<object_store::path::Path> {
object_store::path::Path::parse(&path).map_err(to_compute_err)
object_store::path::Path::parse(path).map_err(to_compute_err)
}

/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::{delta_bitpacked, delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{split_buffer, DataPage};
use crate::read::deserialize::utils::{page_is_filtered, page_is_optional};

pub(crate) type BinaryDict = BinaryArray<i64>;

#[derive(Debug)]
pub(crate) struct Required<'a> {
pub values: std::iter::Take<BinaryIter<'a>>,
pub values: BinaryIter<'a>,
}

impl<'a> Required<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let values = split_buffer(page)?.values;
let values = BinaryIter::new(values).take(page.num_values());
let values = BinaryIter::new(values, page.num_values());

Ok(Self { values })
}
Expand Down Expand Up @@ -139,12 +138,12 @@ impl<'a> Iterator for DeltaBytes<'a> {

#[derive(Debug)]
pub(crate) struct FilteredRequired<'a> {
pub values: SliceFilteredIter<std::iter::Take<BinaryIter<'a>>>,
pub values: SliceFilteredIter<BinaryIter<'a>>,
}

impl<'a> FilteredRequired<'a> {
pub fn new(page: &'a DataPage) -> Self {
let values = BinaryIter::new(page.buffer()).take(page.num_values());
let values = BinaryIter::new(page.buffer(), page.num_values());

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);
Expand Down Expand Up @@ -277,7 +276,7 @@ impl<'a> utils::PageState<'a> for BinaryState<'a> {
}

pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict {
let all = BinaryIter::new(values).take(num_values).collect::<Vec<_>>();
let all = BinaryIter::new(values, num_values).collect::<Vec<_>>();
let values_size = all.iter().map(|v| v.len()).sum::<usize>();
let mut dict_values = MutableBinaryValuesArray::<i64>::with_capacities(all.len(), values_size);
for v in all {
Expand Down Expand Up @@ -331,8 +330,7 @@ pub(crate) fn build_binary_state<'a>(
},
(Encoding::Plain, _, true, false) => {
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);
let values = BinaryIter::new(values, page.num_values());

Ok(BinaryState::Optional(
OptionalPageValidity::try_new(page)?,
Expand All @@ -348,7 +346,7 @@ pub(crate) fn build_binary_state<'a>(

Ok(BinaryState::FilteredOptional(
FilteredOptionalPageValidity::try_new(page)?,
BinaryIter::new(values),
BinaryIter::new(values, page.num_values()),
))
},
(Encoding::DeltaLengthByteArray, _, false, false) => {
Expand All @@ -375,54 +373,3 @@ pub(crate) fn build_binary_state<'a>(
_ => Err(utils::not_implemented(page)),
}
}

#[derive(Debug)]
pub(crate) enum BinaryNestedState<'a> {
Optional(BinaryIter<'a>),
Required(BinaryIter<'a>),
RequiredDictionary(ValuesDictionary<'a>),
OptionalDictionary(ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for BinaryNestedState<'a> {
fn len(&self) -> usize {
match self {
BinaryNestedState::Optional(validity) => validity.size_hint().0,
BinaryNestedState::Required(state) => state.size_hint().0,
BinaryNestedState::RequiredDictionary(required) => required.len(),
BinaryNestedState::OptionalDictionary(optional) => optional.len(),
}
}
}

pub(crate) fn build_nested_state<'a>(
page: &'a DataPage,
dict: Option<&'a BinaryDict>,
) -> PolarsResult<BinaryNestedState<'a>> {
let is_optional = page_is_optional(page);
let is_filtered = page_is_filtered(page);

match (page.encoding(), dict, is_optional, is_filtered) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
ValuesDictionary::try_new(page, dict).map(BinaryNestedState::RequiredDictionary)
},
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
ValuesDictionary::try_new(page, dict).map(BinaryNestedState::OptionalDictionary)
},
(Encoding::Plain, _, true, false) => {
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Ok(BinaryNestedState::Optional(values))
},
(Encoding::Plain, _, false, false) => {
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Ok(BinaryNestedState::Required(values))
},
_ => Err(utils::not_implemented(page)),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn read_dict<O: Offset>(data_type: ArrowDataType, dict: &DictPage) -> Box<dyn Ar
_ => data_type,
};

let values = BinaryIter::new(&dict.buffer).take(dict.num_values);
let values = BinaryIter::new(&dict.buffer, dict.num_values);

let mut data = Binary::<O>::with_capacity(dict.num_values);
data.values = Vec::with_capacity(dict.buffer.len() - 4 * dict.num_values);
Expand Down
123 changes: 82 additions & 41 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,49 @@ use arrow::array::Array;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use arrow::offset::Offset;
use arrow::pushable::Pushable;
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::nested_utils::*;
use super::super::utils::MaybeNext;
use super::basic::finish;
use super::decoders::*;
use super::utils::*;
use crate::arrow::read::PagesIter;
use crate::parquet::page::{DataPage, DictPage};
use crate::parquet::encoding::hybrid_rle::{DictionaryTranslator, Translator};
use crate::parquet::encoding::Encoding;
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::read::deserialize::utils::{
not_implemented, page_is_filtered, page_is_optional, PageState,
};

#[derive(Debug, Default)]
struct BinaryDecoder<O: Offset> {
phantom_o: std::marker::PhantomData<O>,
#[derive(Debug)]
pub struct State<'a> {
is_optional: bool,
translation: StateTranslation<'a>,
}

#[derive(Debug)]
pub enum StateTranslation<'a> {
Unit(BinaryIter<'a>),
Dictionary(ValuesDictionary<'a>, Option<Vec<&'a [u8]>>),
}

impl<'a> PageState<'a> for State<'a> {
fn len(&self) -> usize {
match &self.translation {
StateTranslation::Unit(iter) => iter.size_hint().0,
StateTranslation::Dictionary(values, _) => values.len(),
}
}
}

#[derive(Debug, Default)]
struct BinaryDecoder<O: Offset>(std::marker::PhantomData<O>);

impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
type State = BinaryNestedState<'a>;
type State = State<'a>;
type Dictionary = BinaryDict;
type DecodedState = (Binary<O>, MutableBitmap);

Expand All @@ -30,7 +55,29 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
page: &'a DataPage,
dict: Option<&'a Self::Dictionary>,
) -> PolarsResult<Self::State> {
build_nested_state(page, dict)
let is_optional = page_is_optional(page);
let is_filtered = page_is_filtered(page);

if is_filtered {
return Err(not_implemented(page));
}

let translation = match (page.encoding(), dict) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => {
StateTranslation::Dictionary(ValuesDictionary::try_new(page, dict)?, None)
},
(Encoding::Plain, _) => {
let values = split_buffer(page)?.values;
let values = BinaryIter::new(values, page.num_values());
StateTranslation::Unit(values)
},
_ => return Err(not_implemented(page)),
};

Ok(State {
is_optional,
translation,
})
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
Expand All @@ -40,51 +87,45 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
)
}

fn push_valid(
fn push_n_valid(
&self,
state: &mut Self::State,
decoded: &mut Self::DecodedState,
) -> PolarsResult<()> {
n: usize,
) -> ParquetResult<()> {
let (values, validity) = decoded;
match state {
BinaryNestedState::Optional(page) => {
let value = page.next().unwrap_or_default();
values.push(value);
validity.push(true);
},
BinaryNestedState::Required(page) => {
let value = page.next().unwrap_or_default();
values.push(value);
},
BinaryNestedState::RequiredDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
.next()
.map(|index| dict_values.value(index as usize))
.unwrap_or_default();
values.push(item);
page.values.get_result()?;

match &mut state.translation {
StateTranslation::Unit(page) => {
// @TODO: This can be optimized to not be a constantly polling
for value in page.by_ref().take(n) {
values.push(value);
}
},
BinaryNestedState::OptionalDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
.next()
.map(|index| dict_values.value(index as usize))
.unwrap_or_default();
page.values.get_result()?;
values.push(item);
validity.push(true);
StateTranslation::Dictionary(page, dict) => {
let dict =
dict.get_or_insert_with(|| page.dict.values_iter().collect::<Vec<&[u8]>>());
let translator = DictionaryTranslator(dict);

// @TODO: This can be optimized to not be a constantly polling
for value in page.values.by_ref().take(n) {
values.push(translator.translate(value)?);
}
},
}

if state.is_optional {
validity.extend_constant(n, true);
}

Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
fn push_n_nulls(&self, decoded: &mut Self::DecodedState, n: usize) {
let (values, validity) = decoded;
values.push(&[]);
validity.push(false);

values.extend_null_constant(n);
validity.extend_constant(n, false);
}

fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary {
Expand Down
Loading

0 comments on commit a55d9bc

Please sign in to comment.