diff --git a/crates/polars-arrow/src/io/ipc/read/file.rs b/crates/polars-arrow/src/io/ipc/read/file.rs
index 91d276326430..0eb60d0a566c 100644
--- a/crates/polars-arrow/src/io/ipc/read/file.rs
+++ b/crates/polars-arrow/src/io/ipc/read/file.rs
@@ -215,7 +215,7 @@ fn deserialize_footer_blocks(
     Ok((footer, blocks))
 }
 
-pub(super) fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
+pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
     let (footer, blocks) = deserialize_footer_blocks(footer_data)?;
 
     let ipc_schema = footer
diff --git a/crates/polars-arrow/src/io/ipc/read/mod.rs b/crates/polars-arrow/src/io/ipc/read/mod.rs
index f79376934427..74d9a93a9309 100644
--- a/crates/polars-arrow/src/io/ipc/read/mod.rs
+++ b/crates/polars-arrow/src/io/ipc/read/mod.rs
@@ -30,7 +30,9 @@ pub mod file_async;
 pub(crate) use common::first_dict_field;
 #[cfg(feature = "io_flight")]
 pub(crate) use common::{read_dictionary, read_record_batch};
-pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata};
+pub use file::{
+    deserialize_footer, read_batch, read_file_dictionaries, read_file_metadata, FileMetadata,
+};
 use polars_utils::aliases::PlHashMap;
 pub use reader::FileReader;
 pub use schema::deserialize_schema;
diff --git a/crates/polars-io/src/ipc/ipc_file.rs b/crates/polars-io/src/ipc/ipc_file.rs
index e18b3d3bb763..ec0cf8cf2ec0 100644
--- a/crates/polars-io/src/ipc/ipc_file.rs
+++ b/crates/polars-io/src/ipc/ipc_file.rs
@@ -90,16 +90,6 @@ fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
 }
 
 impl IpcReader {
-    #[doc(hidden)]
-    /// A very bad estimate of the number of rows
-    /// This estimation will be entirely off if the file is compressed.
-    /// And will be varying off depending on the data types.
-    pub fn _num_rows(&mut self) -> PolarsResult<usize> {
-        let metadata = self.get_metadata()?;
-        let n_cols = metadata.schema.fields.len();
-        // this magic number 10 is computed from the yellow trip dataset
-        Ok((metadata.size as usize) / n_cols / 10)
-    }
     fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
         if self.metadata.is_none() {
             let metadata = read::read_file_metadata(&mut self.reader)?;
@@ -164,10 +154,12 @@ impl IpcReader {
         let rechunk = self.rechunk;
         let metadata = read::read_file_metadata(&mut self.reader)?;
 
-        debug_assert!(
-            self.columns.is_none(),
-            "column names must have already been converted into indices",
-        );
+        // NOTE: For some code paths this already happened. See
+        // https://github.com/pola-rs/polars/pull/14984#discussion_r1520125000
+        // where this was introduced.
+        if let Some(columns) = &self.columns {
+            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
+        }
 
         let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
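The `finish` hunk above swaps a hard `debug_assert!` for a fallback: any column names still present on the reader are resolved to positional indices against the file schema via `columns_to_projection`. A minimal sketch of that name-to-index step; `lookup` and its error text are stand-ins, not the real polars helper:

```rust
use std::collections::HashMap;

// Map requested column names to their positions in the schema, preserving
// the requested order. Illustrative stand-in for `columns_to_projection`.
fn lookup(columns: &[String], field_names: &[&str]) -> Result<Vec<usize>, String> {
    // Index the schema once so each per-column lookup is O(1).
    let by_name: HashMap<&str, usize> = field_names
        .iter()
        .enumerate()
        .map(|(i, name)| (*name, i))
        .collect();
    columns
        .iter()
        .map(|c| {
            by_name
                .get(c.as_str())
                .copied()
                .ok_or_else(|| format!("column '{c}' not found in schema"))
        })
        .collect()
}

fn main() {
    let fields = ["category", "calories", "fats_g"];
    let cols = vec!["calories".to_string(), "category".to_string()];
    // The projection keeps the *requested* order, not the schema order.
    assert_eq!(lookup(&cols, &fields), Ok(vec![1, 0]));
}
```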
diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs
new file mode 100644
index 000000000000..1b78fa677325
--- /dev/null
+++ b/crates/polars-io/src/ipc/ipc_reader_async.rs
@@ -0,0 +1,260 @@
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::io::ipc::read::{get_row_count, FileMetadata, OutOfSpecKind};
+use bytes::Bytes;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
+use polars_core::datatypes::IDX_DTYPE;
+use polars_core::frame::DataFrame;
+use polars_core::schema::Schema;
+use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};
+
+use crate::cloud::{build_object_store, CloudLocation, CloudOptions};
+use crate::pl_async::{
+    tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
+};
+use crate::predicates::PhysicalIoExpr;
+use crate::prelude::{materialize_projection, IpcReader};
+use crate::RowIndex;
+
+/// Polars-specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
+/// concurrent requests for the entire application.
+#[derive(Debug, Clone)]
+pub struct PolarsObjectStore(Arc<dyn ObjectStore>);
+
+impl PolarsObjectStore {
+    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
+        Self(store)
+    }
+
+    pub async fn get(&self, path: &Path) -> PolarsResult<Bytes> {
+        tune_with_concurrency_budget(1, || async {
+            self.0
+                .get(path)
+                .await
+                .map_err(to_compute_err)?
+                .bytes()
+                .await
+                .map_err(to_compute_err)
+        })
+        .await
+    }
+
+    pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Bytes> {
+        tune_with_concurrency_budget(1, || self.0.get_range(path, range))
+            .await
+            .map_err(to_compute_err)
+    }
+
+    pub async fn get_ranges(
+        &self,
+        path: &Path,
+        ranges: &[Range<usize>],
+    ) -> PolarsResult<Vec<Bytes>> {
+        tune_with_concurrency_budget(
+            (ranges.len() as u32).clamp(0, MAX_BUDGET_PER_REQUEST as u32),
+            || self.0.get_ranges(path, ranges),
+        )
+        .await
+        .map_err(to_compute_err)
+    }
+
+    /// Fetch the metadata of the file; do not memoize it.
+    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
+        with_concurrency_budget(1, || self.0.head(path))
+            .await
+            .map_err(to_compute_err)
+    }
+}
+
+/// An Arrow IPC reader implemented on top of PolarsObjectStore.
+pub struct IpcReaderAsync {
+    store: PolarsObjectStore,
+    path: Path,
+}
+
+#[derive(Default, Clone)]
+pub struct IpcReadOptions {
+    // Names of the columns to include in the output.
+    projection: Option<Vec<String>>,
+
+    // The maximum number of rows to include in the output.
+    row_limit: Option<usize>,
+
+    // Include a column with the row number under the provided name, starting at the provided index.
+    row_index: Option<RowIndex>,
+
+    // Only include rows that pass this predicate.
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+}
+
+impl IpcReadOptions {
+    pub fn with_projection(mut self, indices: impl Into<Option<Vec<String>>>) -> Self {
+        self.projection = indices.into();
+        self
+    }
+
+    pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
+        self.row_limit = row_limit.into();
+        self
+    }
+
+    pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
+        self.row_index = row_index.into();
+        self
+    }
+
+    pub fn with_predicate(
+        mut self,
+        predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>,
+    ) -> Self {
+        self.predicate = predicate.into();
+        self
+    }
+}
+
+impl IpcReaderAsync {
+    pub async fn from_uri(
+        uri: &str,
+        cloud_options: Option<&CloudOptions>,
+    ) -> PolarsResult<IpcReaderAsync> {
+        let (
+            CloudLocation {
+                prefix, expansion, ..
+            },
+            store,
+        ) = build_object_store(uri, cloud_options).await?;
+
+        let path = {
+            // Any wildcards should already have been resolved here. Without this
+            // assertion they would be ignored.
+            debug_assert!(expansion.is_none(), "path should not contain wildcards");
+            Path::from_url_path(prefix).map_err(to_compute_err)?
+        };
+
+        Ok(Self {
+            store: PolarsObjectStore::new(store),
+            path,
+        })
+    }
+
+    async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
+        self.store.head(&self.path).await
+    }
+
+    async fn file_size(&self) -> PolarsResult<usize> {
+        Ok(self.object_metadata().await?.size)
+    }
+
+    pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
+        let file_size = self.file_size().await?;
+
+        // TODO: Do a larger request and hope that the entire footer is contained
+        // within it to save one round-trip.
+        let footer_metadata =
+            self.store
+                .get_range(
+                    &self.path,
+                    file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
+                        to_compute_err("ipc file size is smaller than the minimum")
+                    })?..file_size,
+                )
+                .await?;
+
+        let footer_size = deserialize_footer_metadata(
+            footer_metadata
+                .as_ref()
+                .try_into()
+                .map_err(to_compute_err)?,
+        )?;
+
+        let footer = self
+            .store
+            .get_range(
+                &self.path,
+                file_size
+                    .checked_sub(FOOTER_METADATA_SIZE + footer_size)
+                    .ok_or_else(|| {
+                        to_compute_err("invalid ipc footer metadata: footer size too large")
+                    })?..file_size,
+            )
+            .await?;
+
+        arrow::io::ipc::read::deserialize_footer(
+            footer.as_ref(),
+            footer_size.try_into().map_err(to_compute_err)?,
+        )
+    }
+
+    pub async fn data(
+        &self,
+        metadata: Option<&FileMetadata>,
+        options: IpcReadOptions,
+        verbose: bool,
+    ) -> PolarsResult<DataFrame> {
+        // TODO: Only download what is needed rather than the entire file by
+        // making use of the projection, row limit, predicate and such.
+        let bytes = self.store.get(&self.path).await?;
+
+        let projection = match options.projection.as_deref() {
+            Some(projection) => {
+                fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
+                    if let Some(rc) = row_index {
+                        let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
+                    }
+                    schema
+                }
+
+                // Retrieve the metadata for the schema so we can map column names to indices.
+                let fetched_metadata;
+                let metadata = if let Some(metadata) = metadata {
+                    metadata
+                } else {
+                    // This branch is taken when `metadata` is `None`, which can
+                    // happen when we deserialize the execution plan.
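The new reader resolves metadata with one `head` request plus two small range `get`s (the 10-byte suffix, then the footer it points at), while `data` currently downloads the whole file. A sketch of driving it by hand, assuming the `cloud` feature and an ambient async runtime; the URI, column names, and row limit are placeholders:

```rust
use polars_error::PolarsResult;
use polars_io::ipc::{IpcReadOptions, IpcReaderAsync};

async fn peek_remote_ipc() -> PolarsResult<()> {
    let reader = IpcReaderAsync::from_uri("s3://my-bucket/foods.ipc", None).await?;

    // One HEAD for the file size, then two range GETs:
    // the 10-byte suffix and the footer it points at.
    let metadata = reader.metadata().await?;

    // Pass the metadata back in so `data` does not fetch it a second time.
    let df = reader
        .data(
            Some(&metadata),
            IpcReadOptions::default()
                .with_projection(vec!["category".to_string(), "calories".to_string()])
                .with_row_limit(100),
            false, // verbose
        )
        .await?;

    assert!(df.height() <= 100);
    Ok(())
}
```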
diff --git a/crates/polars-io/src/ipc/mod.rs b/crates/polars-io/src/ipc/mod.rs
index 1366aa84324f..813fc7f5df78 100644
--- a/crates/polars-io/src/ipc/mod.rs
+++ b/crates/polars-io/src/ipc/mod.rs
@@ -16,3 +16,8 @@ pub use ipc_file::IpcReader;
 #[cfg(feature = "ipc_streaming")]
 pub use ipc_stream::*;
 pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption};
+
+#[cfg(feature = "cloud")]
+mod ipc_reader_async;
+#[cfg(feature = "cloud")]
+pub use ipc_reader_async::*;
+                    fetched_metadata = self.metadata().await?;
+                    &fetched_metadata
+                };
+
+                let schema = prepare_schema((&metadata.schema).into(), options.row_index.as_ref());
+
+                let hive_partitions = None;
+
+                materialize_projection(
+                    Some(projection),
+                    &schema,
+                    hive_partitions,
+                    options.row_index.is_some(),
+                )
+            },
+            None => None,
+        };
+
+        let reader =
+            <IpcReader<_> as crate::SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
+                .with_row_index(options.row_index)
+                .with_n_rows(options.row_limit)
+                .with_projection(projection);
+        reader.finish_with_scan_ops(options.predicate, verbose)
+    }
+
+    pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
+        // TODO: Only download what is needed rather than the entire file by
+        // making use of the projection, row limit, predicate and such.
+        let bytes = self.store.get(&self.path).await?;
+        get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
+    }
+}
+
+const FOOTER_METADATA_SIZE: usize = 10;
+
+// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
+// sync and async readers.
+fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
+    let footer_size: usize =
+        i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
+            .try_into()
+            .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
+
+    if &bytes[4..] != b"ARROW1" {
+        polars_bail!(oos = OutOfSpecKind::InvalidFooter);
+    }
+
+    Ok(footer_size)
+}
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
index 08f37ab566aa..123eeb9461e1 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -1,5 +1,10 @@
 use std::path::PathBuf;
 
+use polars_core::config::env_force_async;
+#[cfg(feature = "cloud")]
+use polars_io::cloud::CloudOptions;
+use polars_io::is_cloud_url;
+
 use super::*;
 
 pub struct IpcExec {
@@ -8,10 +13,43 @@
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
     pub(crate) options: IpcScanOptions,
     pub(crate) file_options: FileScanOptions,
+    #[cfg(feature = "cloud")]
+    pub(crate) cloud_options: Option<CloudOptions>,
+    pub(crate) metadata: Option<arrow::io::ipc::read::FileMetadata>,
 }
 
 impl IpcExec {
     fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
+        let is_cloud = is_cloud_url(&self.path);
+        let force_async = env_force_async();
+
+        let mut out = if is_cloud || force_async {
+            #[cfg(not(feature = "cloud"))]
+            {
+                panic!("activate cloud feature")
+            }
+
+            #[cfg(feature = "cloud")]
+            {
+                if !is_cloud && verbose {
+                    eprintln!("ASYNC READING FORCED");
+                }
+
+                polars_io::pl_async::get_runtime()
+                    .block_on_potential_spawn(self.read_async(verbose))?
+            }
+        } else {
+            self.read_sync(verbose)?
+        };
+
+        if self.file_options.rechunk {
+            out.as_single_chunk_par();
+        }
+
+        Ok(out)
+    }
+
+    fn read_sync(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
         let file = std::fs::File::open(&self.path)?;
         let (projection, predicate) = prepare_scan_args(
             self.predicate.clone(),
@@ -28,6 +66,26 @@ impl IpcExec {
             .memory_mapped(self.options.memmap)
             .finish_with_scan_ops(predicate, verbose)
     }
+
+    #[cfg(feature = "cloud")]
+    async fn read_async(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
+        let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
+
+        let reader =
+            IpcReaderAsync::from_uri(self.path.to_str().unwrap(), self.cloud_options.as_ref())
+                .await?;
+        reader
+            .data(
+                self.metadata.as_ref(),
+                IpcReadOptions::default()
+                    .with_row_limit(self.file_options.n_rows)
+                    .with_row_index(self.file_options.row_index.clone())
+                    .with_projection(self.file_options.with_columns.as_deref().cloned())
+                    .with_predicate(predicate),
+                verbose,
+            )
+            .await
+    }
 }
 
 impl Executor for IpcExec {
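The executor now routes cloud URLs through the async reader, and `POLARS_FORCE_ASYNC=1` forces the same path for local files, which is what the new test at the bottom of this PR exercises. A self-contained sketch of that gate; both helper bodies are simplified assumptions, not the real `env_force_async` and `is_cloud_url`:

```rust
use std::path::Path;

// Simplified stand-in for `polars_core::config::env_force_async`; the env
// var name matches the test in this PR.
fn force_async() -> bool {
    std::env::var("POLARS_FORCE_ASYNC").as_deref() == Ok("1")
}

// Simplified stand-in for `polars_io::is_cloud_url`, which also recognizes
// further schemes such as gs:// and az://.
fn is_cloud_url(path: &Path) -> bool {
    matches!(
        path.to_str(),
        Some(p) if p.starts_with("s3://") || p.starts_with("http://") || p.starts_with("https://")
    )
}

fn main() {
    let path = Path::new("s3://bucket/data.ipc");
    // Mirrors the branch at the top of `IpcExec::read`.
    let use_async = is_cloud_url(path) || force_async();
    println!("async scan: {use_async}");
}
```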
diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs
index 8673538f3725..dc6cbc81b255 100644
--- a/crates/polars-lazy/src/physical_plan/planner/lp.rs
+++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -232,7 +232,12 @@ pub fn create_physical_plan(
             }))
         },
         #[cfg(feature = "ipc")]
-        FileScan::Ipc { options } => {
+        FileScan::Ipc {
+            options,
+            #[cfg(feature = "cloud")]
+            cloud_options,
+            metadata,
+        } => {
             assert_eq!(paths.len(), 1);
             let path = paths[0].clone();
             Ok(Box::new(executors::IpcExec {
@@ -241,6 +246,9 @@ pub fn create_physical_plan(
                 predicate,
                 options,
                 file_options,
+                #[cfg(feature = "cloud")]
+                cloud_options,
+                metadata,
             }))
         },
         #[cfg(feature = "parquet")]
diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs
index 653b7e368f91..0cfcb5333a2c 100644
--- a/crates/polars-lazy/src/scan/ipc.rs
+++ b/crates/polars-lazy/src/scan/ipc.rs
@@ -1,6 +1,7 @@
 use std::path::{Path, PathBuf};
 
 use polars_core::prelude::*;
+use polars_io::cloud::CloudOptions;
 use polars_io::RowIndex;
 
 use crate::prelude::*;
@@ -12,6 +13,8 @@ pub struct ScanArgsIpc {
     pub rechunk: bool,
     pub row_index: Option<RowIndex>,
     pub memmap: bool,
+    #[cfg(feature = "cloud")]
+    pub cloud_options: Option<CloudOptions>,
 }
 
 impl Default for ScanArgsIpc {
@@ -22,6 +25,8 @@
             rechunk: false,
             row_index: None,
             memmap: true,
+            #[cfg(feature = "cloud")]
+            cloud_options: Default::default(),
         }
     }
 }
@@ -58,6 +63,8 @@ impl LazyFileListReader for LazyIpcReader {
             args.cache,
             args.row_index.clone(),
             args.rechunk,
+            #[cfg(feature = "cloud")]
+            args.cloud_options,
         )?
         .build()
         .into();
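With `cloud_options` now threaded through `ScanArgsIpc`, a remote scan can be set up as in the following sketch; it assumes the `cloud` feature is enabled and that `ScanArgsIpc` is reachable through the prelude, and the bucket URI is a placeholder:

```rust
use polars_core::prelude::PolarsResult;
use polars_lazy::prelude::*;

fn scan_remote() -> PolarsResult<LazyFrame> {
    let args = ScanArgsIpc {
        n_rows: Some(1_000),
        cloud_options: None, // e.g. parsed AWS/GCP/Azure settings and retries
        ..Default::default()
    };
    LazyFrame::scan_ipc("s3://my-bucket/foods.ipc", args)
}
```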
     .collect()?;
diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs
index 12af640e2d68..b3beecddfead 100644
--- a/crates/polars-plan/src/logical_plan/builder.rs
+++ b/crates/polars-plan/src/logical_plan/builder.rs
@@ -4,8 +4,6 @@ use std::io::{Read, Seek};
 use polars_core::prelude::*;
 #[cfg(feature = "parquet")]
 use polars_io::cloud::CloudOptions;
-#[cfg(feature = "ipc")]
-use polars_io::ipc::IpcReader;
 #[cfg(all(feature = "parquet", feature = "async"))]
 use polars_io::parquet::ParquetAsyncReader;
 #[cfg(feature = "parquet")]
@@ -239,38 +237,57 @@ impl LogicalPlanBuilder {
         cache: bool,
         row_index: Option<RowIndex>,
         rechunk: bool,
+        #[cfg(feature = "cloud")] cloud_options: Option<CloudOptions>,
     ) -> PolarsResult<Self> {
-        use polars_io::SerReader as _;
+        use polars_io::is_cloud_url;
 
         let path = path.into();
-        let file = polars_utils::open_file(&path)?;
-        let mut reader = IpcReader::new(file);
-        let reader_schema = reader.schema()?;
-        let mut schema: Schema = (&reader_schema).into();
-        if let Some(rc) = &row_index {
-            let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
-        }
-
-        let num_rows = reader._num_rows()?;
-        let file_info = FileInfo::new(Arc::new(schema), Some(reader_schema), (None, num_rows));
+        let metadata = if is_cloud_url(&path) {
+            #[cfg(not(feature = "cloud"))]
+            panic!(
+                "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
+            );
 
-        let file_options = FileScanOptions {
-            with_columns: None,
-            cache,
-            n_rows,
-            rechunk,
-            row_index,
-            file_counter: Default::default(),
-            // TODO! add
-            hive_partitioning: false,
+            #[cfg(feature = "cloud")]
+            {
+                let uri = path.to_string_lossy();
+                get_runtime().block_on(async {
+                    polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref())
+                        .await?
+                        .metadata()
+                        .await
+                })?
+            }
+        } else {
+            arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(
+                polars_utils::open_file(&path)?,
+            ))?
         };
+
         Ok(LogicalPlan::Scan {
             paths: Arc::new([path]),
-            file_info,
-            file_options,
+            file_info: FileInfo::new(
+                prepare_schema(metadata.schema.as_ref().into(), row_index.as_ref()),
+                Some(Arc::clone(&metadata.schema)),
+                (None, 0),
+            ),
+            file_options: FileScanOptions {
+                with_columns: None,
+                cache,
+                n_rows,
+                rechunk,
+                row_index,
+                file_counter: Default::default(),
+                hive_partitioning: false,
+            },
             predicate: None,
-            scan_type: FileScan::Ipc { options },
+            scan_type: FileScan::Ipc {
+                options,
+                #[cfg(feature = "cloud")]
+                cloud_options,
+                metadata: Some(metadata),
+            },
         }
         .into())
     }
diff --git a/crates/polars-plan/src/logical_plan/file_scan.rs b/crates/polars-plan/src/logical_plan/file_scan.rs
index 2364711eef3b..f0720ed4664b 100644
--- a/crates/polars-plan/src/logical_plan/file_scan.rs
+++ b/crates/polars-plan/src/logical_plan/file_scan.rs
@@ -16,7 +16,13 @@ pub enum FileScan {
         metadata: Option<Arc<FileMetadata>>,
     },
     #[cfg(feature = "ipc")]
-    Ipc { options: IpcScanOptions },
+    Ipc {
+        options: IpcScanOptions,
+        #[cfg(feature = "cloud")]
+        cloud_options: Option<CloudOptions>,
+        #[cfg_attr(feature = "serde", serde(skip))]
+        metadata: Option<arrow::io::ipc::read::FileMetadata>,
+    },
     #[cfg_attr(feature = "serde", serde(skip))]
     Anonymous {
         options: Arc<AnonymousScanOptions>,
@@ -43,7 +49,29 @@ impl PartialEq for FileScan {
                 },
             ) => opt_l == opt_r && c_l == c_r,
             #[cfg(feature = "ipc")]
-            (FileScan::Ipc { options: l }, FileScan::Ipc { options: r }) => l == r,
+            (
+                FileScan::Ipc {
+                    options: l,
+                    #[cfg(feature = "cloud")]
+                    cloud_options: c_l,
+                    ..
+                },
+                FileScan::Ipc {
+                    options: r,
+                    #[cfg(feature = "cloud")]
+                    cloud_options: c_r,
+                    ..
+                },
+            ) => {
+                #[cfg(not(feature = "cloud"))]
+                {
+                    l == r
+                }
+                #[cfg(feature = "cloud")]
+                {
+                    l == r && c_l == c_r
+                }
+            },
             _ => false,
         }
     }
diff --git a/crates/polars-plan/src/logical_plan/functions/count.rs b/crates/polars-plan/src/logical_plan/functions/count.rs
index 5e28ad4a37fb..1da3484928fc 100644
--- a/crates/polars-plan/src/logical_plan/functions/count.rs
+++ b/crates/polars-plan/src/logical_plan/functions/count.rs
@@ -1,5 +1,7 @@
 #[cfg(feature = "ipc")]
-use arrow::io::ipc::read::get_row_count as count_rows_ipc;
+use arrow::io::ipc::read::get_row_count as count_rows_ipc_sync;
+#[cfg(feature = "ipc")]
+use polars_core::error::to_compute_err;
 #[cfg(feature = "parquet")]
 use polars_io::cloud::CloudOptions;
 #[cfg(feature = "csv")]
@@ -41,15 +43,21 @@ pub fn count_rows(paths: &Arc<[PathBuf]>, scan_type: &FileScan) -> PolarsResult<DataFrame> {
             Ok(DataFrame::new(vec![Series::new("len", [n_rows as IdxSize])]).unwrap())
         },
         #[cfg(feature = "ipc")]
-        FileScan::Ipc { options } => {
-            let n_rows: PolarsResult<i64> = paths
-                .iter()
-                .map(|path| {
-                    let mut reader = polars_utils::open_file(path)?;
-                    count_rows_ipc(&mut reader)
-                })
-                .sum();
-            Ok(DataFrame::new(vec![Series::new("len", [n_rows? as IdxSize])]).unwrap())
+        FileScan::Ipc {
+            options,
+            #[cfg(feature = "cloud")]
+            cloud_options,
+            metadata,
+        } => {
+            let count: IdxSize = count_rows_ipc(
+                paths,
+                #[cfg(feature = "cloud")]
+                cloud_options.as_ref(),
+                metadata.as_ref(),
+            )?
+            .try_into()
+            .map_err(to_compute_err)?;
+            Ok(DataFrame::new(vec![Series::new("len", [count])]).unwrap())
         },
         FileScan::Anonymous { .. } => {
             unreachable!();
@@ -103,3 +111,52 @@ async fn count_rows_cloud_parquet(
         .await
         .map(|rows| rows.iter().sum())
 }
+
+#[cfg(feature = "ipc")]
+pub(super) fn count_rows_ipc(
+    paths: &Arc<[PathBuf]>,
+    #[cfg(feature = "cloud")] cloud_options: Option<&CloudOptions>,
+    metadata: Option<&arrow::io::ipc::read::FileMetadata>,
+) -> PolarsResult<i64> {
+    if paths.is_empty() {
+        return Ok(0);
+    };
+    let is_cloud = is_cloud_url(paths.first().unwrap().as_path());
+
+    if is_cloud {
+        #[cfg(not(feature = "cloud"))]
+        panic!("One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled.");
+
+        #[cfg(feature = "cloud")]
+        {
+            get_runtime().block_on(count_rows_cloud_ipc(paths, cloud_options, metadata))
+        }
+    } else {
+        paths
+            .iter()
+            .map(|path| {
+                let mut reader = polars_utils::open_file(path)?;
+                count_rows_ipc_sync(&mut reader)
+            })
+            .sum()
+    }
+}
+
+#[cfg(all(feature = "ipc", feature = "async"))]
+async fn count_rows_cloud_ipc(
+    paths: &Arc<[PathBuf]>,
+    cloud_options: Option<&CloudOptions>,
+    metadata: Option<&arrow::io::ipc::read::FileMetadata>,
+) -> PolarsResult<i64> {
+    use polars_io::ipc::IpcReaderAsync;
+
+    let collection = paths.iter().map(|path| {
+        with_concurrency_budget(1, || async {
+            let reader = IpcReaderAsync::from_uri(&path.to_string_lossy(), cloud_options).await?;
+            reader.count_rows(metadata).await
+        })
+    });
+    futures::future::try_join_all(collection)
+        .await
+        .map(|rows| rows.iter().sum())
+}
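The cloud branch fans out one budgeted task per path and sums the per-file counts via `try_join_all`. The same pattern in isolation; `count_one` is a stub standing in for `IpcReaderAsync::from_uri(..).count_rows(..)`:

```rust
// Placeholder per-file count; a real implementation would open the file.
async fn count_one(path: &str) -> Result<i64, String> {
    Ok(path.len() as i64)
}

// Run all counts concurrently; the first error short-circuits the whole sum.
async fn count_all(paths: &[&str]) -> Result<i64, String> {
    let counts = futures::future::try_join_all(paths.iter().map(|p| count_one(p))).await?;
    Ok(counts.into_iter().sum())
}
```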
diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py
index ddec69601732..55d07848cb5e 100644
--- a/py-polars/polars/io/ipc/functions.py
+++ b/py-polars/polars/io/ipc/functions.py
@@ -222,6 +222,7 @@ def scan_ipc(
     row_index_offset: int = 0,
     storage_options: dict[str, Any] | None = None,
     memory_map: bool = True,
+    retries: int = 0,
 ) -> LazyFrame:
     """
     Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns.
@@ -252,6 +253,9 @@
         Try to memory map the file. This can greatly improve performance on repeated
         queries as the OS may cache pages.
         Only uncompressed IPC files can be memory mapped.
+    retries
+        Number of retries if accessing a cloud instance fails.
+
     """
     return pl.LazyFrame._scan_ipc(
         source,
@@ -262,4 +266,5 @@
         row_index_offset=row_index_offset,
         storage_options=storage_options,
         memory_map=memory_map,
+        retries=retries,
     )
diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py
index ccb109c48cf6..a099ef55b893 100644
--- a/py-polars/polars/lazyframe/frame.py
+++ b/py-polars/polars/lazyframe/frame.py
@@ -491,6 +491,7 @@ def _scan_ipc(
         row_index_offset: int = 0,
         storage_options: dict[str, object] | None = None,
         memory_map: bool = True,
+        retries: int = 0,
     ) -> Self:
         """
         Lazily read from an Arrow IPC (Feather v2) file.
@@ -528,6 +529,8 @@ def _scan_ipc(
             rechunk,
             _prepare_row_index_args(row_index_name, row_index_offset),
             memory_map=memory_map,
+            cloud_options=storage_options,
+            retries=retries,
         )
         return self
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index eeb9f28eb731..b1c3f6b80d39 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -304,7 +304,7 @@ impl PyLazyFrame {
 
     #[cfg(feature = "ipc")]
     #[staticmethod]
-    #[pyo3(signature = (path, paths, n_rows, cache, rechunk, row_index, memory_map))]
+    #[pyo3(signature = (path, paths, n_rows, cache, rechunk, row_index, memory_map, cloud_options, retries))]
     fn new_from_ipc(
         path: Option<PathBuf>,
         paths: Vec<PathBuf>,
@@ -313,14 +313,45 @@ impl PyLazyFrame {
         rechunk: bool,
         row_index: Option<(String, IdxSize)>,
         memory_map: bool,
+        cloud_options: Option<Vec<(String, String)>>,
+        retries: usize,
     ) -> PyResult<Self> {
         let row_index = row_index.map(|(name, offset)| RowIndex { name, offset });
+
+        #[cfg(feature = "cloud")]
+        let cloud_options = {
+            let first_path = if let Some(path) = &path {
+                path
+            } else {
+                paths
+                    .first()
+                    .ok_or_else(|| PyValueError::new_err("expected a path argument"))?
+            };
+
+            let first_path_url = first_path.to_string_lossy();
+            let mut cloud_options = cloud_options
+                .map(|kv| parse_cloud_options(&first_path_url, kv))
+                .transpose()?;
+            if retries > 0 {
+                cloud_options =
+                    cloud_options
+                        .or_else(|| Some(CloudOptions::default()))
+                        .map(|mut options| {
+                            options.max_retries = retries;
+                            options
+                        });
+            }
+            cloud_options
+        };
+
         let args = ScanArgsIpc {
             n_rows,
             cache,
             rechunk,
             row_index,
             memmap: memory_map,
+            #[cfg(feature = "cloud")]
+            cloud_options,
         };
 
         let lf = if let Some(path) = &path {
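The `retries` plumbing above only materializes a `CloudOptions` when it has to: explicit storage options are parsed first, and a non-zero retry count falls back to defaults with the retry field bumped. The same logic with a stub options type; `Opts` is illustrative, the real field being `CloudOptions::max_retries` in polars-io:

```rust
#[derive(Debug, Default, PartialEq)]
struct Opts {
    max_retries: usize,
}

fn apply_retries(opts: Option<Opts>, retries: usize) -> Option<Opts> {
    if retries == 0 {
        return opts; // keep `None`: no cloud config is materialized needlessly
    }
    let mut opts = opts.unwrap_or_default();
    opts.max_retries = retries;
    Some(opts)
}

fn main() {
    assert_eq!(apply_retries(None, 0), None);
    assert_eq!(apply_retries(None, 3), Some(Opts { max_retries: 3 }));
}
```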
+)
+def test_scan_ipc_local_with_async(
+    capfd: Any,
+    monkeypatch: Any,
+    io_files_path: Path,
+) -> None:
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
+
+    assert_frame_equal(
+        pl.scan_ipc(io_files_path / "foods1.ipc").head(1).collect(),
+        pl.DataFrame(
+            {
+                "category": ["vegetables"],
+                "calories": [45],
+                "fats_g": [0.5],
+                "sugars_g": [2],
+            }
+        ),
+    )
+
+    captured = capfd.readouterr().err
+    assert "ASYNC READING FORCED" in captured
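For reference, the 10 trailing bytes consumed by `deserialize_footer_metadata` are the Arrow IPC file suffix: a little-endian `i32` footer length followed by the 6-byte magic `ARROW1`. A standalone re-derivation of that parse:

```rust
// Parse the final 10 bytes of an Arrow IPC file into the footer length.
fn parse_suffix(bytes: [u8; 10]) -> Result<usize, &'static str> {
    let footer_len = i32::from_le_bytes(bytes[0..4].try_into().unwrap());
    if &bytes[4..] != b"ARROW1" {
        return Err("invalid footer magic");
    }
    usize::try_from(footer_len).map_err(|_| "negative footer length")
}

fn main() {
    let mut suffix = [0u8; 10];
    suffix[0..4].copy_from_slice(&256i32.to_le_bytes());
    suffix[4..].copy_from_slice(b"ARROW1");
    assert_eq!(parse_suffix(suffix), Ok(256));
}
```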