From 2c7ec6771beac3364b4cb2e91e9dfe095b223763 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Tue, 17 Sep 2024 13:49:17 +0200
Subject: [PATCH] refactor: Move parquet projected-schema validation into polars-io

---
 crates/polars-io/src/parquet/read/mod.rs    |  2 +-
 crates/polars-io/src/parquet/read/utils.rs  | 36 ++++++++++++++++++-
 .../nodes/parquet_source/metadata_fetch.rs  |  2 +-
 .../nodes/parquet_source/metadata_utils.rs  | 35 +++---------------
 4 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs
index 5c722c5b77cd..56621ce36cbc 100644
--- a/crates/polars-io/src/parquet/read/mod.rs
+++ b/crates/polars-io/src/parquet/read/mod.rs
@@ -36,7 +36,7 @@ use polars_error::{ErrString, PolarsError};
 #[cfg(feature = "cloud")]
 pub use reader::ParquetAsyncReader;
 pub use reader::{BatchedParquetReader, ParquetReader};
-pub use utils::materialize_empty_df;
+pub use utils::{ensure_schema_has_projected_fields, materialize_empty_df};
 
 pub mod _internal {
     pub use super::mmap::to_deserializer;
diff --git a/crates/polars-io/src/parquet/read/utils.rs b/crates/polars-io/src/parquet/read/utils.rs
index 34cc752dd782..15f4495bc976 100644
--- a/crates/polars-io/src/parquet/read/utils.rs
+++ b/crates/polars-io/src/parquet/read/utils.rs
@@ -1,6 +1,8 @@
 use std::borrow::Cow;
 
-use polars_core::prelude::{ArrowSchema, DataFrame, Series, IDX_DTYPE};
+use polars_core::prelude::{ArrowSchema, DataFrame, DataType, PlHashMap, Series, IDX_DTYPE};
+use polars_error::{polars_bail, PolarsResult};
+use polars_utils::pl_str::PlSmallStr;
 
 use crate::hive::materialize_hive_partitions;
 use crate::utils::apply_projection;
@@ -28,3 +30,35 @@ pub fn materialize_empty_df(
 
     df
 }
+
+/// Ensures that a parquet file has all the necessary columns for a projection with the correct
+/// dtype. There are no ordering requirements and extra columns are permitted.
+pub fn ensure_schema_has_projected_fields(
+    schema: &ArrowSchema,
+    projected_arrow_schema: &ArrowSchema,
+) -> PolarsResult<()> {
+    // Note: We convert to Polars-native dtypes for timezone normalization.
+    let mut schema = schema
+        .iter_values()
+        .map(|x| {
+            let dtype = DataType::from_arrow(&x.dtype, true);
+            (x.name.clone(), dtype)
+        })
+        .collect::<PlHashMap<PlSmallStr, DataType>>();
+
+    for field in projected_arrow_schema.iter_values() {
+        let Some(dtype) = schema.remove(&field.name) else {
+            polars_bail!(SchemaMismatch: "did not find column: {}", field.name)
+        };
+
+        let expected_dtype = DataType::from_arrow(&field.dtype, true);
+
+        if dtype != expected_dtype {
+            polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
+                &field.name, dtype, expected_dtype
+            )
+        }
+    }
+
+    Ok(())
+}
diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
index 7f5aa8abbf3e..05d59e76f468 100644
--- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -128,8 +128,8 @@ impl ParquetSourceNode {
             };
 
             ensure_metadata_has_projected_fields(
-                projected_arrow_schema.as_ref(),
                 &metadata,
+                projected_arrow_schema.as_ref(),
             )?;
 
             PolarsResult::Ok((path_index, byte_source, metadata))
diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
index 24184fd12b10..a45cf5e10c61 100644
--- a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
@@ -1,9 +1,8 @@
-use polars_core::prelude::{ArrowSchema, DataType, PlHashMap};
-use polars_error::{polars_bail, PolarsResult};
-use polars_io::prelude::FileMetadata;
+use polars_core::prelude::ArrowSchema;
+use polars_error::PolarsResult;
+use polars_io::prelude::{ensure_schema_has_projected_fields, FileMetadata};
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_utils::mmap::MemSlice;
-use polars_utils::pl_str::PlSmallStr;
 
 /// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch
 /// the bytes of the entire file are loaded, it is returned in the second return value.
@@ -124,33 +123,9 @@ pub(super) async fn read_parquet_metadata_bytes(
 /// Ensures that a parquet file has all the necessary columns for a projection with the correct
 /// dtype. There are no ordering requirements and extra columns are permitted.
 pub(super) fn ensure_metadata_has_projected_fields(
-    projected_fields: &ArrowSchema,
     metadata: &FileMetadata,
+    projected_arrow_schema: &ArrowSchema,
 ) -> PolarsResult<()> {
     let schema = polars_parquet::arrow::read::infer_schema(metadata)?;
-
-    // Note: We convert to Polars-native dtypes for timezone normalization.
-    let mut schema = schema
-        .into_iter_values()
-        .map(|x| {
-            let dtype = DataType::from_arrow(&x.dtype, true);
-            (x.name, dtype)
-        })
-        .collect::<PlHashMap<PlSmallStr, DataType>>();
-
-    for field in projected_fields.iter_values() {
-        let Some(dtype) = schema.remove(&field.name) else {
-            polars_bail!(SchemaMismatch: "did not find column: {}", field.name)
-        };
-
-        let expected_dtype = DataType::from_arrow(&field.dtype, true);
-
-        if dtype != expected_dtype {
-            polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
-                &field.name, dtype, expected_dtype
-            )
-        }
-    }
-
-    Ok(())
+    ensure_schema_has_projected_fields(&schema, projected_arrow_schema)
 }
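Reviewer note (not part of the patch): the sketch below shows how the relocated helper can be exercised once re-exported — the patch adds it to `polars_io::parquet::read`, and the import in `metadata_utils.rs` implies it also lands in `polars_io::prelude`. This is a minimal sketch under assumptions: that `ArrowSchema` is the name-to-`Field` index map implied by the `iter_values()` calls above and can be `collect()`ed from `(name, Field)` pairs, and that `polars_arrow::datatypes::Field::new(name, dtype, is_nullable)` is available. The schema contents and the `schema_of` helper are made up for illustration.

```rust
use polars_arrow::datatypes::{ArrowDataType, Field};
use polars_core::prelude::ArrowSchema;
use polars_error::PolarsResult;
use polars_io::prelude::ensure_schema_has_projected_fields;

// Hypothetical helper: build an ArrowSchema keyed by field name.
fn schema_of(fields: Vec<Field>) -> ArrowSchema {
    fields.into_iter().map(|f| (f.name.clone(), f)).collect()
}

fn main() -> PolarsResult<()> {
    // Schema inferred from a file's metadata: ordering is arbitrary and
    // the extra column "c" is permitted by the check.
    let file_schema = schema_of(vec![
        Field::new("b".into(), ArrowDataType::Int64, true),
        Field::new("a".into(), ArrowDataType::Int64, true),
        Field::new("c".into(), ArrowDataType::Boolean, true),
    ]);

    // Projection that only needs "a" with a matching dtype: passes.
    let projection = schema_of(vec![Field::new("a".into(), ArrowDataType::Int64, true)]);
    ensure_schema_has_projected_fields(&file_schema, &projection)?;

    // Same column projected with a different dtype: SchemaMismatch error.
    let mismatch = schema_of(vec![Field::new("a".into(), ArrowDataType::Int32, true)]);
    assert!(ensure_schema_has_projected_fields(&file_schema, &mismatch).is_err());

    Ok(())
}
```

Moving the comparison into polars-io also explains the argument swap in `metadata_fetch.rs`: `ensure_metadata_has_projected_fields` now takes `(&metadata, projected_arrow_schema)` so its parameter order mirrors the `(schema, projected_arrow_schema)` order of the new shared helper.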