Skip to content

Commit

Permalink
delay errors of rle-decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 9, 2024
1 parent bdd30b3 commit c8add0f
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 128 deletions.
19 changes: 12 additions & 7 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::{ArrowDataType, PhysicalType};
use arrow::offset::Offset;
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::utils::{extend_from_decoder, next, DecodedState, MaybeNext};
use super::super::{utils, PagesIter};
Expand Down Expand Up @@ -121,8 +122,9 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
&mut page_values
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize)),
)
.map(|index| page_dict.value(index as usize)),
);
page_values.values.get_result()?;
},
BinaryState::RequiredDictionary(page) => {
// Already done on the dict.
Expand All @@ -132,11 +134,12 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
for x in page
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize))
.map(|index| page_dict.value(index as usize))
.take(additional)
{
values.push(x)
}
page.values.get_result()?;
},
BinaryState::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
Expand All @@ -160,14 +163,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
// Already done on the dict.
validate_utf8 = false;
let page_dict = &page.dict;
for x in page
for x in &mut page
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize))
.map(|index| page_dict.value(index as usize))
.take(additional)
{
values.push(x)
}
page.values.iter.get_result()?;
},
BinaryState::FilteredOptionalDictionary(page_validity, page_values) => {
// Already done on the dict.
Expand All @@ -181,8 +185,9 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
&mut page_values
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize)),
)
.map(|index| page_dict.value(index as usize)),
);
page_values.values.get_result()?;
},
BinaryState::OptionalDeltaByteArray(page_validity, page_values) => extend_from_decoder(
validity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use arrow::offset::Offset;
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::nested_utils::*;
use super::super::utils::MaybeNext;
Expand Down Expand Up @@ -60,17 +61,19 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
let item = page
.values
.next()
.map(|index| dict_values.value(index.unwrap() as usize))
.map(|index| dict_values.value(index as usize))
.unwrap_or_default();
values.push(item);
page.values.get_result()?;
},
BinaryNestedState::OptionalDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
.next()
.map(|index| dict_values.value(index.unwrap() as usize))
.map(|index| dict_values.value(index as usize))
.unwrap_or_default();
page.values.get_result()?;
values.push(item);
validity.push(true);
},
Expand Down
17 changes: 11 additions & 6 deletions crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow::array::{Array, ArrayRef, BinaryViewArray, MutableBinaryViewArray, Utf
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::{ArrowDataType, PhysicalType};
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::binary::decoders::*;
use crate::parquet::page::{DataPage, DictPage};
Expand Down Expand Up @@ -110,8 +111,9 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder {
&mut page_values
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize)),
)
.map(|index| page_dict.value(index as usize)),
);
page_values.values.get_result()?;
},
BinaryState::RequiredDictionary(page) => {
// Already done on the dict.
Expand All @@ -121,11 +123,12 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder {
for x in page
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize))
.map(|index| page_dict.value(index as usize))
.take(additional)
{
values.push_value_ignore_validity(x)
}
page.values.get_result()?;
},
BinaryState::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
Expand Down Expand Up @@ -154,11 +157,12 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder {
for x in page
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize))
.map(|index| page_dict.value(index as usize))
.take(additional)
{
values.push_value_ignore_validity(x)
}
page.values.iter.get_result()?;
},
BinaryState::FilteredOptionalDictionary(page_validity, page_values) => {
// Already done on the dict.
Expand All @@ -174,8 +178,9 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder {
&mut page_values
.values
.by_ref()
.map(|index| page_dict.value(index.unwrap() as usize)),
)
.map(|index| page_dict.value(index as usize)),
);
page_values.values.get_result()?;
},
BinaryState::OptionalDeltaByteArray(page_validity, page_values) => extend_from_decoder(
validity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use arrow::array::{ArrayRef, MutableBinaryViewArray};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::binary::decoders::{
Expand Down Expand Up @@ -60,19 +61,21 @@ impl<'a> NestedDecoder<'a> for BinViewDecoder {
let item = page
.values
.next()
.map(|index| dict_values.value(index.unwrap() as usize))
.map(|index| dict_values.value(index as usize))
.unwrap_or_default();
values.push_value_ignore_validity(item);
page.values.get_result()?;
},
BinaryNestedState::OptionalDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
.next()
.map(|index| dict_values.value(index.unwrap() as usize))
.map(|index| dict_values.value(index as usize))
.unwrap_or_default();
values.push_value_ignore_validity(item);
validity.push(true);
page.values.get_result()?;
},
}
Ok(())
Expand Down
79 changes: 40 additions & 39 deletions crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,29 +154,28 @@ where
) -> PolarsResult<()> {
let (values, validity) = decoded;
match state {
State::Optional(page) => extend_from_decoder(
validity,
&mut page.validity,
Some(remaining),
values,
&mut page.values.by_ref().map(|x| {
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Err(_) => panic!("The maximum key is too small"),
}
}),
),
State::Optional(page) => {
extend_from_decoder(
validity,
&mut page.validity,
Some(remaining),
values,
&mut page.values.by_ref().map(|x| {
match (x as usize).try_into() {
Ok(key) => key,
// todo: convert this to an error.
Err(_) => panic!("The maximum key is too small"),
}
}),
);
page.values.get_result()?;
},
State::Required(page) => {
values.extend(
page.values
.by_ref()
.map(|x| {
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
let x: K = match (x as usize).try_into() {
Ok(key) => key,
// todo: convert this to an error.
Err(_) => {
Expand All @@ -187,33 +186,33 @@ where
})
.take(remaining),
);
page.values.get_result()?;
},
State::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
validity,
page_validity,
Some(remaining),
values,
&mut page_values.by_ref().map(|x| {
let x: K = match (x as usize).try_into() {
Ok(key) => key,
// todo: convert this to an error.
Err(_) => {
panic!("The maximum key is too small")
},
};
x
}),
);
page_values.get_result()?;
},
State::FilteredOptional(page_validity, page_values) => extend_from_decoder(
validity,
page_validity,
Some(remaining),
values,
&mut page_values.by_ref().map(|x| {
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
Err(_) => {
panic!("The maximum key is too small")
},
};
x
}),
),
State::FilteredRequired(page) => {
values.extend(
page.values
.by_ref()
.map(|x| {
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
let x: K = match (x as usize).try_into() {
Ok(key) => key,
// todo: convert this to an error.
Err(_) => {
Expand All @@ -224,6 +223,7 @@ where
})
.take(remaining),
);
page.values.iter.get_result()?;
},
}
Ok(())
Expand Down Expand Up @@ -319,3 +319,4 @@ pub(super) fn next_dict<K: DictionaryKey, I: PagesIter, F: Fn(&DictPage) -> Box<

pub use nested::next_dict as nested_next_dict;
use polars_error::{polars_err, PolarsResult};
use polars_utils::iter::FallibleIterator;
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use arrow::array::{Array, DictionaryArray, DictionaryKey};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::{polars_err, PolarsResult};
use polars_utils::iter::FallibleIterator;

use super::super::super::PagesIter;
use super::super::nested_utils::*;
Expand Down Expand Up @@ -112,20 +113,18 @@ impl<'a, K: DictionaryKey> NestedDecoder<'a> for DictionaryDecoder<K> {
let (values, validity) = decoded;
match state {
State::Optional(page_values) => {
let key = page_values.next().transpose()?;
// todo: convert unwrap to error
let key = match K::try_from(key.unwrap_or_default() as usize) {
Ok(key) => key,
Err(_) => todo!(),
let key = page_values.next().unwrap_or_default();
let Ok(key) = K::try_from(key as usize) else {
panic! {}
};
values.push(key);
validity.push(true);
page_values.get_result()?;
},
State::Required(page_values) => {
let key = page_values.values.next().transpose()?;
let key = match K::try_from(key.unwrap_or_default() as usize) {
Ok(key) => key,
Err(_) => todo!(),
let key = page_values.values.next().unwrap_or_default();
let Ok(key) = K::try_from(key as usize) else {
panic! {}
};
values.push(key);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use arrow::pushable::Pushable;
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::utils::{
dict_indices_decoder, extend_from_decoder, get_selected_rows, next, not_implemented,
Expand Down Expand Up @@ -229,28 +230,32 @@ impl<'a> Decoder<'a> for BinaryDecoder {
values.push(x)
}
},
State::OptionalDictionary(page) => extend_from_decoder(
validity,
&mut page.validity,
Some(remaining),
values,
page.values.by_ref().map(|index| {
let index = index.unwrap() as usize;
&page.dict[index * self.size..(index + 1) * self.size]
}),
),
State::OptionalDictionary(page) => {
extend_from_decoder(
validity,
&mut page.validity,
Some(remaining),
values,
page.values.by_ref().map(|index| {
let index = index as usize;
&page.dict[index * self.size..(index + 1) * self.size]
}),
);
page.values.get_result()?;
},
State::RequiredDictionary(page) => {
for x in page
.values
.by_ref()
.map(|index| {
let index = index.unwrap() as usize;
let index = index as usize;
&page.dict[index * self.size..(index + 1) * self.size]
})
.take(remaining)
{
values.push(x)
}
page.values.get_result()?;
},
State::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
Expand Down
Loading

0 comments on commit c8add0f

Please sign in to comment.