diff --git a/crates/polars-parquet/src/arrow/read/mod.rs b/crates/polars-parquet/src/arrow/read/mod.rs index d663c4cc743d..b7fe702e8107 100644 --- a/crates/polars-parquet/src/arrow/read/mod.rs +++ b/crates/polars-parquet/src/arrow/read/mod.rs @@ -2,9 +2,7 @@ #![allow(clippy::type_complexity)] mod deserialize; -mod file; pub mod indexes; -mod row_group; pub mod schema; pub mod statistics; @@ -15,11 +13,9 @@ pub use deserialize::{ column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns, Filter, InitNested, NestedState, }; -pub use file::{FileReader, RowGroupReader}; #[cfg(feature = "async")] use futures::{AsyncRead, AsyncSeek}; use polars_error::PolarsResult; -pub use row_group::*; pub use schema::{infer_schema, FileMetaData}; use crate::parquet::error::ParquetResult; @@ -44,6 +40,33 @@ pub use crate::parquet::{ FallibleStreamingIterator, }; +/// Returns all [`ColumnChunkMetaData`] associated to `field_name`. +/// For non-nested parquet types, this returns a single column +pub fn get_field_columns<'a>( + columns: &'a [ColumnChunkMetaData], + field_name: &str, +) -> Vec<&'a ColumnChunkMetaData> { + columns + .iter() + .filter(|x| x.descriptor().path_in_schema[0] == field_name) + .collect() +} + +/// Returns all [`ColumnChunkMetaData`] associated to `field_name`. +/// For non-nested parquet types, this returns a single column +pub fn get_field_pages<'a, T>( + columns: &'a [ColumnChunkMetaData], + items: &'a [T], + field_name: &str, +) -> Vec<&'a T> { + columns + .iter() + .zip(items) + .filter(|(metadata, _)| metadata.descriptor().path_in_schema[0] == field_name) + .map(|(_, item)| item) + .collect() +} + /// Reads parquets' metadata synchronously. pub fn read_metadata(reader: &mut R) -> PolarsResult { Ok(_read_metadata(reader)?) diff --git a/crates/polars/tests/it/io/parquet/arrow/mod.rs b/crates/polars/tests/it/io/parquet/arrow/mod.rs index 477b9a1321df..11bee66dba73 100644 --- a/crates/polars/tests/it/io/parquet/arrow/mod.rs +++ b/crates/polars/tests/it/io/parquet/arrow/mod.rs @@ -15,6 +15,8 @@ use polars_parquet::read as p_read; use polars_parquet::read::statistics::*; use polars_parquet::write::*; +use super::read::file::FileReader; + type ArrayStats = (Box, Statistics); fn new_struct( @@ -52,7 +54,7 @@ pub fn read_column(mut reader: R, column: &str) -> PolarsResult< let statistics = deserialize(field, row_group)?; - let mut reader = p_read::FileReader::new(reader, metadata.row_groups, schema, None); + let mut reader = FileReader::new(reader, metadata.row_groups, schema, None); let array = reader.next().unwrap()?.into_arrays().pop().unwrap(); @@ -1302,7 +1304,7 @@ fn integration_read(data: &[u8], limit: Option) -> PolarsResult PolarsResult<()> { .map(|(_, row_group)| row_group) .collect(); - let reader = p_read::FileReader::new(reader, row_groups, schema, None); + let reader = FileReader::new(reader, row_groups, schema, None); let new_chunks = reader.collect::>>()?; diff --git a/crates/polars/tests/it/io/parquet/arrow/read.rs b/crates/polars/tests/it/io/parquet/arrow/read.rs index 6aaeb8c297cb..ffff1c99667b 100644 --- a/crates/polars/tests/it/io/parquet/arrow/read.rs +++ b/crates/polars/tests/it/io/parquet/arrow/read.rs @@ -3,9 +3,12 @@ use std::path::PathBuf; use polars_parquet::arrow::read::*; use super::*; +use crate::io::parquet::read::file::FileReader; #[cfg(feature = "parquet")] #[test] fn all_types() -> PolarsResult<()> { + use crate::io::parquet::read::file::FileReader; + let dir = env!("CARGO_MANIFEST_DIR"); let path = PathBuf::from(dir).join("../../docs/data/alltypes_plain.parquet"); @@ -49,6 +52,8 @@ fn all_types() -> PolarsResult<()> { #[test] fn all_types_chunked() -> PolarsResult<()> { // this has one batch with 8 elements + + use crate::io::parquet::read::file::FileReader; let dir = env!("CARGO_MANIFEST_DIR"); let path = PathBuf::from(dir).join("../../docs/data/alltypes_plain.parquet"); let mut reader = std::fs::File::open(path)?; diff --git a/crates/polars/tests/it/io/parquet/mod.rs b/crates/polars/tests/it/io/parquet/mod.rs index 5cc89e7452d7..3c756bbea274 100644 --- a/crates/polars/tests/it/io/parquet/mod.rs +++ b/crates/polars/tests/it/io/parquet/mod.rs @@ -1,6 +1,6 @@ #![forbid(unsafe_code)] mod arrow; -mod read; +pub(crate) mod read; mod roundtrip; mod write; diff --git a/crates/polars-parquet/src/arrow/read/file.rs b/crates/polars/tests/it/io/parquet/read/file.rs similarity index 97% rename from crates/polars-parquet/src/arrow/read/file.rs rename to crates/polars/tests/it/io/parquet/read/file.rs index a390022331be..1f069d651af2 100644 --- a/crates/polars-parquet/src/arrow/read/file.rs +++ b/crates/polars/tests/it/io/parquet/read/file.rs @@ -4,10 +4,9 @@ use arrow::array::Array; use arrow::datatypes::ArrowSchema; use arrow::record_batch::RecordBatchT; use polars_error::PolarsResult; +use polars_parquet::read::{Filter, RowGroupMetaData}; -use super::deserialize::Filter; -use super::{RowGroupDeserializer, RowGroupMetaData}; -use crate::arrow::read::read_columns_many; +use super::row_group::{read_columns_many, RowGroupDeserializer}; /// An iterator of [`RecordBatchT`]s coming from row groups of a parquet file. /// diff --git a/crates/polars/tests/it/io/parquet/read/mod.rs b/crates/polars/tests/it/io/parquet/read/mod.rs index 2e98f5dcacaa..f9e16619556c 100644 --- a/crates/polars/tests/it/io/parquet/read/mod.rs +++ b/crates/polars/tests/it/io/parquet/read/mod.rs @@ -4,10 +4,12 @@ mod binary; /// but OTOH it has no external dependencies and is very familiar to Rust developers. mod boolean; mod dictionary; +pub(crate) mod file; mod fixed_binary; mod indexes; mod primitive; mod primitive_nested; +pub(crate) mod row_group; mod struct_; mod utils; diff --git a/crates/polars-parquet/src/arrow/read/row_group.rs b/crates/polars/tests/it/io/parquet/read/row_group.rs similarity index 79% rename from crates/polars-parquet/src/arrow/read/row_group.rs rename to crates/polars/tests/it/io/parquet/read/row_group.rs index 81be2243669a..44e6622589c5 100644 --- a/crates/polars-parquet/src/arrow/read/row_group.rs +++ b/crates/polars/tests/it/io/parquet/read/row_group.rs @@ -4,14 +4,12 @@ use arrow::array::Array; use arrow::datatypes::Field; use arrow::record_batch::RecordBatchT; use polars_error::PolarsResult; +use polars_parquet::arrow::read::{column_iter_to_arrays, Filter}; +use polars_parquet::parquet::metadata::ColumnChunkMetaData; +use polars_parquet::parquet::read::{get_field_columns, BasicDecompressor, PageReader}; +use polars_parquet::read::RowGroupMetaData; use polars_utils::mmap::MemReader; -use super::RowGroupMetaData; -use crate::arrow::read::column_iter_to_arrays; -use crate::arrow::read::deserialize::Filter; -use crate::parquet::metadata::ColumnChunkMetaData; -use crate::parquet::read::{BasicDecompressor, PageReader}; - /// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into /// an iterator of [`RecordBatchT`]. /// @@ -65,33 +63,6 @@ impl Iterator for RowGroupDeserializer { } } -/// Returns all [`ColumnChunkMetaData`] associated to `field_name`. -/// For non-nested parquet types, this returns a single column -pub fn get_field_columns<'a>( - columns: &'a [ColumnChunkMetaData], - field_name: &str, -) -> Vec<&'a ColumnChunkMetaData> { - columns - .iter() - .filter(|x| x.descriptor().path_in_schema[0] == field_name) - .collect() -} - -/// Returns all [`ColumnChunkMetaData`] associated to `field_name`. -/// For non-nested parquet types, this returns a single column -pub fn get_field_pages<'a, T>( - columns: &'a [ColumnChunkMetaData], - items: &'a [T], - field_name: &str, -) -> Vec<&'a T> { - columns - .iter() - .zip(items) - .filter(|(metadata, _)| metadata.descriptor().path_in_schema[0] == field_name) - .map(|(_, item)| item) - .collect() -} - /// Reads all columns that are part of the parquet field `field_name` /// # Implementation /// This operation is IO-bounded `O(C)` where C is the number of columns associated to @@ -99,10 +70,9 @@ pub fn get_field_pages<'a, T>( pub fn read_columns<'a, R: Read + Seek>( reader: &mut R, columns: &'a [ColumnChunkMetaData], - field_name: &str, + field_name: &'a str, ) -> PolarsResult)>> { get_field_columns(columns, field_name) - .into_iter() .map(|meta| _read_single_column(reader, meta)) .collect() } @@ -175,7 +145,7 @@ pub fn read_columns_many( field_columns .into_iter() - .zip(fields) - .map(|(columns, field)| to_deserializer(columns, field, filter.clone())) + .zip(fields.clone()) + .map(|(columns, field)| to_deserializer(columns.clone(), field, filter.clone())) .collect() } diff --git a/crates/polars/tests/it/io/parquet/roundtrip.rs b/crates/polars/tests/it/io/parquet/roundtrip.rs index aa4eacb0e04d..bc77d50afd8c 100644 --- a/crates/polars/tests/it/io/parquet/roundtrip.rs +++ b/crates/polars/tests/it/io/parquet/roundtrip.rs @@ -10,6 +10,8 @@ use polars_parquet::write::{ CompressionOptions, Encoding, RowGroupIterator, StatisticsOptions, Version, }; +use crate::io::parquet::read::file::FileReader; + fn round_trip( array: &ArrayRef, version: Version, @@ -53,7 +55,7 @@ fn round_trip( .collect(); // we can then read the row groups into chunks - let chunks = polars_parquet::read::FileReader::new(reader, row_groups, schema, None); + let chunks = FileReader::new(reader, row_groups, schema, None); let mut arrays = vec![]; for chunk in chunks {