From ccabe98c78b2c0529edd562652aa95faab3c96dd Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Tue, 10 Sep 2024 21:31:17 +1000 Subject: [PATCH] do not fetch if path_index==0 --- .../src/parquet/metadata/file_metadata.rs | 2 +- .../nodes/parquet_source/metadata_fetch.rs | 20 +++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/crates/polars-parquet/src/parquet/metadata/file_metadata.rs b/crates/polars-parquet/src/parquet/metadata/file_metadata.rs index 492d283f64ed8..47c9f160781d8 100644 --- a/crates/polars-parquet/src/parquet/metadata/file_metadata.rs +++ b/crates/polars-parquet/src/parquet/metadata/file_metadata.rs @@ -10,7 +10,7 @@ pub use crate::parquet::thrift_format::KeyValue; /// Metadata for a Parquet file. // This is almost equal to [`parquet_format_safe::FileMetaData`] but contains the descriptors, // which are crucial to deserialize pages. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct FileMetadata { /// version of this file. pub version: i32, diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs index 202b28fbd478e..746a6a5ac23e9 100644 --- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs +++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs @@ -83,6 +83,11 @@ impl ParquetSourceNode { ) .await?, ); + + if path_idx == 0 { + return Ok((0, byte_source, MemSlice::from_slice(&[]))); + } + let (metadata_bytes, maybe_full_bytes) = read_parquet_metadata_bytes(byte_source.as_ref(), verbose).await?; @@ -109,20 +114,27 @@ impl ParquetSourceNode { } }; + let first_metadata = self.first_metadata.clone(); + let process_metadata_bytes = { move |handle: task_handles_ext::AbortOnDropHandle< PolarsResult<(usize, Arc, MemSlice)>, >| { let projected_arrow_fields = projected_arrow_fields.clone(); + let first_metadata = first_metadata.clone(); // Run on CPU runtime - metadata deserialization is expensive, especially // for very wide tables. let handle = async_executor::spawn(TaskPriority::Low, async move { let (path_index, byte_source, metadata_bytes) = handle.await.unwrap()?; - let metadata = polars_parquet::parquet::read::deserialize_metadata( - metadata_bytes.as_ref(), - metadata_bytes.len() * 2 + 1024, - )?; + let metadata = if path_index == 0 { + Arc::unwrap_or_clone(first_metadata) + } else { + polars_parquet::parquet::read::deserialize_metadata( + metadata_bytes.as_ref(), + metadata_bytes.len() * 2 + 1024, + )? + }; ensure_metadata_has_projected_fields( projected_arrow_fields.as_ref(),