Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Array statistics in parquet #15031

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion crates/polars-parquet/src/arrow/read/statistics/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ pub struct DynMutableListArray {
impl DynMutableListArray {
pub fn try_with_capacity(data_type: ArrowDataType, capacity: usize) -> PolarsResult<Self> {
let inner = match data_type.to_logical_type() {
ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => inner.data_type(),
ArrowDataType::List(inner)
| ArrowDataType::LargeList(inner)
| ArrowDataType::FixedSizeList(inner, _) => inner.data_type(),
_ => unreachable!(),
};
let inner = make_mutable(inner, capacity)?;
Expand Down Expand Up @@ -60,6 +62,11 @@ impl MutableArray for DynMutableListArray {
None,
))
},
ArrowDataType::FixedSizeList(field, _) => Box::new(FixedSizeListArray::new(
ArrowDataType::FixedSizeList(field.clone(), inner.len()),
inner,
None,
)),
_ => unreachable!(),
}
}
Expand Down
106 changes: 44 additions & 62 deletions crates/polars-parquet/src/arrow/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,90 +62,76 @@ struct MutableStatistics {

impl From<MutableStatistics> for Statistics {
fn from(mut s: MutableStatistics) -> Self {
let null_count = if let PhysicalType::Struct = s.null_count.data_type().to_physical_type() {
s.null_count
let null_count = match s.null_count.data_type().to_physical_type() {
PhysicalType::Struct => s
.null_count
.as_box()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::Map = s.null_count.data_type().to_physical_type() {
s.null_count
.as_box()
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::List = s.null_count.data_type().to_physical_type() {
s.null_count
.boxed(),
PhysicalType::List => s
.null_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::LargeList = s.null_count.data_type().to_physical_type() {
s.null_count
.boxed(),
PhysicalType::LargeList => s
.null_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i64>>()
.unwrap()
.clone()
.boxed()
} else {
s.null_count
.boxed(),
_ => s
.null_count
.as_box()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.clone()
.boxed()
.boxed(),
};
let distinct_count = if let PhysicalType::Struct =
s.distinct_count.data_type().to_physical_type()
{
s.distinct_count

let distinct_count = match s.distinct_count.data_type().to_physical_type() {
PhysicalType::Struct => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::Map = s.distinct_count.data_type().to_physical_type() {
s.distinct_count
.as_box()
.as_any()
.downcast_ref::<MapArray>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::List = s.distinct_count.data_type().to_physical_type() {
s.distinct_count
.boxed(),
PhysicalType::List => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i32>>()
.unwrap()
.clone()
.boxed()
} else if let PhysicalType::LargeList = s.distinct_count.data_type().to_physical_type() {
s.distinct_count
.boxed(),
PhysicalType::LargeList => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<ListArray<i64>>()
.unwrap()
.clone()
.boxed()
} else {
s.distinct_count
.boxed(),
_ => s
.distinct_count
.as_box()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.clone()
.boxed()
.boxed(),
};

Self {
null_count,
distinct_count,
Expand Down Expand Up @@ -180,9 +166,10 @@ fn make_mutable(data_type: &ArrowDataType, capacity: usize) -> PolarsResult<Box<
Box::new(MutableFixedSizeBinaryArray::try_new(data_type.clone(), vec![], None).unwrap())
as _
},
PhysicalType::LargeList | PhysicalType::List => Box::new(
PhysicalType::LargeList | PhysicalType::List | PhysicalType::FixedSizeList => Box::new(
DynMutableListArray::try_with_capacity(data_type.clone(), capacity)?,
) as Box<dyn MutableArray>,
)
as Box<dyn MutableArray>,
PhysicalType::Dictionary(_) => Box::new(
dictionary::DynMutableDictionary::try_with_capacity(data_type.clone(), capacity)?,
),
Expand Down Expand Up @@ -212,32 +199,27 @@ fn make_mutable(data_type: &ArrowDataType, capacity: usize) -> PolarsResult<Box<
}

fn create_dt(data_type: &ArrowDataType) -> ArrowDataType {
if let ArrowDataType::Struct(fields) = data_type.to_logical_type() {
ArrowDataType::Struct(
match data_type.to_logical_type() {
ArrowDataType::Struct(fields) => ArrowDataType::Struct(
fields
.iter()
.map(|f| Field::new(&f.name, create_dt(&f.data_type), f.is_nullable))
.collect(),
)
} else if let ArrowDataType::Map(f, ordered) = data_type.to_logical_type() {
ArrowDataType::Map(
),
ArrowDataType::Map(f, ordered) => ArrowDataType::Map(
Box::new(Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)),
*ordered,
)
} else if let ArrowDataType::List(f) = data_type.to_logical_type() {
ArrowDataType::List(Box::new(Field::new(
&f.name,
create_dt(&f.data_type),
f.is_nullable,
)))
} else if let ArrowDataType::LargeList(f) = data_type.to_logical_type() {
ArrowDataType::LargeList(Box::new(Field::new(
),
ArrowDataType::LargeList(f) => ArrowDataType::LargeList(Box::new(Field::new(
&f.name,
create_dt(&f.data_type),
f.is_nullable,
)))
} else {
ArrowDataType::UInt64
))),
// FixedSizeList piggybacks on List: its statistics are stored using the List data type
ArrowDataType::List(f) | ArrowDataType::FixedSizeList(f, _) => ArrowDataType::List(
Box::new(Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)),
),
_ => ArrowDataType::UInt64,
}
}

Expand Down Expand Up @@ -330,7 +312,7 @@ fn push(
null_count: &mut dyn MutableArray,
) -> PolarsResult<()> {
match min.data_type().to_logical_type() {
List(_) | LargeList(_) => {
List(_) | LargeList(_) | FixedSizeList(_, _) => {
let min = min
.as_mut_any()
.downcast_mut::<list::DynMutableListArray>()
Expand Down
12 changes: 12 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,15 @@ def test_parquet_array_dtype() -> None:
df = pl.DataFrame({"x": [[1, 2, 3]]})
df = df.cast({"x": pl.Array(pl.Int64, width=3)})
test_round_trip(df)


@pytest.mark.write_disk()
def test_parquet_array_statistics() -> None:
    """Round-trip a frame with an Array column through parquet and filter on it.

    The frame is written with ``sink_parquet`` and read back with
    ``scan_parquet``; a ``!=`` filter on the Array column must drop exactly the
    first row.
    """
    # Function-scope imports keep the block self-contained; both are stdlib.
    import tempfile
    from pathlib import Path

    df = pl.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], "b": [1, 2, 3]})
    # Convert the List column to a fixed-width Array column of width 3.
    lf = df.with_columns(a=pl.col("a").list.to_array(3)).lazy()

    # The result is not inspected here; presumably this is a smoke check that
    # the same filter runs on in-memory data without raising.
    lf.filter(pl.col("a") != [1, 2, 3]).collect()

    # Write into a throwaway directory rather than the current working
    # directory, so the test leaves no stray ``test.parquet`` behind and
    # concurrent test runs cannot clobber each other's file.
    with tempfile.TemporaryDirectory() as tmp_dir:
        file_path = Path(tmp_dir) / "test.parquet"
        lf.sink_parquet(file_path)
        # Materialize to plain Python objects before the directory is removed.
        result = (
            pl.scan_parquet(file_path)
            .filter(pl.col("a") != [1, 2, 3])
            .collect()
            .to_dict(as_series=False)
        )

    assert result == {"a": [[4, 5, 6], [7, 8, 9]], "b": [2, 3]}
Loading