Skip to content

Commit

Permalink
fix: Parquet several smaller issues (#18325)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored and ritchie46 committed Aug 23, 2024
1 parent bbab1a7 commit 11ba04f
Show file tree
Hide file tree
Showing 27 changed files with 671 additions and 249 deletions.
8 changes: 8 additions & 0 deletions crates/polars-arrow/src/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,14 @@ impl<O: Offset> OffsetsBuffer<O> {
pub fn into_inner(self) -> Buffer<O> {
self.0
}

/// Returns the total length (in offset units) covered by the elements in the
/// *inclusive* range `start..=end`, i.e. `offsets[end + 1] - offsets[start]`.
///
/// Note that the range is inclusive of `end`: `delta(i, i)` yields the length
/// of element `i`, not zero.
///
/// # Panics
/// Panics if `start > end`, or if `end + 1` is out of bounds for the
/// underlying offsets buffer.
#[inline]
pub fn delta(&self, start: usize, end: usize) -> usize {
    assert!(start <= end);

    (self.0[end + 1] - self.0[start]).to_usize()
}
}

impl From<&OffsetsBuffer<i32>> for OffsetsBuffer<i64> {
Expand Down
26 changes: 7 additions & 19 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,8 @@ fn rg_to_dfs_prefiltered(
// column indexes of the schema.
let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns);
let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns);
for (i, col) in file_metadata.schema().columns().iter().enumerate() {
if live_variables.contains(col.path_in_schema[0].deref()) {
for (i, field) in schema.fields.iter().enumerate() {
if live_variables.contains(&field.name[..]) {
live_idx_to_col_idx.push(i);
} else {
dead_idx_to_col_idx.push(i);
Expand Down Expand Up @@ -437,22 +437,10 @@ fn rg_to_dfs_prefiltered(
})
.collect::<PolarsResult<Vec<_>>>()?;

let mut rearranged_schema: Schema = Schema::new();
if let Some(rc) = &row_index {
rearranged_schema.insert_at_index(
0,
SmartString::from(rc.name.deref()),
IdxType::get_dtype(),
)?;
}
for i in live_idx_to_col_idx.iter().copied() {
rearranged_schema.insert_at_index(
rearranged_schema.len(),
schema.fields[i].name.clone().into(),
schema.fields[i].data_type().into(),
)?;
}
rearranged_schema.merge(Schema::from(schema.as_ref()));
let Some(df) = dfs.first().map(|(_, df)| df) else {
return Ok(Vec::new());
};
let rearranged_schema = df.schema();

rg_columns
.par_chunks_exact_mut(num_dead_columns)
Expand Down Expand Up @@ -554,7 +542,7 @@ fn rg_to_dfs_optionally_par_over_columns(
materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
apply_predicate(&mut df, predicate, true)?;

*previous_row_count = previous_row_count.checked_add(current_row_count).ok_or(
*previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
polars_err!(
ComputeError: "Parquet file produces more than pow(2, 32) rows; \
consider compiling with polars-bigidx feature (polars-u64-idx package on python), \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ impl<'a, 'b, O: Offset> BatchableCollector<(), Binary<O>> for DeltaCollector<'a,
target.extend_constant(n);
Ok(())
}

/// Skip the next `n` values without materializing them; delegates to the
/// wrapped delta decoder.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.decoder.skip_in_place(n)
}
}

impl<'a, 'b, O: Offset> BatchableCollector<(), Binary<O>> for DeltaBytesCollector<'a, 'b, O> {
Expand All @@ -159,6 +163,10 @@ impl<'a, 'b, O: Offset> BatchableCollector<(), Binary<O>> for DeltaBytesCollecto
target.extend_constant(n);
Ok(())
}

/// Skip the next `n` values without materializing them; delegates to the
/// wrapped delta-bytes decoder.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.decoder.skip_in_place(n)
}
}

impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder<O>> for BinaryStateTranslation<'a> {
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ impl<'a, 'b> BatchableCollector<(), MutableBinaryViewArray<[u8]>> for &mut Delta
target.extend_constant(n, <Option<&[u8]>>::None);
Ok(())
}

/// Skip the next `n` view values without materializing them; delegates to the
/// wrapped delta decoder.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.decoder.skip_in_place(n)
}
}

impl<'a, 'b> DeltaCollector<'a, 'b> {
Expand Down Expand Up @@ -426,6 +430,10 @@ impl<'a, 'b> BatchableCollector<(), MutableBinaryViewArray<[u8]>> for DeltaBytes
target.extend_constant(n, <Option<&[u8]>>::None);
Ok(())
}

/// Skip the next `n` view values without materializing them; delegates to the
/// wrapped delta-bytes decoder.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.decoder.skip_in_place(n)
}
}

impl utils::Decoder for BinViewDecoder {
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl<'a, 'b> BatchableCollector<u32, MutableBitmap> for BitmapCollector<'a, 'b>
target.extend_constant(n, false);
Ok(())
}

