diff --git a/crates/polars-core/src/config.rs b/crates/polars-core/src/config.rs
index dee5e0103e54..a1d06d081e8e 100644
--- a/crates/polars-core/src/config.rs
+++ b/crates/polars-core/src/config.rs
@@ -55,7 +55,7 @@ pub fn get_rg_prefetch_size() -> usize {
         .unwrap_or_else(|_| std::cmp::max(get_file_prefetch_size(), 128))
 }
 
-pub fn env_force_async() -> bool {
+pub fn force_async() -> bool {
     std::env::var("POLARS_FORCE_ASYNC")
         .map(|value| value == "1")
         .unwrap_or_default()
diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs
index 8e75c9c74c99..7d60e5dd64b9 100644
--- a/crates/polars-io/src/lib.rs
+++ b/crates/polars-io/src/lib.rs
@@ -149,10 +149,7 @@ pub(crate) fn finish_reader(
         }
     };
 
-    match rechunk {
-        true => Ok(df.agg_chunks()),
-        false => Ok(df),
-    }
+    Ok(if rechunk { df.agg_chunks() } else { df })
 }
 
 static CLOUD_URL: Lazy<Regex> =
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
index 123eeb9461e1..f3759a54c727 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -1,14 +1,19 @@
 use std::path::PathBuf;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::RwLock;
 
-use polars_core::config::env_force_async;
+use polars_core::config;
+use polars_core::utils::accumulate_dataframes_vertical;
 #[cfg(feature = "cloud")]
 use polars_io::cloud::CloudOptions;
