chore: Remove unused Parquet source files (#18193)
coastalwhite committed Aug 14, 2024
1 parent 7686025 commit 4157e92
Showing 8 changed files with 52 additions and 49 deletions.
31 changes: 27 additions & 4 deletions crates/polars-parquet/src/arrow/read/mod.rs
@@ -2,9 +2,7 @@
 #![allow(clippy::type_complexity)]
 
 mod deserialize;
-mod file;
 pub mod indexes;
-mod row_group;
 pub mod schema;
 pub mod statistics;
 
@@ -15,11 +13,9 @@ pub use deserialize::{
     column_iter_to_arrays, create_list, create_map, get_page_iterator, init_nested, n_columns,
     Filter, InitNested, NestedState,
 };
-pub use file::{FileReader, RowGroupReader};
 #[cfg(feature = "async")]
 use futures::{AsyncRead, AsyncSeek};
 use polars_error::PolarsResult;
-pub use row_group::*;
 pub use schema::{infer_schema, FileMetaData};
 
 use crate::parquet::error::ParquetResult;
@@ -44,6 +40,33 @@ pub use crate::parquet::{
     FallibleStreamingIterator,
 };
 
+/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
+/// For non-nested parquet types, this returns a single column
+pub fn get_field_columns<'a>(
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Vec<&'a ColumnChunkMetaData> {
+    columns
+        .iter()
+        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
+        .collect()
+}
+
+/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
+/// For non-nested parquet types, this returns a single column
+pub fn get_field_pages<'a, T>(
+    columns: &'a [ColumnChunkMetaData],
+    items: &'a [T],
+    field_name: &str,
+) -> Vec<&'a T> {
+    columns
+        .iter()
+        .zip(items)
+        .filter(|(metadata, _)| metadata.descriptor().path_in_schema[0] == field_name)
+        .map(|(_, item)| item)
+        .collect()
+}
+
 /// Reads parquets' metadata synchronously.
 pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> PolarsResult<FileMetaData> {
     Ok(_read_metadata(reader)?)
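For orientation, a minimal sketch of how the relocated helpers can be called from their new public home. The file path, the field name, and the `columns()` accessor on the row-group metadata are illustrative assumptions, not part of this commit:

    use std::fs::File;

    use polars_error::PolarsResult;
    use polars_parquet::arrow::read::{get_field_columns, read_metadata};

    fn main() -> PolarsResult<()> {
        // Hypothetical input file.
        let mut file = File::open("example.parquet")?;
        let metadata = read_metadata(&mut file)?;
        // For a non-nested field this yields exactly one chunk per row group.
        let chunks = get_field_columns(metadata.row_groups[0].columns(), "my_field");
        println!("column chunks for `my_field`: {}", chunks.len());
        Ok(())
    }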
8 changes: 5 additions & 3 deletions crates/polars/tests/it/io/parquet/arrow/mod.rs
@@ -15,6 +15,8 @@ use polars_parquet::read as p_read;
 use polars_parquet::read::statistics::*;
 use polars_parquet::write::*;
 
+use super::read::file::FileReader;
+
 type ArrayStats = (Box<dyn Array>, Statistics);
 
 fn new_struct(
@@ -52,7 +54,7 @@ pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> PolarsResult<

     let statistics = deserialize(field, row_group)?;
 
-    let mut reader = p_read::FileReader::new(reader, metadata.row_groups, schema, None);
+    let mut reader = FileReader::new(reader, metadata.row_groups, schema, None);
 
     let array = reader.next().unwrap()?.into_arrays().pop().unwrap();
 
@@ -1302,7 +1304,7 @@ fn integration_read(data: &[u8], limit: Option<usize>) -> PolarsResult<Integrati
         let mut _statistics = deserialize(field, row_group)?;
     }
 
-    let reader = p_read::FileReader::new(
+    let reader = FileReader::new(
         Cursor::new(data),
         metadata.row_groups,
         schema.clone(),
@@ -1644,7 +1646,7 @@ fn filter_chunk() -> PolarsResult<()> {
         .map(|(_, row_group)| row_group)
         .collect();
 
-    let reader = p_read::FileReader::new(reader, row_groups, schema, None);
+    let reader = FileReader::new(reader, row_groups, schema, None);
 
     let new_chunks = reader.collect::<PolarsResult<Vec<_>>>()?;
 
5 changes: 5 additions & 0 deletions crates/polars/tests/it/io/parquet/arrow/read.rs
@@ -3,9 +3,12 @@ use std::path::PathBuf;
 use polars_parquet::arrow::read::*;
 
 use super::*;
+use crate::io::parquet::read::file::FileReader;
 #[cfg(feature = "parquet")]
 #[test]
 fn all_types() -> PolarsResult<()> {
+    use crate::io::parquet::read::file::FileReader;
+
     let dir = env!("CARGO_MANIFEST_DIR");
     let path = PathBuf::from(dir).join("../../docs/data/alltypes_plain.parquet");
 
@@ -49,6 +52,8 @@ fn all_types() -> PolarsResult<()> {
 #[test]
 fn all_types_chunked() -> PolarsResult<()> {
     // this has one batch with 8 elements
+
+    use crate::io::parquet::read::file::FileReader;
     let dir = env!("CARGO_MANIFEST_DIR");
     let path = PathBuf::from(dir).join("../../docs/data/alltypes_plain.parquet");
     let mut reader = std::fs::File::open(path)?;
2 changes: 1 addition & 1 deletion crates/polars/tests/it/io/parquet/mod.rs
@@ -1,6 +1,6 @@
 #![forbid(unsafe_code)]
 mod arrow;
-mod read;
+pub(crate) mod read;
 mod roundtrip;
 mod write;
 
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/arrow/read/file.rs → crates/polars/tests/it/io/parquet/read/file.rs
@@ -4,10 +4,9 @@ use arrow::array::Array;
 use arrow::datatypes::ArrowSchema;
 use arrow::record_batch::RecordBatchT;
 use polars_error::PolarsResult;
+use polars_parquet::read::{Filter, RowGroupMetaData};
 
-use super::deserialize::Filter;
-use super::{RowGroupDeserializer, RowGroupMetaData};
-use crate::arrow::read::read_columns_many;
+use super::row_group::{read_columns_many, RowGroupDeserializer};
 
 /// An iterator of [`RecordBatchT`]s coming from row groups of a parquet file.
 ///
2 changes: 2 additions & 0 deletions crates/polars/tests/it/io/parquet/read/mod.rs
@@ -4,10 +4,12 @@ mod binary;
 /// but OTOH it has no external dependencies and is very familiar to Rust developers.
 mod boolean;
 mod dictionary;
+pub(crate) mod file;
 mod fixed_binary;
 mod indexes;
 mod primitive;
 mod primitive_nested;
+pub(crate) mod row_group;
 mod struct_;
 mod utils;
 
44 changes: 7 additions & 37 deletions crates/polars-parquet/src/arrow/read/row_group.rs → crates/polars/tests/it/io/parquet/read/row_group.rs
@@ -4,14 +4,12 @@ use arrow::array::Array;
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatchT;
 use polars_error::PolarsResult;
+use polars_parquet::arrow::read::{column_iter_to_arrays, Filter};
+use polars_parquet::parquet::metadata::ColumnChunkMetaData;
+use polars_parquet::parquet::read::{get_field_columns, BasicDecompressor, PageReader};
+use polars_parquet::read::RowGroupMetaData;
 use polars_utils::mmap::MemReader;
 
-use super::RowGroupMetaData;
-use crate::arrow::read::column_iter_to_arrays;
-use crate::arrow::read::deserialize::Filter;
-use crate::parquet::metadata::ColumnChunkMetaData;
-use crate::parquet::read::{BasicDecompressor, PageReader};
-
 /// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into
 /// an iterator of [`RecordBatchT`].
 ///
@@ -65,44 +63,16 @@ impl Iterator for RowGroupDeserializer {
     }
 }
 
-/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
-/// For non-nested parquet types, this returns a single column
-pub fn get_field_columns<'a>(
-    columns: &'a [ColumnChunkMetaData],
-    field_name: &str,
-) -> Vec<&'a ColumnChunkMetaData> {
-    columns
-        .iter()
-        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
-        .collect()
-}
-
-/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
-/// For non-nested parquet types, this returns a single column
-pub fn get_field_pages<'a, T>(
-    columns: &'a [ColumnChunkMetaData],
-    items: &'a [T],
-    field_name: &str,
-) -> Vec<&'a T> {
-    columns
-        .iter()
-        .zip(items)
-        .filter(|(metadata, _)| metadata.descriptor().path_in_schema[0] == field_name)
-        .map(|(_, item)| item)
-        .collect()
-}
-
 /// Reads all columns that are part of the parquet field `field_name`
 /// # Implementation
 /// This operation is IO-bounded `O(C)` where C is the number of columns associated to
 /// the field (one for non-nested types)
 pub fn read_columns<'a, R: Read + Seek>(
     reader: &mut R,
     columns: &'a [ColumnChunkMetaData],
-    field_name: &str,
+    field_name: &'a str,
 ) -> PolarsResult<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
     get_field_columns(columns, field_name)
         .into_iter()
         .map(|meta| _read_single_column(reader, meta))
         .collect()
 }
@@ -175,7 +145,7 @@ pub fn read_columns_many<R: Read + Seek>(

     field_columns
         .into_iter()
-        .zip(fields)
-        .map(|(columns, field)| to_deserializer(columns, field, filter.clone()))
+        .zip(fields.clone())
+        .map(|(columns, field)| to_deserializer(columns.clone(), field, filter.clone()))
         .collect()
 }
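As the doc comment above notes, `read_columns` performs one IO-bounded read per column chunk of the field. A sketch of driving it per row group from the test crate — the field name and the `columns()` accessor on `RowGroupMetaData` are assumptions for illustration:

    use std::fs::File;

    use polars_error::PolarsResult;
    use polars_parquet::read::read_metadata;

    use crate::io::parquet::read::row_group::read_columns;

    fn bytes_for_field(path: &str, field_name: &str) -> PolarsResult<usize> {
        let mut reader = File::open(path)?;
        let metadata = read_metadata(&mut reader)?;
        let mut total = 0;
        for row_group in &metadata.row_groups {
            // One (metadata, bytes) pair per column chunk; a single pair for non-nested fields.
            for (_meta, bytes) in read_columns(&mut reader, row_group.columns(), field_name)? {
                total += bytes.len();
            }
        }
        Ok(total)
    }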
4 changes: 3 additions & 1 deletion crates/polars/tests/it/io/parquet/roundtrip.rs
@@ -10,6 +10,8 @@ use polars_parquet::write::{
     CompressionOptions, Encoding, RowGroupIterator, StatisticsOptions, Version,
 };
 
+use crate::io::parquet::read::file::FileReader;
+
 fn round_trip(
     array: &ArrayRef,
     version: Version,
@@ -53,7 +55,7 @@
         .collect();
 
     // we can then read the row groups into chunks
-    let chunks = polars_parquet::read::FileReader::new(reader, row_groups, schema, None);
+    let chunks = FileReader::new(reader, row_groups, schema, None);
 
     let mut arrays = vec![];
     for chunk in chunks {
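After this commit the pattern across these tests is uniform: build the test-local `FileReader` from the metadata's row groups and an inferred schema, then drain it as an iterator of record batches. A condensed sketch of that flow (path handling abbreviated; `FileReader` here is the test-crate copy, no longer a `polars_parquet` export):

    use std::fs::File;

    use polars_error::PolarsResult;
    use polars_parquet::read::{infer_schema, read_metadata};

    use crate::io::parquet::read::file::FileReader;

    fn read_all(path: &str) -> PolarsResult<usize> {
        let mut reader = File::open(path)?;
        let metadata = read_metadata(&mut reader)?;
        let schema = infer_schema(&metadata)?;
        // Same constructor shape as the removed polars_parquet::read::FileReader.
        let file_reader = FileReader::new(reader, metadata.row_groups, schema, None);
        let chunks = file_reader.collect::<PolarsResult<Vec<_>>>()?;
        Ok(chunks.len())
    }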