/// Skip the next `n` bits in the wrapped bitmap source without collecting them.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.0.skip_in_place(n)
}
}

impl ExactSize for (MutableBitmap, MutableBitmap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl<'a, 'b, K: DictionaryKey> BatchableCollector<(), Vec<K>> for DictArrayColle
target.resize(target.len() + n, K::default());
Ok(())
}

/// Skip the next `n` dictionary keys in the underlying values decoder.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.values.skip_in_place(n)
}
}

impl<K: DictionaryKey> Translator<K> for DictArrayTranslator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) enum StateTranslation<'a> {
Dictionary(hybrid_rle::HybridRleDecoder<'a>, &'a Vec<u8>),
}

#[derive(Debug)]
pub struct FixedSizeBinary {
pub values: Vec<u8>,
pub size: usize,
Expand Down Expand Up @@ -164,6 +165,12 @@ impl Decoder for BinaryDecoder {
target.resize(target.len() + n * self.size, 0);
Ok(())
}

/// Skip the next `n` fixed-size values by advancing the raw byte slice.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    // Clamp to the number of whole values still present so skipping past the
    // end simply empties the slice instead of panicking on the range below.
    // NOTE(review): assumes `self.size > 0`; a zero `size` would divide by
    // zero here — confirm that invariant is enforced where the decoder is built.
    let n = usize::min(n, self.slice.len() / self.size);
    *self.slice = &self.slice[n * self.size..];
    Ok(())
}
}

let mut collector = FixedSizeBinaryCollector {
Expand Down
11 changes: 11 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,17 @@ pub fn columns_to_iter_recursive(
)?
.collect_n(filter)?
},
Binary | Utf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
PageNestedDecoder::new(
columns.pop().unwrap(),
field.data_type().clone(),
binary::BinaryDecoder::<i32>::default(),
init,
)?
.collect_n(filter)?
},
_ => match field.data_type().to_logical_type() {
ArrowDataType::Dictionary(key_type, _, _) => {
init.push(InitNested::Primitive(field.is_nullable));
Expand Down
89 changes: 77 additions & 12 deletions crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Nested {

fn invalid_num_values(&self) -> usize {
match &self.content {
NestedContent::Primitive => 0,
NestedContent::Primitive => 1,
NestedContent::List { .. } => 0,
NestedContent::FixedSizeList { width } => *width,
NestedContent::Struct => 1,
Expand Down Expand Up @@ -204,6 +204,10 @@ impl<'a, 'b, 'c, D: utils::NestedDecoder> BatchableCollector<(), D::DecodedState
self.decoder.push_n_nulls(self.state, target, n);
Ok(())
}

/// Skip the next `n` values by delegating to the page state held by this
/// collector.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.state.skip_in_place(n)
}
}

/// The initial info of nested data types.
Expand Down Expand Up @@ -290,6 +294,67 @@ impl NestedState {
}
}

/// Calculate the number of leaf values that are covered by the first `limit` definition level
/// values.
/// Calculate the number of leaf values that are covered by the first `limit`
/// definition-level values.
///
/// A definition level equal to the deepest level (the last entry of
/// `def_levels`) marks a present leaf value; all other levels describe nulls
/// or missing nesting and carry no leaf value. The decoder itself is not
/// consumed — a clone of `def_iter` is walked.
fn limit_to_num_values(
    def_iter: &HybridRleDecoder<'_>,
    def_levels: &[u16],
    limit: usize,
) -> ParquetResult<usize> {
    // Gatherer that, instead of collecting values, counts how many of the
    // seen definition levels equal the leaf level.
    struct LeafCounter {
        leaf_def_level: u16,
    }
    struct LeafCount {
        // How many definition levels matched the leaf level so far.
        num_values: usize,
        // Total number of definition levels consumed (drives `gather_n_into`).
        length: usize,
    }

    impl HybridRleGatherer<u32> for LeafCounter {
        type Target = LeafCount;

        fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {}

        fn target_num_elements(&self, target: &Self::Target) -> usize {
            target.length
        }

        fn hybridrle_to_target(&self, value: u32) -> ParquetResult<u32> {
            Ok(value)
        }

        fn gather_one(&self, target: &mut Self::Target, value: u32) -> ParquetResult<()> {
            if value == u32::from(self.leaf_def_level) {
                target.num_values += 1;
            }
            target.length += 1;
            Ok(())
        }

        fn gather_repeated(
            &self,
            target: &mut Self::Target,
            value: u32,
            n: usize,
        ) -> ParquetResult<()> {
            if value == u32::from(self.leaf_def_level) {
                target.num_values += n;
            }
            target.length += n;
            Ok(())
        }
    }

    let counter = LeafCounter {
        // The deepest definition level is always the last entry.
        leaf_def_level: *def_levels.last().unwrap(),
    };
    let mut count = LeafCount {
        num_values: 0,
        length: 0,
    };
    def_iter.clone().gather_n_into(&mut count, limit, &counter)?;

    Ok(count.num_values)
}

fn idx_to_limit(rep_iter: &HybridRleDecoder<'_>, idx: usize) -> ParquetResult<usize> {
struct RowIdxOffsetGatherer;
struct RowIdxOffsetState {
Expand Down Expand Up @@ -384,7 +449,7 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>(
>,
nested: &mut [Nested],
filter: Option<Filter>,
// Amortized allocations

def_levels: &[u16],
rep_levels: &[u16],
) -> PolarsResult<()> {
Expand Down Expand Up @@ -416,6 +481,9 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>(
if start > 0 {
let start_cell = idx_to_limit(&rep_iter, start)?;

let num_skipped_values = limit_to_num_values(&def_iter, def_levels, start_cell)?;
batched_collector.skip_in_place(num_skipped_values)?;

rep_iter.skip_in_place(start_cell)?;
def_iter.skip_in_place(start_cell)?;
}
Expand All @@ -436,6 +504,8 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>(

// @NOTE: This is kind of unused
let last_skip = def_iter.len();
let num_skipped_values = limit_to_num_values(&def_iter, def_levels, last_skip)?;
batched_collector.skip_in_place(num_skipped_values)?;
rep_iter.skip_in_place(last_skip)?;
def_iter.skip_in_place(last_skip)?;

Expand All @@ -447,6 +517,8 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>(
let num_zeros = iter.take_leading_zeros();
if num_zeros > 0 {
let offset = idx_to_limit(&rep_iter, num_zeros)?;
let num_skipped_values = limit_to_num_values(&def_iter, def_levels, offset)?;
batched_collector.skip_in_place(num_skipped_values)?;
rep_iter.skip_in_place(offset)?;
def_iter.skip_in_place(offset)?;
}
Expand Down Expand Up @@ -601,23 +673,16 @@ fn extend_offsets_limited<'a, D: utils::NestedDecoder>(
}
}

if embed_depth == max_depth - 1 {
for _ in 0..num_elements {
batched_collector.push_invalid();
}

break;
}

let embed_num_values = embed_nest.invalid_num_values();
num_elements *= embed_num_values;

if embed_num_values == 0 {
break;
}

num_elements *= embed_num_values;
}

batched_collector.push_n_invalids(num_elements);

break;
}

Expand Down
1 change: 1 addition & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::parquet::error::ParquetResult;
use crate::parquet::page::{DataPage, DictPage};

/// Stateless decoder for Parquet null-typed columns.
pub(crate) struct NullDecoder;
/// Decoded state for a null column — no values are stored, only a count.
#[derive(Debug)]
pub(crate) struct NullArrayLength {
    // Number of decoded elements; presumably becomes the null array's length —
    // confirm at the collect site.
    length: usize,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ where
target.resize(target.len() + n, T::default());
Ok(())
}

/// Skip the next `n` values by advancing the chunk cursor. The underlying
/// skip is infallible, so this always returns `Ok(())`.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.chunks.skip_in_place(n);
    Ok(())
}
}

#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -206,7 +211,7 @@ where
}