-use polars_io::is_cloud_url;
+use polars_io::predicates::apply_predicate;
+use polars_io::{is_cloud_url, RowIndex};
+use rayon::prelude::*;
 
 use super::*;
 
 pub struct IpcExec {
-    pub(crate) path: PathBuf,
+    pub(crate) paths: Arc<[PathBuf]>,
     pub(crate) schema: SchemaRef,
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
     pub(crate) options: IpcScanOptions,
@@ -20,10 +25,8 @@ pub struct IpcExec {
 
 impl IpcExec {
     fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
-        let is_cloud = is_cloud_url(&self.path);
-        let force_async = env_force_async();
-
-        let mut out = if is_cloud || force_async {
+        let is_cloud = self.paths.iter().any(is_cloud_url);
+        let mut out = if is_cloud || config::force_async() {
             #[cfg(not(feature = "cloud"))]
             {
                 panic!("activate cloud feature")
@@ -39,7 +42,7 @@ impl IpcExec {
                    .block_on_potential_spawn(self.read_async(verbose))?
             }
         } else {
-            self.read_sync(verbose)?
+            self.read_sync()?
         };
 
         if self.file_options.rechunk {
@@ -49,49 +52,226 @@ impl IpcExec {
         Ok(out)
     }
 
-    fn read_sync(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
-        let file = std::fs::File::open(&self.path)?;
-        let (projection, predicate) = prepare_scan_args(
-            self.predicate.clone(),
-            &mut self.file_options.with_columns,
-            &mut self.schema,
-            self.file_options.row_index.is_some(),
+    fn read_sync(&mut self) -> PolarsResult<DataFrame> {
+        if config::verbose() {
+            eprintln!("executing ipc read sync with row_index = {:?}, n_rows = {:?}, predicate = {:?} for paths {:?}",
+                self.file_options.row_index.as_ref(),
+                self.file_options.n_rows.as_ref(),
+                self.predicate.is_some(),
+                self.paths
+            );
+        }
+
+        let projection = materialize_projection(
+            self.file_options
+                .with_columns
+                .as_deref()
+                .map(|cols| cols.deref()),
+            &self.schema,
+            None,
+            self.file_options.row_index.is_some(),
         );
-        IpcReader::new(file)
-            .with_n_rows(self.file_options.n_rows)
-            .with_row_index(std::mem::take(&mut self.file_options.row_index))
-            .set_rechunk(self.file_options.rechunk)
-            .with_projection(projection)
-            .memory_mapped(self.options.memmap)
-            .finish_with_scan_ops(predicate, verbose)
+
+        let n_rows = self
+            .file_options
+            .n_rows
+            .map(|n| IdxSize::try_from(n).unwrap());
+
+        let row_limit = n_rows.unwrap_or(IdxSize::MAX);
+
+        // Used to determine the next file to open. This guarantees the order.
+        let path_index = AtomicUsize::new(0);
+        let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len()));
+
+        let index_and_dfs = (0..self.paths.len())
+            .into_par_iter()
+            .map(|_| -> PolarsResult<(usize, DataFrame)> {
+                let index = path_index.fetch_add(1, Ordering::Relaxed);
+                let path = &self.paths[index];
+
+                let already_read_in_sequence = row_counter.read().unwrap().sum();
+                if already_read_in_sequence >= row_limit {
+                    return Ok((index, Default::default()));
+                }
+
+                let file = std::fs::File::open(path)?;
+
+                let df = IpcReader::new(file)
+                    .with_n_rows(
+                        // NOTE: If there is any file that by itself exceeds the
+                        // row limit, passing the total row limit to each
+                        // individual reader helps.
+                        n_rows.map(|n| {
+                            n.saturating_sub(already_read_in_sequence)
+                                .try_into()
+                                .unwrap()
+                        }),
+                    )
+                    .with_row_index(self.file_options.row_index.clone())
+                    .with_projection(projection.clone())
+                    .memory_mapped(self.options.memmap)
+                    .finish()?;
+
+                row_counter
+                    .write()
+                    .unwrap()
+                    .write(index, df.height().try_into().unwrap());
+
+                Ok((index, df))
+            })
+            .collect::<PolarsResult<Vec<_>>>()?;
+
+        finish_index_and_dfs(
+            index_and_dfs,
+            row_counter.into_inner().unwrap(),
+            self.file_options.row_index.as_ref(),
+            row_limit,
+            self.predicate.as_ref(),
+        )
     }
 
     #[cfg(feature = "cloud")]
     async fn read_async(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
-        let predicate = self.predicate.clone().map(phys_expr_to_io_expr);
-
-        let reader =
-            IpcReaderAsync::from_uri(self.path.to_str().unwrap(), self.cloud_options.as_ref())
-                .await?;
-        reader
-            .data(
-                self.metadata.as_ref(),
-                IpcReadOptions::default()
-                    .with_row_limit(self.file_options.n_rows)
-                    .with_row_index(self.file_options.row_index.clone())
-                    .with_projection(self.file_options.with_columns.as_deref().cloned())
-                    .with_predicate(predicate),
-                verbose,
-            )
-            .await
+        use futures::stream::{self, StreamExt};
+        use futures::TryStreamExt;
+
+        /// See https://users.rust-lang.org/t/implementation-of-fnonce-is-not-general-enough-with-async-block/83427/3.
+        trait AssertSend {
+            fn assert_send<T>(self) -> impl Send + stream::Stream<Item = T>
+            where
+                Self: Send + stream::Stream<Item = T> + Sized,
+            {
+                self
+            }
+        }
+
+        impl<T> AssertSend for T {}
+
+        let n_rows = self
+            .file_options
+            .n_rows
+            .map(|limit| limit.try_into().unwrap());
+
+        let row_limit = n_rows.unwrap_or(IdxSize::MAX);
+
+        let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len()));
+
+        let index_and_dfs = stream::iter(&*self.paths)
+            .enumerate()
+            .map(|(index, path)| {
+                let this = &*self;
+                let row_counter = &row_counter;
+                async move {
+                    let already_read_in_sequence = row_counter.read().unwrap().sum();
+                    if already_read_in_sequence >= row_limit {
+                        return Ok((index, Default::default()));
+                    }
+
+                    let reader = IpcReaderAsync::from_uri(
+                        path.to_str().unwrap(),
+                        this.cloud_options.as_ref(),
+                    )
+                    .await?;
+                    let df = reader
+                        .data(
+                            this.metadata.as_ref(),
+                            IpcReadOptions::default()
+                                .with_row_limit(
+                                    // NOTE: If there is any file that by itself
+                                    // exceeds the row limit, passing the total
+                                    // row limit to each individual reader
+                                    // helps.
+                                    n_rows.map(|n| {
+                                        n.saturating_sub(already_read_in_sequence)
+                                            .try_into()
+                                            .unwrap()
+                                    }),
+                                )
+                                .with_row_index(this.file_options.row_index.clone())
+                                .with_projection(
+                                    this.file_options.with_columns.as_deref().cloned(),
+                                ),
+                            verbose,
+                        )
+                        .await?;
+
+                    row_counter
+                        .write()
+                        .unwrap()
+                        .write(index, df.height().try_into().unwrap());
+
+                    PolarsResult::Ok((index, df))
+                }
+            })
+            .assert_send()
+            .buffer_unordered(config::get_file_prefetch_size())
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        finish_index_and_dfs(
+            index_and_dfs,
+            row_counter.into_inner().unwrap(),
+            self.file_options.row_index.as_ref(),
+            row_limit,
+            self.predicate.as_ref(),
+        )
     }
 }
 
+fn finish_index_and_dfs(
+    mut index_and_dfs: Vec<(usize, DataFrame)>,
+    row_counter: ConsecutiveCountState,
+    row_index: Option<&RowIndex>,
+    row_limit: IdxSize,
+    predicate: Option<&Arc<dyn PhysicalExpr>>,
+) -> PolarsResult<DataFrame> {
+    index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
+
+    debug_assert!(
+        index_and_dfs.iter().enumerate().all(|(a, &(b, _))| a == b),
+        "expected dataframe indices in order from 0 to len"
+    );
+
+    debug_assert_eq!(index_and_dfs.len(), row_counter.counts.len());
+    let mut offset = 0;
+    let mut df = accumulate_dataframes_vertical(
+        index_and_dfs
+            .into_iter()
+            .zip(row_counter.counts())
+            .filter_map(|((_, mut df), count)| {
+                let count = count?;
+
+                let remaining = row_limit.checked_sub(offset)?;
+
+                // If necessary, correct having read too much from a single file.
+                if remaining < count {
+                    df = df.slice(0, remaining.try_into().unwrap());
+                }
+
+                // If necessary, correct row indices now that we know the offset.
+                if let Some(row_index) = row_index {
+                    df.apply(&row_index.name, |series| {
+                        series.idx().expect("index column should be of index type") + offset
+                    })
+                    .expect("index column should exist");
+                }
+
+                offset += count;
+
+                Some(df)
+            }),
+    )?;
+
+    let predicate = predicate.cloned().map(phys_expr_to_io_expr);
+    apply_predicate(&mut df, predicate.as_deref(), true)?;
+
+    Ok(df)
+}
+
 impl Executor for IpcExec {
     fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
         let finger_print = FileFingerPrint {
-            paths: Arc::new([self.path.clone()]),
+            paths: Arc::clone(&self.paths),
             #[allow(clippy::useless_asref)]
             predicate: self
                 .predicate
@@ -101,7 +281,7 @@ impl Executor for IpcExec {
         };
 
         let profile_name = if state.has_node_timer() {
-            let mut ids = vec![self.path.to_string_lossy().into()];
+            let mut ids = vec![self.paths[0].to_string_lossy().into()];
             if self.predicate.is_some() {
                 ids.push("predicate".into())
             }
@@ -123,3 +303,53 @@ impl Executor for IpcExec {
         )
     }
 }
+
+// Tracks the sum of consecutive values in a dynamically sized array where the values can be written
+// in any order.
+struct ConsecutiveCountState {
+    counts: Box<[IdxSize]>,
+    next_index: usize,
+    sum: IdxSize,
+}
+
+impl ConsecutiveCountState {
+    fn new(len: usize) -> Self {
+        Self {
+            counts: vec![IdxSize::MAX; len].into_boxed_slice(),
+            next_index: 0,
+            sum: 0,
+        }
+    }
+
+    /// Sum of all consecutive counts.
+    fn sum(&self) -> IdxSize {
+        self.sum
+    }
+
+    /// Write count at index.
+    fn write(&mut self, index: usize, count: IdxSize) {
+        debug_assert!(
+            self.counts[index] == IdxSize::MAX,
+            "second write to same index"
+        );
+        debug_assert!(count != IdxSize::MAX, "count can not be IdxSize::MAX");
+
+        self.counts[index] = count;
+
+        // Update sum and next index.
+        while self.next_index < self.counts.len() {
+            let count = self.counts[self.next_index];
+            if count == IdxSize::MAX {
+                break;
+            }
+            self.sum += count;
+            self.next_index += 1;
+        }
+    }
+
+    fn counts(&self) -> impl Iterator<Item = Option<IdxSize>> + '_ {
+        self.counts
+            .iter()
+            .map(|&count| (count != IdxSize::MAX).then_some(count))
+    }
+}
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index ef5d7f8da914..083874daaf75 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -1,6 +1,6 @@
 use std::path::PathBuf;
 
-use polars_core::config::env_force_async;
+use polars_core::config;
 #[cfg(feature = "cloud")]
 use polars_core::config::{get_file_prefetch_size, verbose};
 use polars_core::utils::accumulate_dataframes_vertical;
@@ -346,7 +346,7 @@ impl ParquetExec {
                ));
            },
        };
