From 975d7a0f39df8f7de7370415255a0e81e8026d0b Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Mon, 11 Mar 2024 13:58:13 +0100 Subject: [PATCH 1/7] feat(rust): Implement IpcReaderAsync --- crates/polars-arrow/src/io/ipc/read/file.rs | 2 +- crates/polars-arrow/src/io/ipc/read/mod.rs | 4 +- crates/polars-io/src/ipc/ipc_reader_async.rs | 210 +++++++++++++++++++ crates/polars-io/src/ipc/mod.rs | 5 + 4 files changed, 219 insertions(+), 2 deletions(-) create mode 100644 crates/polars-io/src/ipc/ipc_reader_async.rs diff --git a/crates/polars-arrow/src/io/ipc/read/file.rs b/crates/polars-arrow/src/io/ipc/read/file.rs index 91d276326430..0eb60d0a566c 100644 --- a/crates/polars-arrow/src/io/ipc/read/file.rs +++ b/crates/polars-arrow/src/io/ipc/read/file.rs @@ -215,7 +215,7 @@ fn deserialize_footer_blocks( Ok((footer, blocks)) } -pub(super) fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult { +pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult { let (footer, blocks) = deserialize_footer_blocks(footer_data)?; let ipc_schema = footer diff --git a/crates/polars-arrow/src/io/ipc/read/mod.rs b/crates/polars-arrow/src/io/ipc/read/mod.rs index f79376934427..74d9a93a9309 100644 --- a/crates/polars-arrow/src/io/ipc/read/mod.rs +++ b/crates/polars-arrow/src/io/ipc/read/mod.rs @@ -30,7 +30,9 @@ pub mod file_async; pub(crate) use common::first_dict_field; #[cfg(feature = "io_flight")] pub(crate) use common::{read_dictionary, read_record_batch}; -pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata}; +pub use file::{ + deserialize_footer, read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, +}; use polars_utils::aliases::PlHashMap; pub use reader::FileReader; pub use schema::deserialize_schema; diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs new file mode 100644 index 000000000000..54b279af4274 --- /dev/null +++ 
b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -0,0 +1,210 @@ +use std::ops::Range; +use std::sync::Arc; + +use arrow::io::ipc::read::FileMetadata; +use bytes::Bytes; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; +use polars_core::frame::DataFrame; +use polars_error::{to_compute_err, PolarsResult}; +use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard}; + +use crate::cloud::{build_object_store, CloudLocation, CloudOptions}; +use crate::pl_async::{ + tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST, +}; +use crate::prelude::IpcReader; + +/// Polars specific wrapper for Arc that limits the number of +/// concurrent requests for the entire application. +#[derive(Debug, Clone)] +pub struct PolarsObjectStore(Arc); + +impl From for PolarsObjectStore +where + O: ObjectStore, +{ + fn from(value: O) -> Self { + Self::new(Arc::new(value)) + } +} + +impl PolarsObjectStore { + pub fn new(store: Arc) -> Self { + Self(store) + } + + pub async fn get(&self, path: &Path) -> PolarsResult { + tune_with_concurrency_budget(1, || async { + self + .0 + .get(path) + .await + .map_err(to_compute_err)? + .bytes() + .await + .map_err(to_compute_err) + }) + .await + } + + pub async fn get_range(&self, path: &Path, range: Range) -> PolarsResult { + tune_with_concurrency_budget(1, || self.0.get_range(path, range)) + .await + .map_err(to_compute_err) + } + + pub async fn get_ranges( + &self, + path: &Path, + ranges: &[Range], + ) -> PolarsResult> { + tune_with_concurrency_budget( + (ranges.len() as u32).clamp(0, MAX_BUDGET_PER_REQUEST as u32), + || self.0.get_ranges(path, ranges), + ) + .await + .map_err(to_compute_err) + } + + /// Fetch the metadata of the object at `path`; the result is not memoized. + pub async fn head(&self, path: &Path) -> PolarsResult { + with_concurrency_budget(1, || self.0.head(path)) + .await + .map_err(to_compute_err) + } +} + +/// An Arrow IPC reader implemented on top of PolarsObjectStore.
+pub struct IpcReaderAsync { + store: PolarsObjectStore, + path: Path, + object_metadata: Mutex>, + ipc_metadata: Mutex>, +} + +impl IpcReaderAsync { + pub async fn from_uri( + uri: &str, + cloud_options: Option<&CloudOptions>, + ) -> PolarsResult { + let ( + CloudLocation { + prefix, expansion, .. + }, + store, + ) = build_object_store(uri, cloud_options).await?; + + Ok(Self { + store: PolarsObjectStore::new(store), + path: { + // Any wildcards should already have been resolved here. Without this assertion they would + // be ignored. + debug_assert!(expansion.is_none(), "path should not contain wildcards"); + Path::from_url_path(prefix).map_err(to_compute_err)? + }, + object_metadata: Default::default(), + ipc_metadata: Default::default(), + }) + } + + async fn fetch_object_metadata(&self) -> PolarsResult { + self.store.head(&self.path).await + } + + async fn object_metadata(&self) -> PolarsResult> { + // NOTE: We have to be careful of deadlocks because we wait here for a + // `head` request to finish which waits for a semaphore. If that + // semaphore is also used in places that call this function, we can get + // a deadlock. + let mut object_metadata = self.object_metadata.lock().await; + if object_metadata.is_none() { + *object_metadata = Some(self.fetch_object_metadata().await?); + } + Ok(MutexGuard::map(object_metadata, |x| { + x.as_mut().unwrap_or_else(|| unreachable!()) + })) + } + + async fn file_size(&self) -> PolarsResult { + Ok(self.object_metadata().await?.size) + } + + async fn fetch_ipc_metadata(&self) -> PolarsResult { + let file_size = self.file_size().await?; + + // TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip. 
+ let footer_metadata = + self.store + .get_range( + &self.path, + file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| { + to_compute_err("ipc file size is smaller than the minimum") + })?..file_size, + ) + .await?; + + let footer_size = deserialize_footer_metadata( + footer_metadata + .as_ref() + .try_into() + .map_err(to_compute_err)?, + )?; + + let footer = self + .store + .get_range( + &self.path, + file_size + .checked_sub(FOOTER_METADATA_SIZE + footer_size) + .ok_or_else(|| { + to_compute_err("invalid ipc footer metadata: footer size too large") + })?..file_size, + ) + .await?; + + arrow::io::ipc::read::deserialize_footer( + footer.as_ref(), + footer_size.try_into().map_err(to_compute_err)?, + ) + } + + pub async fn metadata(&self) -> PolarsResult> { + let mut ipc_metadata = self.ipc_metadata.lock().await; + if ipc_metadata.is_none() { + *ipc_metadata = Some(self.fetch_ipc_metadata().await?) + } + Ok(MutexGuard::map(ipc_metadata, |x| { + x.as_mut().unwrap_or_else(|| unreachable!()) + })) + } + + pub async fn data(&self) -> PolarsResult { + let bytes = self.store.get(&self.path).await?; + let reader = + as crate::SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref())) + .with_row_index(None) // TODO + .with_n_rows(None) // TODO + .with_columns(None) // TODO + .with_projection(None); // TODO + let predicate = None; // TODO + let verbose = false; // TODO + reader.finish_with_scan_ops(predicate, verbose) + } +} + +const FOOTER_METADATA_SIZE: usize = 10; + +fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult { + let footer_size: usize = + i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!())) + .try_into() + .map_err(to_compute_err)?; + + // TODO: Move to polars-arrow and deduplicate parsing of footer metadata in sync and async readers. + if &bytes[4..] != b"ARROW1" { + Err(to_compute_err("invalid ipc footer magic"))? 
+ } + + Ok(footer_size) +} diff --git a/crates/polars-io/src/ipc/mod.rs b/crates/polars-io/src/ipc/mod.rs index 1366aa84324f..813fc7f5df78 100644 --- a/crates/polars-io/src/ipc/mod.rs +++ b/crates/polars-io/src/ipc/mod.rs @@ -16,3 +16,8 @@ pub use ipc_file::IpcReader; #[cfg(feature = "ipc_streaming")] pub use ipc_stream::*; pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption}; + +#[cfg(feature = "cloud")] +mod ipc_reader_async; +#[cfg(feature = "cloud")] +pub use ipc_reader_async::*; From caa7e5eda5daafef62ec658c24823ee940c69e59 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Mon, 11 Mar 2024 16:36:26 +0100 Subject: [PATCH 2/7] Integrate IpcReaderAsync with execution engine --- crates/polars-io/src/ipc/ipc_file.rs | 8 +- crates/polars-io/src/ipc/ipc_reader_async.rs | 172 +++++++++++++----- .../src/physical_plan/executors/scan/ipc.rs | 58 ++++++ .../src/physical_plan/planner/lp.rs | 10 +- crates/polars-lazy/src/scan/ipc.rs | 7 + crates/polars-lazy/src/tests/io.rs | 2 + .../polars-plan/src/logical_plan/builder.rs | 72 ++++++-- .../polars-plan/src/logical_plan/file_scan.rs | 32 +++- .../src/logical_plan/functions/count.rs | 78 +++++++- py-polars/polars/io/ipc/functions.py | 5 + py-polars/polars/lazyframe/frame.py | 3 + py-polars/src/lazyframe/mod.rs | 33 +++- py-polars/tests/unit/io/test_lazy_ipc.py | 27 ++- 13 files changed, 425 insertions(+), 82 deletions(-) diff --git a/crates/polars-io/src/ipc/ipc_file.rs b/crates/polars-io/src/ipc/ipc_file.rs index e18b3d3bb763..718c5cf92e67 100644 --- a/crates/polars-io/src/ipc/ipc_file.rs +++ b/crates/polars-io/src/ipc/ipc_file.rs @@ -164,10 +164,10 @@ impl IpcReader { let rechunk = self.rechunk; let metadata = read::read_file_metadata(&mut self.reader)?; - debug_assert!( - self.columns.is_none(), - "column names must have already been converted into indices", - ); + // TODO: Replace this back with the assertion before merging https://github.com/pola-rs/polars/pull/14861. 
+ if let Some(columns) = &self.columns { + self.projection = Some(columns_to_projection(columns, &metadata.schema)?); + } let schema = if let Some(projection) = &self.projection { Arc::new(apply_projection(&metadata.schema, projection)) diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs index 54b279af4274..40ba28f9900b 100644 --- a/crates/polars-io/src/ipc/ipc_reader_async.rs +++ b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -5,30 +5,24 @@ use arrow::io::ipc::read::FileMetadata; use bytes::Bytes; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; +use polars_core::datatypes::IDX_DTYPE; use polars_core::frame::DataFrame; +use polars_core::schema::Schema; use polars_error::{to_compute_err, PolarsResult}; -use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard}; use crate::cloud::{build_object_store, CloudLocation, CloudOptions}; use crate::pl_async::{ tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST, }; -use crate::prelude::IpcReader; +use crate::predicates::PhysicalIoExpr; +use crate::prelude::{materialize_projection, IpcReader}; +use crate::RowIndex; -/// Polars specific wrapper for Arc that limits the number of +/// Polars specific wrapper for `Arc` that limits the number of /// concurrent requests for the entire application. #[derive(Debug, Clone)] pub struct PolarsObjectStore(Arc); -impl From for PolarsObjectStore -where - O: ObjectStore, -{ - fn from(value: O) -> Self { - Self::new(Arc::new(value)) - } -} - impl PolarsObjectStore { pub fn new(store: Arc) -> Self { Self(store) @@ -36,8 +30,7 @@ impl PolarsObjectStore { pub async fn get(&self, path: &Path) -> PolarsResult { tune_with_concurrency_budget(1, || async { - self - .0 + self.0 .get(path) .await .map_err(to_compute_err)? 
@@ -79,8 +72,45 @@ impl PolarsObjectStore { pub struct IpcReaderAsync { store: PolarsObjectStore, path: Path, - object_metadata: Mutex>, - ipc_metadata: Mutex>, + // object_metadata: Mutex>, + // ipc_metadata: Mutex>, +} + +#[derive(Default, Clone)] +pub struct IpcReadOptions { + // Names of the columns to include in the output. + projection: Option>, + + // The maximum number of rows to include in the output. + row_limit: Option, + + // Include a column with the row number under the provided name starting at the provided index. + row_index: Option, + + // Only include rows that pass this predicate. + predicate: Option>, +} + +impl IpcReadOptions { + pub fn with_projection(mut self, indices: impl Into>>) -> Self { + self.projection = indices.into(); + self + } + + pub fn with_row_limit(mut self, row_limit: impl Into>) -> Self { + self.row_limit = row_limit.into(); + self + } + + pub fn with_row_index(mut self, row_index: impl Into>) -> Self { + self.row_index = row_index.into(); + self + } + + pub fn with_predicate(mut self, predicate: impl Into>>) -> Self { + self.predicate = predicate.into(); + self + } } impl IpcReaderAsync { @@ -103,34 +133,34 @@ impl IpcReaderAsync { debug_assert!(expansion.is_none(), "path should not contain wildcards"); Path::from_url_path(prefix).map_err(to_compute_err)? }, - object_metadata: Default::default(), - ipc_metadata: Default::default(), + // object_metadata: Default::default(), + // ipc_metadata: Default::default(), }) } - async fn fetch_object_metadata(&self) -> PolarsResult { + async fn object_metadata(&self) -> PolarsResult { self.store.head(&self.path).await } - async fn object_metadata(&self) -> PolarsResult> { - // NOTE: We have to be careful of deadlocks because we wait here for a - // `head` request to finish which waits for a semaphore. If that - // semaphore is also used in places that call this function, we can get - // a deadlock. 
- let mut object_metadata = self.object_metadata.lock().await; - if object_metadata.is_none() { - *object_metadata = Some(self.fetch_object_metadata().await?); - } - Ok(MutexGuard::map(object_metadata, |x| { - x.as_mut().unwrap_or_else(|| unreachable!()) - })) - } + // async fn object_metadata(&self) -> PolarsResult> { + // // NOTE: We have to be careful of deadlocks because we wait here for a + // // `head` request to finish which waits for a semaphore. If that + // // semaphore is also used in places that call this function, we can get + // // a deadlock. + // let mut object_metadata = self.object_metadata.lock().await; + // if object_metadata.is_none() { + // *object_metadata = Some(self.fetch_object_metadata().await?); + // } + // Ok(MutexGuard::map(object_metadata, |x| { + // x.as_mut().unwrap_or_else(|| unreachable!()) + // })) + // } async fn file_size(&self) -> PolarsResult { Ok(self.object_metadata().await?.size) } - async fn fetch_ipc_metadata(&self) -> PolarsResult { + pub async fn metadata(&self) -> PolarsResult { let file_size = self.file_size().await?; // TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip. @@ -169,27 +199,69 @@ impl IpcReaderAsync { ) } - pub async fn metadata(&self) -> PolarsResult> { - let mut ipc_metadata = self.ipc_metadata.lock().await; - if ipc_metadata.is_none() { - *ipc_metadata = Some(self.fetch_ipc_metadata().await?) - } - Ok(MutexGuard::map(ipc_metadata, |x| { - x.as_mut().unwrap_or_else(|| unreachable!()) - })) - } + // pub async fn metadata(&self) -> PolarsResult> { + // let mut ipc_metadata = self.ipc_metadata.lock().await; + // if ipc_metadata.is_none() { + // *ipc_metadata = Some(self.fetch_ipc_metadata().await?) + // } + // Ok(MutexGuard::map(ipc_metadata, |x| { + // x.as_mut().unwrap_or_else(|| unreachable!()) + // })) + // } - pub async fn data(&self) -> PolarsResult { + // TODO: Utilize previously obtained metadata. 
+ pub async fn data( + &self, + metadata: Option<&FileMetadata>, + options: IpcReadOptions, + verbose: bool, + ) -> PolarsResult { let bytes = self.store.get(&self.path).await?; + + let projection = match options.projection.as_deref() { + Some(projection) => { + fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema { + if let Some(rc) = row_index { + let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); + } + schema + } + + // Retrieve the metadata for the schema so we can map column names to indices. + let fetched_metadata; + let metadata = if let Some(metadata) = metadata { + metadata + } else { + // This branch is taken when `metadata` is None, which can happen if we deserialize the execution plan. + fetched_metadata = self.metadata().await?; + &fetched_metadata + }; + + let schema = prepare_schema((&metadata.schema).into(), options.row_index.as_ref()); + + let hive_partitions = None; // TODO + materialize_projection( + Some(projection), + &schema, + hive_partitions, + options.row_index.is_some(), + ) + }, + None => None, + }; + let reader = as crate::SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref())) - .with_row_index(None) // TODO - .with_n_rows(None) // TODO - .with_columns(None) // TODO - .with_projection(None); // TODO - let predicate = None; // TODO - let verbose = false; // TODO - reader.finish_with_scan_ops(predicate, verbose) + .with_row_index(options.row_index) + .with_n_rows(options.row_limit) + .with_projection(projection); + reader.finish_with_scan_ops(options.predicate, verbose) + } + + // TODO: Return type i64/u64/usize/alias? + // TODO: Utilize previously obtained metadata.
+ pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult { + todo!(); } } diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs index 08f37ab566aa..123eeb9461e1 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs @@ -1,5 +1,10 @@ use std::path::PathBuf; +use polars_core::config::env_force_async; +#[cfg(feature = "cloud")] +use polars_io::cloud::CloudOptions; +use polars_io::is_cloud_url; + use super::*; pub struct IpcExec { @@ -8,10 +13,43 @@ pub struct IpcExec { pub(crate) predicate: Option>, pub(crate) options: IpcScanOptions, pub(crate) file_options: FileScanOptions, + #[cfg(feature = "cloud")] + pub(crate) cloud_options: Option, + pub(crate) metadata: Option, } impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult { + let is_cloud = is_cloud_url(&self.path); + let force_async = env_force_async(); + + let mut out = if is_cloud || force_async { + #[cfg(not(feature = "cloud"))] + { + panic!("activate cloud feature") + } + + #[cfg(feature = "cloud")] + { + if !is_cloud && verbose { + eprintln!("ASYNC READING FORCED"); + } + + polars_io::pl_async::get_runtime() + .block_on_potential_spawn(self.read_async(verbose))? + } + } else { + self.read_sync(verbose)? 
+ }; + + if self.file_options.rechunk { + out.as_single_chunk_par(); + } + + Ok(out) + } + + fn read_sync(&mut self, verbose: bool) -> PolarsResult { let file = std::fs::File::open(&self.path)?; let (projection, predicate) = prepare_scan_args( self.predicate.clone(), @@ -28,6 +66,26 @@ impl IpcExec { .memory_mapped(self.options.memmap) .finish_with_scan_ops(predicate, verbose) } + + #[cfg(feature = "cloud")] + async fn read_async(&mut self, verbose: bool) -> PolarsResult { + let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + + let reader = + IpcReaderAsync::from_uri(self.path.to_str().unwrap(), self.cloud_options.as_ref()) + .await?; + reader + .data( + self.metadata.as_ref(), + IpcReadOptions::default() + .with_row_limit(self.file_options.n_rows) + .with_row_index(self.file_options.row_index.clone()) + .with_projection(self.file_options.with_columns.as_deref().cloned()) + .with_predicate(predicate), + verbose, + ) + .await + } } impl Executor for IpcExec { diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 8673538f3725..dc6cbc81b255 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -232,7 +232,12 @@ pub fn create_physical_plan( })) }, #[cfg(feature = "ipc")] - FileScan::Ipc { options } => { + FileScan::Ipc { + options, + #[cfg(feature = "cloud")] + cloud_options, + metadata, + } => { assert_eq!(paths.len(), 1); let path = paths[0].clone(); Ok(Box::new(executors::IpcExec { @@ -241,6 +246,9 @@ pub fn create_physical_plan( predicate, options, file_options, + #[cfg(feature = "cloud")] + cloud_options, + metadata, })) }, #[cfg(feature = "parquet")] diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index 653b7e368f91..0cfcb5333a2c 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -1,6 +1,7 @@ use std::path::{Path, PathBuf}; use 
polars_core::prelude::*; +use polars_io::cloud::CloudOptions; use polars_io::RowIndex; use crate::prelude::*; @@ -12,6 +13,8 @@ pub struct ScanArgsIpc { pub rechunk: bool, pub row_index: Option, pub memmap: bool, + #[cfg(feature = "cloud")] + pub cloud_options: Option, } impl Default for ScanArgsIpc { @@ -22,6 +25,8 @@ impl Default for ScanArgsIpc { rechunk: false, row_index: None, memmap: true, + #[cfg(feature = "cloud")] + cloud_options: Default::default(), } } } @@ -58,6 +63,8 @@ impl LazyFileListReader for LazyIpcReader { args.cache, args.row_index.clone(), args.rechunk, + #[cfg(feature = "cloud")] + args.cloud_options, )? .build() .into(); diff --git a/crates/polars-lazy/src/tests/io.rs b/crates/polars-lazy/src/tests/io.rs index 0b22a1a33a4d..24110424f20c 100644 --- a/crates/polars-lazy/src/tests/io.rs +++ b/crates/polars-lazy/src/tests/io.rs @@ -416,6 +416,8 @@ fn test_ipc_globbing() -> PolarsResult<()> { rechunk: false, row_index: None, memmap: true, + #[cfg(feature = "cloud")] + cloud_options: None, }, )? 
.collect()?; diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index 12af640e2d68..31904e371f0d 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -4,8 +4,6 @@ use std::io::{Read, Seek}; use polars_core::prelude::*; #[cfg(feature = "parquet")] use polars_io::cloud::CloudOptions; -#[cfg(feature = "ipc")] -use polars_io::ipc::IpcReader; #[cfg(all(feature = "parquet", feature = "async"))] use polars_io::parquet::ParquetAsyncReader; #[cfg(feature = "parquet")] @@ -239,21 +237,64 @@ impl LogicalPlanBuilder { cache: bool, row_index: Option, rechunk: bool, + #[cfg(feature = "cloud")] cloud_options: Option, ) -> PolarsResult { - use polars_io::SerReader as _; + use polars_io::is_cloud_url; let path = path.into(); - let file = polars_utils::open_file(&path)?; - let mut reader = IpcReader::new(file); - let reader_schema = reader.schema()?; - let mut schema: Schema = (&reader_schema).into(); - if let Some(rc) = &row_index { - let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); - } + let (schema, reader_schema, num_rows, metadata) = if is_cloud_url(&path) { + #[cfg(not(feature = "cloud"))] + panic!( + "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled." + ); + + #[cfg(feature = "cloud")] + { + let uri = path.to_string_lossy(); + get_runtime().block_on(async { + let reader = + polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref()) + .await?; + let metadata = reader.metadata().await?; + // TODO: Remove cache in reader and return by value? We drop + // the reader anyways... + let metadata = arrow::io::ipc::read::FileMetadata::clone(&metadata); + let reader_schema = Arc::clone(&metadata.schema); + + let schema = prepare_schema((&reader_schema).into(), row_index.as_ref()); + PolarsResult::Ok((schema, reader_schema, None, metadata)) + })? 
+ } + } else { + // NOTE: We do not really need IpcReader and all of its memoization + // to read the metadata... + let mut file = polars_utils::open_file(&path)?; + + let metadata = arrow::io::ipc::read::read_file_metadata(&mut file)?; + let reader_schema = Arc::clone(&metadata.schema); + let mut schema: Schema = (&reader_schema).into(); + if let Some(rc) = &row_index { + let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); + } + + pub fn _num_rows(metadata: &arrow::io::ipc::read::FileMetadata) -> PolarsResult { + let n_cols = metadata.schema.fields.len(); + // this magic number 10 is computed from the yellow trip dataset + Ok((metadata.size as usize) / n_cols / 10) + } + + let num_rows = _num_rows(&metadata)?; - let num_rows = reader._num_rows()?; - let file_info = FileInfo::new(Arc::new(schema), Some(reader_schema), (None, num_rows)); + let schema = prepare_schema((&reader_schema).into(), row_index.as_ref()); + (schema, reader_schema, Some(num_rows), metadata) + }; + + let file_info = FileInfo::new( + schema, + Some(reader_schema), + (num_rows, num_rows.unwrap_or_default()), + ); let file_options = FileScanOptions { with_columns: None, @@ -270,7 +311,12 @@ impl LogicalPlanBuilder { file_info, file_options, predicate: None, - scan_type: FileScan::Ipc { options }, + scan_type: FileScan::Ipc { + options, + #[cfg(feature = "cloud")] + cloud_options, + metadata: Some(metadata), + }, } .into()) } diff --git a/crates/polars-plan/src/logical_plan/file_scan.rs b/crates/polars-plan/src/logical_plan/file_scan.rs index 2364711eef3b..f0720ed4664b 100644 --- a/crates/polars-plan/src/logical_plan/file_scan.rs +++ b/crates/polars-plan/src/logical_plan/file_scan.rs @@ -16,7 +16,13 @@ pub enum FileScan { metadata: Option>, }, #[cfg(feature = "ipc")] - Ipc { options: IpcScanOptions }, + Ipc { + options: IpcScanOptions, + #[cfg(feature = "cloud")] + cloud_options: Option, + #[cfg_attr(feature = "serde", serde(skip))] + metadata: Option, + }, #[cfg_attr(feature =
"serde", serde(skip))] Anonymous { options: Arc, @@ -43,7 +49,29 @@ impl PartialEq for FileScan { }, ) => opt_l == opt_r && c_l == c_r, #[cfg(feature = "ipc")] - (FileScan::Ipc { options: l }, FileScan::Ipc { options: r }) => l == r, + ( + FileScan::Ipc { + options: l, + #[cfg(feature = "cloud")] + cloud_options: c_l, + .. + }, + FileScan::Ipc { + options: r, + #[cfg(feature = "cloud")] + cloud_options: c_r, + .. + }, + ) => { + #[cfg(not(feature = "cloud"))] + { + l == r + } + #[cfg(feature = "cloud")] + { + l == r && c_l == c_r + } + }, _ => false, } } diff --git a/crates/polars-plan/src/logical_plan/functions/count.rs b/crates/polars-plan/src/logical_plan/functions/count.rs index 5e28ad4a37fb..1bbdb36400e2 100644 --- a/crates/polars-plan/src/logical_plan/functions/count.rs +++ b/crates/polars-plan/src/logical_plan/functions/count.rs @@ -1,5 +1,7 @@ #[cfg(feature = "ipc")] -use arrow::io::ipc::read::get_row_count as count_rows_ipc; +use arrow::io::ipc::read::get_row_count as count_rows_ipc_sync; +#[cfg(feature = "ipc")] +use polars_core::error::to_compute_err; #[cfg(feature = "parquet")] use polars_io::cloud::CloudOptions; #[cfg(feature = "csv")] @@ -41,15 +43,21 @@ pub fn count_rows(paths: &Arc<[PathBuf]>, scan_type: &FileScan) -> PolarsResult< Ok(DataFrame::new(vec![Series::new("len", [n_rows as IdxSize])]).unwrap()) }, #[cfg(feature = "ipc")] - FileScan::Ipc { options } => { - let n_rows: PolarsResult = paths - .iter() - .map(|path| { - let mut reader = polars_utils::open_file(path)?; - count_rows_ipc(&mut reader) - }) - .sum(); - Ok(DataFrame::new(vec![Series::new("len", [n_rows? as IdxSize])]).unwrap()) + FileScan::Ipc { + options, + #[cfg(feature = "cloud")] + cloud_options, + metadata, + } => { + let count: IdxSize = count_rows_ipc( + paths, + #[cfg(feature = "cloud")] + cloud_options.as_ref(), + metadata.as_ref(), + )? + .try_into() + .map_err(to_compute_err)?; + Ok(DataFrame::new(vec![Series::new("len", [count])]).unwrap()) }, FileScan::Anonymous { .. 
} => { unreachable!(); @@ -103,3 +111,53 @@ async fn count_rows_cloud_parquet( .await .map(|rows| rows.iter().sum()) } + +#[cfg(feature = "ipc")] +pub(super) fn count_rows_ipc( + paths: &Arc<[PathBuf]>, + #[cfg(feature = "cloud")] cloud_options: Option<&CloudOptions>, + // TODO: Utilize metadata. + metadata: Option<&arrow::io::ipc::read::FileMetadata>, +) -> PolarsResult { + if paths.is_empty() { + return Ok(0); + }; + let is_cloud = is_cloud_url(paths.first().unwrap().as_path()); + + if is_cloud { + #[cfg(not(feature = "cloud"))] + panic!("One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."); + + #[cfg(feature = "cloud")] + { + get_runtime().block_on(count_rows_cloud_ipc(paths, cloud_options, metadata)) + } + } else { + paths + .iter() + .map(|path| { + let mut reader = polars_utils::open_file(path)?; + count_rows_ipc_sync(&mut reader) + }) + .sum() + } +} + +#[cfg(all(feature = "ipc", feature = "async"))] +async fn count_rows_cloud_ipc( + paths: &Arc<[PathBuf]>, + cloud_options: Option<&CloudOptions>, + metadata: Option<&arrow::io::ipc::read::FileMetadata>, +) -> PolarsResult { + use polars_io::ipc::IpcReaderAsync; + + let collection = paths.iter().map(|path| { + with_concurrency_budget(1, || async { + let reader = IpcReaderAsync::from_uri(&path.to_string_lossy(), cloud_options).await?; + reader.count_rows(metadata).await + }) + }); + futures::future::try_join_all(collection) + .await + .map(|rows| rows.iter().sum()) +} diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index ddec69601732..55d07848cb5e 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -222,6 +222,7 @@ def scan_ipc( row_index_offset: int = 0, storage_options: dict[str, Any] | None = None, memory_map: bool = True, + retries: int = 0, ) -> LazyFrame: """ Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns. 
@@ -252,6 +253,9 @@ def scan_ipc( Try to memory map the file. This can greatly improve performance on repeated queries as the OS may cache pages. Only uncompressed IPC files can be memory mapped. + retries + Number of retries if accessing a cloud instance fails. + """ return pl.LazyFrame._scan_ipc( source, @@ -262,4 +266,5 @@ def scan_ipc( row_index_offset=row_index_offset, storage_options=storage_options, memory_map=memory_map, + retries=retries, ) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index ccb109c48cf6..a099ef55b893 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -491,6 +491,7 @@ def _scan_ipc( row_index_offset: int = 0, storage_options: dict[str, object] | None = None, memory_map: bool = True, + retries: int = 0, ) -> Self: """ Lazily read from an Arrow IPC (Feather v2) file. @@ -528,6 +529,8 @@ def _scan_ipc( rechunk, _prepare_row_index_args(row_index_name, row_index_offset), memory_map=memory_map, + cloud_options=storage_options, + retries=retries, ) return self diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index eeb9f28eb731..b1c3f6b80d39 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -304,7 +304,7 @@ impl PyLazyFrame { #[cfg(feature = "ipc")] #[staticmethod] - #[pyo3(signature = (path, paths, n_rows, cache, rechunk, row_index, memory_map))] + #[pyo3(signature = (path, paths, n_rows, cache, rechunk, row_index, memory_map, cloud_options, retries))] fn new_from_ipc( path: Option, paths: Vec, @@ -313,14 +313,45 @@ impl PyLazyFrame { rechunk: bool, row_index: Option<(String, IdxSize)>, memory_map: bool, + cloud_options: Option>, + retries: usize, ) -> PyResult { let row_index = row_index.map(|(name, offset)| RowIndex { name, offset }); + + #[cfg(feature = "cloud")] + let cloud_options = { + let first_path = if let Some(path) = &path { + path + } else { + paths + .first() + .ok_or_else(|| 
PyValueError::new_err("expected a path argument"))? + }; + + let first_path_url = first_path.to_string_lossy(); + let mut cloud_options = cloud_options + .map(|kv| parse_cloud_options(&first_path_url, kv)) + .transpose()?; + if retries > 0 { + cloud_options = + cloud_options + .or_else(|| Some(CloudOptions::default())) + .map(|mut options| { + options.max_retries = retries; + options + }); + } + cloud_options + }; + let args = ScanArgsIpc { n_rows, cache, rechunk, row_index, memmap: memory_map, + #[cfg(feature = "cloud")] + cloud_options, }; let lf = if let Some(path) = &path { diff --git a/py-polars/tests/unit/io/test_lazy_ipc.py b/py-polars/tests/unit/io/test_lazy_ipc.py index 8702e83af538..354a912ea4b6 100644 --- a/py-polars/tests/unit/io/test_lazy_ipc.py +++ b/py-polars/tests/unit/io/test_lazy_ipc.py @@ -1,10 +1,11 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import pytest import polars as pl +from polars.testing.asserts.frame import assert_frame_equal if TYPE_CHECKING: from pathlib import Path @@ -85,3 +86,27 @@ def test_ipc_list_arg(io_files_path: Path) -> None: assert df.shape == (54, 4) assert df.row(-1) == ("seafood", 194, 12.0, 1) assert df.row(0) == ("vegetables", 45, 0.5, 2) + + +def test_scan_ipc_local_with_async( + capfd: Any, + monkeypatch: Any, + io_files_path: Path, +) -> None: + monkeypatch.setenv("POLARS_VERBOSE", "1") + monkeypatch.setenv("POLARS_FORCE_ASYNC", "1") + + assert_frame_equal( + pl.scan_ipc(io_files_path / "foods1.ipc").head(1).collect(), + pl.DataFrame( + { + "category": ["vegetables"], + "calories": [45], + "fats_g": [0.5], + "sugars_g": [2], + } + ), + ) + + captured = capfd.readouterr().err + assert "ASYNC READING FORCED" in captured From 0a5ef8862c73c2621211f155c1e21721e173816b Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 12 Mar 2024 10:21:55 +0100 Subject: [PATCH 3/7] Ignore scan_ipc async test on windows --- 
py-polars/tests/unit/io/test_lazy_ipc.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/py-polars/tests/unit/io/test_lazy_ipc.py b/py-polars/tests/unit/io/test_lazy_ipc.py index 354a912ea4b6..ecf2a3e657c2 100644 --- a/py-polars/tests/unit/io/test_lazy_ipc.py +++ b/py-polars/tests/unit/io/test_lazy_ipc.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys from typing import TYPE_CHECKING, Any import pytest @@ -88,6 +89,9 @@ def test_ipc_list_arg(io_files_path: Path) -> None: assert df.row(0) == ("vegetables", 45, 0.5, 2) +@pytest.mark.skipif( + sys.platform == "win32", reason="object_store does not handle windows-style paths." +) def test_scan_ipc_local_with_async( capfd: Any, monkeypatch: Any, From 325ca98d6ce6f40498f3c920b9a3f0bf12b89dc9 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 12 Mar 2024 13:15:20 +0100 Subject: [PATCH 4/7] Address comments --- crates/polars-io/src/ipc/ipc_reader_async.rs | 56 ++++++------------- .../polars-plan/src/logical_plan/builder.rs | 3 - 2 files changed, 18 insertions(+), 41 deletions(-) diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs index 40ba28f9900b..5799d99f21ab 100644 --- a/crates/polars-io/src/ipc/ipc_reader_async.rs +++ b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -72,8 +72,6 @@ impl PolarsObjectStore { pub struct IpcReaderAsync { store: PolarsObjectStore, path: Path, - // object_metadata: Mutex>, - // ipc_metadata: Mutex>, } #[derive(Default, Clone)] @@ -125,16 +123,16 @@ impl IpcReaderAsync { store, ) = build_object_store(uri, cloud_options).await?; + let path = { + // Any wildcards should already have been resolved here. Without this assertion they would + // be ignored. + debug_assert!(expansion.is_none(), "path should not contain wildcards"); + Path::from_url_path(prefix).map_err(to_compute_err)? + }; + Ok(Self { store: PolarsObjectStore::new(store), - path: { - // Any wildcards should already have been resolved here. 
Without this assertion they would - // be ignored. - debug_assert!(expansion.is_none(), "path should not contain wildcards"); - Path::from_url_path(prefix).map_err(to_compute_err)? - }, - // object_metadata: Default::default(), - // ipc_metadata: Default::default(), + path, }) } @@ -142,20 +140,6 @@ impl IpcReaderAsync { self.store.head(&self.path).await } - // async fn object_metadata(&self) -> PolarsResult> { - // // NOTE: We have to be careful of deadlocks because we wait here for a - // // `head` request to finish which waits for a semaphore. If that - // // semaphore is also used in places that call this function, we can get - // // a deadlock. - // let mut object_metadata = self.object_metadata.lock().await; - // if object_metadata.is_none() { - // *object_metadata = Some(self.fetch_object_metadata().await?); - // } - // Ok(MutexGuard::map(object_metadata, |x| { - // x.as_mut().unwrap_or_else(|| unreachable!()) - // })) - // } - async fn file_size(&self) -> PolarsResult { Ok(self.object_metadata().await?.size) } @@ -199,23 +183,14 @@ impl IpcReaderAsync { ) } - // pub async fn metadata(&self) -> PolarsResult> { - // let mut ipc_metadata = self.ipc_metadata.lock().await; - // if ipc_metadata.is_none() { - // *ipc_metadata = Some(self.fetch_ipc_metadata().await?) - // } - // Ok(MutexGuard::map(ipc_metadata, |x| { - // x.as_mut().unwrap_or_else(|| unreachable!()) - // })) - // } - - // TODO: Utilize previously obtained metadata. pub async fn data( &self, metadata: Option<&FileMetadata>, options: IpcReadOptions, verbose: bool, ) -> PolarsResult { + // TODO: Only download what is needed rather than the entire file by + // making use of the projection, row limit, predicate and such. 
let bytes = self.store.get(&self.path).await?; let projection = match options.projection.as_deref() { @@ -239,7 +214,11 @@ impl IpcReaderAsync { let schema = prepare_schema((&metadata.schema).into(), options.row_index.as_ref()); - let hive_partitions = None; // TODO + // TODO: According to + // https://github.com/pola-rs/polars/pull/14984#discussion_r1521226321 + // we need to re-design hive partitions. + let hive_partitions = None; + materialize_projection( Some(projection), &schema, @@ -259,9 +238,10 @@ impl IpcReaderAsync { } // TODO: Return type i64/u64/usize/alias? - // TODO: Utilize previously obtained metadata. pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult { - todo!(); + unimplemented!( + "the row count specialization for the async IPC reader is not yet implemented" + ) } } diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index 31904e371f0d..1b79384b3a8c 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -257,9 +257,6 @@ impl LogicalPlanBuilder { polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref()) .await?; let metadata = reader.metadata().await?; - // TODO: Remove cache in reader and return by value? We drop - // the reader anyways... 
- let metadata = arrow::io::ipc::read::FileMetadata::clone(&metadata); let reader_schema = Arc::clone(&metadata.schema); let schema = prepare_schema((&reader_schema).into(), row_index.as_ref()); From 56a81fb74d33401537a1917cc86d3246d4670080 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 12 Mar 2024 13:38:51 +0100 Subject: [PATCH 5/7] Remove obsolete todo --- crates/polars-plan/src/logical_plan/functions/count.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/polars-plan/src/logical_plan/functions/count.rs b/crates/polars-plan/src/logical_plan/functions/count.rs index 1bbdb36400e2..1da3484928fc 100644 --- a/crates/polars-plan/src/logical_plan/functions/count.rs +++ b/crates/polars-plan/src/logical_plan/functions/count.rs @@ -116,7 +116,6 @@ async fn count_rows_cloud_parquet( pub(super) fn count_rows_ipc( paths: &Arc<[PathBuf]>, #[cfg(feature = "cloud")] cloud_options: Option<&CloudOptions>, - // TODO: Utilize metadata. metadata: Option<&arrow::io::ipc::read::FileMetadata>, ) -> PolarsResult { if paths.is_empty() { From fc79da2ffe12afc59e07753765336ab6df81316a Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Tue, 12 Mar 2024 15:35:40 +0100 Subject: [PATCH 6/7] address comments and massive simplification --- crates/polars-io/src/ipc/ipc_file.rs | 14 +--- crates/polars-io/src/ipc/ipc_reader_async.rs | 19 ++--- .../polars-plan/src/logical_plan/builder.rs | 73 +++++++------------ 3 files changed, 38 insertions(+), 68 deletions(-) diff --git a/crates/polars-io/src/ipc/ipc_file.rs b/crates/polars-io/src/ipc/ipc_file.rs index 718c5cf92e67..ec0cf8cf2ec0 100644 --- a/crates/polars-io/src/ipc/ipc_file.rs +++ b/crates/polars-io/src/ipc/ipc_file.rs @@ -90,16 +90,6 @@ fn check_mmap_err(err: PolarsError) -> PolarsResult<()> { } impl IpcReader { - #[doc(hidden)] - /// A very bad estimate of the number of rows - /// This estimation will be entirely off if the file is compressed. - /// And will be varying off depending on the data types. 
- pub fn _num_rows(&mut self) -> PolarsResult { - let metadata = self.get_metadata()?; - let n_cols = metadata.schema.fields.len(); - // this magic number 10 is computed from the yellow trip dataset - Ok((metadata.size as usize) / n_cols / 10) - } fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> { if self.metadata.is_none() { let metadata = read::read_file_metadata(&mut self.reader)?; @@ -164,7 +154,9 @@ impl IpcReader { let rechunk = self.rechunk; let metadata = read::read_file_metadata(&mut self.reader)?; - // TODO: Replace this back with the assertion before merging https://github.com/pola-rs/polars/pull/14861. + // NOTE: For some code paths this already happened. See + // https://github.com/pola-rs/polars/pull/14984#discussion_r1520125000 + // where this was introduced. if let Some(columns) = &self.columns { self.projection = Some(columns_to_projection(columns, &metadata.schema)?); } diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs index 5799d99f21ab..69dbd4335552 100644 --- a/crates/polars-io/src/ipc/ipc_reader_async.rs +++ b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -1,14 +1,14 @@ use std::ops::Range; use std::sync::Arc; -use arrow::io::ipc::read::FileMetadata; +use arrow::io::ipc::read::{get_row_count, FileMetadata, OutOfSpecKind}; use bytes::Bytes; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use polars_core::datatypes::IDX_DTYPE; use polars_core::frame::DataFrame; use polars_core::schema::Schema; -use polars_error::{to_compute_err, PolarsResult}; +use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult}; use crate::cloud::{build_object_store, CloudLocation, CloudOptions}; use crate::pl_async::{ @@ -237,25 +237,26 @@ impl IpcReaderAsync { reader.finish_with_scan_ops(options.predicate, verbose) } - // TODO: Return type i64/u64/usize/alias? 
pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult { - unimplemented!( - "the row count specialization for the async IPC reader is not yet implemented" - ) + // TODO: Only download what is needed rather than the entire file by + // making use of the projection, row limit, predicate and such. + let bytes = self.store.get(&self.path).await?; + get_row_count(&mut std::io::Cursor::new(bytes.as_ref())) } } const FOOTER_METADATA_SIZE: usize = 10; +// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in +// sync and async readers. fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult { let footer_size: usize = i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!())) .try_into() - .map_err(to_compute_err)?; + .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - // TODO: Move to polars-arrow and deduplicate parsing of footer metadata in sync and async readers. if &bytes[4..] != b"ARROW1" { - Err(to_compute_err("invalid ipc footer magic"))? + polars_bail!(oos = OutOfSpecKind::InvalidFooter); } Ok(footer_size) diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index 1b79384b3a8c..11966371e374 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -243,7 +243,7 @@ impl LogicalPlanBuilder { let path = path.into(); - let (schema, reader_schema, num_rows, metadata) = if is_cloud_url(&path) { + let metadata = if is_cloud_url(&path) { #[cfg(not(feature = "cloud"))] panic!( "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled." 
@@ -253,60 +253,37 @@ impl LogicalPlanBuilder { { let uri = path.to_string_lossy(); get_runtime().block_on(async { - let reader = - polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref()) - .await?; - let metadata = reader.metadata().await?; - let reader_schema = Arc::clone(&metadata.schema); - - let schema = prepare_schema((&reader_schema).into(), row_index.as_ref()); - PolarsResult::Ok((schema, reader_schema, None, metadata)) + polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref()) + .await? + .metadata() + .await })? } } else { - // NOTE: We do not really need IpcReader and all of it's memoization - // to read the metadata... - let mut file = polars_utils::open_file(&path)?; - - let metadata = arrow::io::ipc::read::read_file_metadata(&mut file)?; - let reader_schema = Arc::clone(&metadata.schema); - let mut schema: Schema = (&reader_schema).into(); - if let Some(rc) = &row_index { - let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE); - } - - pub fn _num_rows(metadata: &arrow::io::ipc::read::FileMetadata) -> PolarsResult { - let n_cols = metadata.schema.fields.len(); - // this magic number 10 is computed from the yellow trip dataset - Ok((metadata.size as usize) / n_cols / 10) - } - - let num_rows = _num_rows(&metadata)?; - - let schema = prepare_schema((&reader_schema).into(), row_index.as_ref()); - (schema, reader_schema, Some(num_rows), metadata) + arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new( + polars_utils::open_file(&path)?, + ))? }; - let file_info = FileInfo::new( - schema, - Some(reader_schema), - (num_rows, num_rows.unwrap_or_default()), - ); - - let file_options = FileScanOptions { - with_columns: None, - cache, - n_rows, - rechunk, - row_index, - file_counter: Default::default(), - // TODO! 
add - hive_partitioning: false, - }; Ok(LogicalPlan::Scan { paths: Arc::new([path]), - file_info, - file_options, + file_info: FileInfo::new( + prepare_schema(metadata.schema.as_ref().into(), row_index.as_ref()), + Some(Arc::clone(&metadata.schema)), + (None, 0), + ), + file_options: FileScanOptions { + with_columns: None, + cache, + n_rows, + rechunk, + row_index, + file_counter: Default::default(), + // TODO: According to + // https://github.com/pola-rs/polars/pull/14984#discussion_r1521226321 + // we need to re-design hive partitions. + hive_partitioning: false, + }, predicate: None, scan_type: FileScan::Ipc { options, From a146d06ee96ee01713cfe5f46aeda16afa0e2c77 Mon Sep 17 00:00:00 2001 From: Mick van Gelderen Date: Wed, 13 Mar 2024 09:35:44 +0100 Subject: [PATCH 7/7] Remove comments about hive partitioning not being ready for IPC --- crates/polars-io/src/ipc/ipc_reader_async.rs | 3 --- crates/polars-plan/src/logical_plan/builder.rs | 3 --- 2 files changed, 6 deletions(-) diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs index 69dbd4335552..1b78fa677325 100644 --- a/crates/polars-io/src/ipc/ipc_reader_async.rs +++ b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -214,9 +214,6 @@ impl IpcReaderAsync { let schema = prepare_schema((&metadata.schema).into(), options.row_index.as_ref()); - // TODO: According to - // https://github.com/pola-rs/polars/pull/14984#discussion_r1521226321 - // we need to re-design hive partitions. 
let hive_partitions = None; materialize_projection( diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs index 11966371e374..b3beecddfead 100644 --- a/crates/polars-plan/src/logical_plan/builder.rs +++ b/crates/polars-plan/src/logical_plan/builder.rs @@ -279,9 +279,6 @@ impl LogicalPlanBuilder { rechunk, row_index, file_counter: Default::default(), - // TODO: According to - // https://github.com/pola-rs/polars/pull/14984#discussion_r1521226321 - // we need to re-design hive partitions. hive_partitioning: false, }, predicate: None,