match self {
Self::Plain(t) => _ = t.nth(n - 1),
Self::Plain(t) => t.skip_in_place(n),
Self::Dictionary(t) => t.values.skip_in_place(n)?,
Self::ByteStreamSplit(t) => _ = t.iter_converted(|_| ()).nth(n - 1),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ where
}

match self {
Self::Plain(v) => _ = v.nth(n - 1),
Self::Plain(v) => v.skip_in_place(n),
Self::Dictionary(v) => v.values.skip_in_place(n)?,
Self::ByteStreamSplit(v) => _ = v.iter_converted(|_| ()).nth(n - 1),
Self::DeltaBinaryPacked(v) => v.skip_in_place(n)?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,8 @@ where
target.resize(target.len() + n, T::default());
Ok(())
}

/// Skip the next `n` values without materializing them; delegates to the
/// inner decoder.
fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> {
    self.decoder.skip_in_place(n)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ impl<'a, P: ParquetNativeType> ArrayChunks<'a, P> {

Some(Self { bytes })
}

/// Advance the cursor past `n` items without yielding them.
///
/// Skipping more items than remain simply leaves the chunk view empty — the
/// advance is clamped to the remaining length, so this never panics.
pub(crate) fn skip_in_place(&mut self, n: usize) {
    let advance = usize::min(self.bytes.len(), n);
    self.bytes = &self.bytes[advance..];
}
}

impl<'a, P: ParquetNativeType> Iterator for ArrayChunks<'a, P> {
Expand All @@ -36,13 +41,6 @@ impl<'a, P: ParquetNativeType> Iterator for ArrayChunks<'a, P> {
Some(item)
}

#[inline(always)]
fn nth(&mut self, n: usize) -> Option<Self::Item> {
let item = self.bytes.get(n)?;
self.bytes = &self.bytes[n + 1..];
Some(item)
}

#[inline(always)]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.bytes.len(), Some(self.bytes.len()))
Expand Down
Loading

0 comments on commit 11ba04f

Please sign in to comment.