-        let force_async = env_force_async();
+        let force_async = config::force_async();
 
         let out = if is_cloud || force_async {
             #[cfg(not(feature = "cloud"))]
@@ -356,6 +356,10 @@ impl ParquetExec {
 
             #[cfg(feature = "cloud")]
             {
+                if !is_cloud && config::verbose() {
+                    eprintln!("ASYNC READING FORCED");
+                }
+
                 polars_io::pl_async::get_runtime().block_on_potential_spawn(self.read_async())?
             }
         } else {
diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs
index dc6cbc81b255..15db692585d4 100644
--- a/crates/polars-lazy/src/physical_plan/planner/lp.rs
+++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -237,20 +237,16 @@ pub fn create_physical_plan(
                     #[cfg(feature = "cloud")]
                     cloud_options,
                     metadata,
-                } => {
-                    assert_eq!(paths.len(), 1);
-                    let path = paths[0].clone();
-                    Ok(Box::new(executors::IpcExec {
-                        path,
-                        schema: file_info.schema,
-                        predicate,
-                        options,
-                        file_options,
-                        #[cfg(feature = "cloud")]
-                        cloud_options,
-                        metadata,
-                    }))
-                },
+                } => Ok(Box::new(executors::IpcExec {
+                    paths,
+                    schema: file_info.schema,
+                    predicate,
+                    options,
+                    file_options,
+                    #[cfg(feature = "cloud")]
+                    cloud_options,
+                    metadata,
+                })),
                 #[cfg(feature = "parquet")]
                 FileScan::Parquet {
                     options,
diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs
index a849bec3d1c8..6d378708c8f8 100644
--- a/crates/polars-lazy/src/scan/ipc.rs
+++ b/crates/polars-lazy/src/scan/ipc.rs
@@ -49,19 +49,35 @@ impl LazyIpcReader {
 }
 
 impl LazyFileListReader for LazyIpcReader {
+    fn finish(mut self) -> PolarsResult<LazyFrame> {
+        if let Some(paths) = self.iter_paths()? {
+            let paths = paths
+                .into_iter()
+                .collect::<PolarsResult<Arc<[PathBuf]>>>()?;
+            self.paths = paths;
+        }
+        self.finish_no_glob()
+    }
+
     fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
         let args = self.args;
-        let path = self.path;
+
+        let paths = if self.paths.is_empty() {
+            Arc::new([self.path]) as Arc<[PathBuf]>
+        } else {
+            self.paths
+        };
 
         let options = IpcScanOptions {
             memmap: args.memmap,
         };
+
         let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(
-            path,
+            paths,
             options,
             args.n_rows,
             args.cache,
-            args.row_index.clone(),
+            args.row_index,
             args.rechunk,
             #[cfg(feature = "cloud")]
             args.cloud_options,
@@ -70,11 +86,6 @@ impl LazyFileListReader for LazyIpcReader {
         .into();
 
         lf.opt_state.file_caching = true;
 
-        // it is a bit hacky, but this `with_row_index` function updates the schema
-        if let Some(row_index) = args.row_index {
-            lf = lf.with_row_index(&row_index.name, Some(row_index.offset))
-        }
-
         Ok(lf)
     }
 
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index 85e19f4bfc71..40855c9cc3a0 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -4,7 +4,7 @@
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use arrow::datatypes::ArrowSchemaRef;
-use polars_core::config::{env_force_async, get_file_prefetch_size};
+use polars_core::config::{self, get_file_prefetch_size};
 use polars_core::error::*;
 use polars_core::prelude::Series;
 use polars_core::POOL;
@@ -204,7 +204,7 @@ impl ParquetSource {
         if verbose {
             eprintln!("POLARS PREFETCH_SIZE: {}", prefetch_size)
         }
-        let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || env_force_async();
+        let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async();
 
         let mut source = ParquetSource {
             batched_readers: VecDeque::new(),
diff --git a/crates/polars-plan/src/logical_plan/builder.rs b/crates/polars-plan/src/logical_plan/builder.rs
index b3beecddfead..be7ed03b2a0f 100644
--- a/crates/polars-plan/src/logical_plan/builder.rs
+++ b/crates/polars-plan/src/logical_plan/builder.rs
@@ -230,8 +230,8 @@ impl LogicalPlanBuilder {
     }
 
     #[cfg(feature = "ipc")]
-    pub fn scan_ipc<P: Into<PathBuf>>(
-        path: P,
+    pub fn scan_ipc<P: Into<Arc<[PathBuf]>>>(
+        paths: P,
         options: IpcScanOptions,
         n_rows: Option<usize>,
         cache: bool,
@@ -241,9 +241,14 @@ impl LogicalPlanBuilder {
     ) -> PolarsResult<Self> {
         use polars_io::is_cloud_url;
 
-        let path = path.into();
+        let paths = paths.into();
 
-        let metadata = if is_cloud_url(&path) {
+        // Use first path to get schema.
+        let path = paths
+            .first()
+            .ok_or_else(|| polars_err!(ComputeError: "expected at least 1 path"))?;
+
+        let metadata = if is_cloud_url(path) {
             #[cfg(not(feature = "cloud"))]
             panic!(
                 "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
@@ -261,12 +266,12 @@ impl LogicalPlanBuilder {
             }
         } else {
             arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(
-                polars_utils::open_file(&path)?,
+                polars_utils::open_file(path)?,
             ))?
         };
 
         Ok(LogicalPlan::Scan {
-            paths: Arc::new([path]),
+            paths,
             file_info: FileInfo::new(
                 prepare_schema(metadata.schema.as_ref().into(), row_index.as_ref()),
                 Some(Arc::clone(&metadata.schema)),
diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py
index 2ea3bfa91a60..9f8a483626f9 100644
--- a/py-polars/tests/unit/io/test_scan.py
+++ b/py-polars/tests/unit/io/test_scan.py
@@ -2,7 +2,7 @@
 
 from dataclasses import dataclass
 from math import ceil
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import pytest
 
@@ -21,6 +21,18 @@ class _RowIndex:
     offset: int = 0
 
 
+def _enable_force_async(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Modifies the provided monkeypatch context."""
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
+
+
+def _assert_force_async(capfd: Any) -> None:
+    """Calls `capfd.readouterr`, consuming the captured output so far."""
+    captured = capfd.readouterr().err
+    assert captured.count("ASYNC READING FORCED") == 1
+
+
 def _scan(
     file_path: Path,
     schema: SchemaDict | None = None,
@@ -29,27 +41,31 @@ def _scan(
     suffix = file_path.suffix
     row_index_name = None if row_index is None else row_index.name
     row_index_offset = 0 if row_index is None else row_index.offset
+
     if suffix == ".ipc":
-        return pl.scan_ipc(
+        result = pl.scan_ipc(
             file_path,
             row_index_name=row_index_name,
             row_index_offset=row_index_offset,
         )
-    if suffix == ".parquet":
-        return pl.scan_parquet(
+    elif suffix == ".parquet":
+        result = pl.scan_parquet(
             file_path,
             row_index_name=row_index_name,
             row_index_offset=row_index_offset,
         )
-    if suffix == ".csv":
-        return pl.scan_csv(
+    elif suffix == ".csv":
+        result = pl.scan_csv(
             file_path,
             schema=schema,
             row_index_name=row_index_name,
             row_index_offset=row_index_offset,
         )
-    msg = f"Unknown suffix {suffix}"
-    raise NotImplementedError(msg)
+    else:
+        msg = f"Unknown suffix {suffix}"
+        raise NotImplementedError(msg)
+
+    return result
 
 
 def _write(df: pl.DataFrame, file_path: Path) -> None:
@@ -77,6 +93,17 @@ def session_tmp_dir(tmp_path_factory: pytest.TempPathFactory) -> Path:
     return tmp_path_factory.mktemp("polars-test")
 
 
+@pytest.fixture(
+    params=[False, True],
+    ids=["sync", "async"],
+)
+def force_async(
+    request: pytest.FixtureRequest, monkeypatch: pytest.MonkeyPatch
+) -> bool:
+    value: bool = request.param
+    return value
+
+
 @dataclass
 class _DataFile:
     path: Path
@@ -161,14 +188,38 @@ def data_file(
 
 
 @pytest.mark.write_disk()
-def test_scan(data_file: _DataFile) -> None:
+def test_scan(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = _scan(data_file.path, data_file.df.schema).collect()
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(df, data_file.df)
 
 
 @pytest.mark.write_disk()
-def test_scan_with_limit(data_file: _DataFile) -> None:
+def test_scan_with_limit(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = _scan(data_file.path, data_file.df.schema).limit(4483).collect()
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(
@@ -180,12 +231,24 @@ def test_scan_with_limit(data_file: _DataFile) -> None:
 
 
 @pytest.mark.write_disk()
-def test_scan_with_filter(data_file: _DataFile) -> None:
+def test_scan_with_filter(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = (
         _scan(data_file.path, data_file.df.schema)
         .filter(pl.col("sequence") % 2 == 0)
         .collect()
     )
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(
@@ -197,13 +260,25 @@ def test_scan_with_filter(data_file: _DataFile) -> None:
 
 
 @pytest.mark.write_disk()
-def test_scan_with_filter_and_limit(data_file: _DataFile) -> None:
+def test_scan_with_filter_and_limit(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = (
         _scan(data_file.path, data_file.df.schema)
         .filter(pl.col("sequence") % 2 == 0)
         .limit(4483)
         .collect()
     )
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(
@@ -215,13 +290,25 @@ def test_scan_with_filter_and_limit(data_file: _DataFile) -> None:
 
 
 @pytest.mark.write_disk()
-def test_scan_with_limit_and_filter(data_file: _DataFile) -> None:
+def test_scan_with_limit_and_filter(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = (
         _scan(data_file.path, data_file.df.schema)
         .limit(4483)
         .filter(pl.col("sequence") % 2 == 0)
         .collect()
     )
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(
@@ -233,12 +320,24 @@ def test_scan_with_limit_and_filter(data_file: _DataFile) -> None:
 
 
 @pytest.mark.write_disk()
-def test_scan_with_row_index_and_limit(data_file: _DataFile) -> None:
+def test_scan_with_row_index_and_limit(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = (
         _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
         .limit(4483)
         .collect()
     )
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(
@@ -252,12 +351,24 @@
 
 
 @pytest.mark.write_disk()
-def test_scan_with_row_index_and_filter(data_file: _DataFile) -> None:
+def test_scan_with_row_index_and_filter(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = (
         _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
         .filter(pl.col("sequence") % 2 == 0)
         .collect()
     )
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(
@@ -271,13 +382,25 @@ def test_scan_with_row_index_and_filter(data_file: _DataFile) -> None:
 
 
 @pytest.mark.write_disk()
-def test_scan_with_row_index_limit_and_filter(data_file: _DataFile) -> None:
+def test_scan_with_row_index_limit_and_filter(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = (
         _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
         .limit(4483)
         .filter(pl.col("sequence") % 2 == 0)
         .collect()
     )
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(
@@ -291,13 +414,25 @@ def test_scan_with_row_index_limit_and_filter(data_file: _DataFile) -> None:
 
 
 @pytest.mark.write_disk()
-def test_scan_with_row_index_filter_and_limit(data_file: _DataFile) -> None:
+def test_scan_with_row_index_filter_and_limit(
+    capfd: Any, monkeypatch: pytest.MonkeyPatch, data_file: _DataFile, force_async: bool
+) -> None:
+    if data_file.path.suffix == ".csv" and force_async:
+        pytest.skip(reason="async reading of .csv not yet implemented")
+
+    if force_async:
+        _enable_force_async(monkeypatch)
+
     df = (
         _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
         .filter(pl.col("sequence") % 2 == 0)
        .limit(4483)
         .collect()
     )
+
+    if force_async:
+        _assert_force_async(capfd)
+
     assert_frame_equal(
         df,
         pl.DataFrame(