From 8bd1872848e93052e4057aac3ba12d40d1bce28b Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Tue, 23 Jan 2024 12:01:30 +0100
Subject: [PATCH] fix: ensure order is preserved if streaming from different
 sources (#13922)

---
 .../src/executors/sinks/sort/source.rs        |  3 +-
 .../polars-pipe/src/executors/sources/csv.rs  | 25 ++++++------
 .../src/executors/sources/frame.rs            |  5 ++-
 .../polars-pipe/src/executors/sources/mod.rs  |  8 ++++
 .../src/executors/sources/parquet.rs          | 38 ++++++++++---------
 5 files changed, 47 insertions(+), 32 deletions(-)

diff --git a/crates/polars-pipe/src/executors/sinks/sort/source.rs b/crates/polars-pipe/src/executors/sinks/sort/source.rs
index 2c0414f50703..4d40efc8a5bb 100644
--- a/crates/polars-pipe/src/executors/sinks/sort/source.rs
+++ b/crates/polars-pipe/src/executors/sinks/sort/source.rs
@@ -7,6 +7,7 @@ use rayon::prelude::*;
 
 use crate::executors::sinks::sort::ooc::read_df;
 use crate::executors::sinks::sort::sink::sort_accumulated;
+use crate::executors::sources::get_source_index;
 use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
 
 pub struct SortSource {
@@ -41,7 +42,7 @@ impl SortSource {
             n_threads,
             sort_idx,
             descending,
-            chunk_offset: 0,
+            chunk_offset: get_source_index(1) as IdxSize,
             slice,
             finished: false,
         }
diff --git a/crates/polars-pipe/src/executors/sources/csv.rs b/crates/polars-pipe/src/executors/sources/csv.rs
index 11af352e991a..d760f91ca633 100644
--- a/crates/polars-pipe/src/executors/sources/csv.rs
+++ b/crates/polars-pipe/src/executors/sources/csv.rs
@@ -7,6 +7,7 @@ use polars_io::csv::read_impl::{BatchedCsvReaderMmap, BatchedCsvReaderRead};
 use polars_io::csv::{CsvEncoding, CsvReader};
 use polars_plan::global::_set_n_rows_for_scan;
 use polars_plan::prelude::{CsvParserOptions, FileScanOptions};
+use polars_utils::iter::EnumerateIdxTrait;
 
 use super::*;
 use crate::pipeline::determine_chunk_size;
@@ -19,7 +20,6 @@ pub(crate) struct CsvSource {
     batched_reader:
         Option<Either<*mut BatchedCsvReaderMmap<'static>, *mut BatchedCsvReaderRead<'static>>>,
     n_threads: usize,
-    chunk_index: IdxSize,
     path: Option<PathBuf>,
     options: Option<CsvParserOptions>,
     file_options: Option<FileScanOptions>,
@@ -112,7 +112,6 @@ impl CsvSource {
             reader: None,
             batched_reader: None,
             n_threads: POOL.current_num_threads(),
-            chunk_index: 0,
             path: Some(path),
             options: Some(options),
             file_options: Some(file_options),
@@ -164,19 +163,19 @@ impl Source for CsvSource {
         };
         Ok(match batches {
             None => SourceResult::Finished,
-            Some(batches) => SourceResult::GotMoreData(
-                batches
+            Some(batches) => {
+                let index = get_source_index(0);
+                let out = batches
                     .into_iter()
-                    .map(|data| {
-                        let out = DataChunk {
-                            chunk_index: self.chunk_index,
-                            data,
-                        };
-                        self.chunk_index += 1;
-                        out
+                    .enumerate_u32()
+                    .map(|(i, data)| DataChunk {
+                        chunk_index: (index + i) as IdxSize,
+                        data,
                     })
-                    .collect(),
-            ),
+                    .collect::<Vec<_>>();
+                get_source_index(out.len() as u32);
+                SourceResult::GotMoreData(out)
+            },
         })
     }
     fn fmt(&self) -> &str {
diff --git a/crates/polars-pipe/src/executors/sources/frame.rs b/crates/polars-pipe/src/executors/sources/frame.rs
index b8a4ff4bae20..a990751cf45b 100644
--- a/crates/polars-pipe/src/executors/sources/frame.rs
+++ b/crates/polars-pipe/src/executors/sources/frame.rs
@@ -7,6 +7,7 @@ use polars_core::utils::split_df;
 use polars_core::POOL;
 use polars_utils::IdxSize;
 
+use crate::executors::sources::get_source_index;
 use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
 
 pub struct DataFrameSource {
@@ -25,13 +26,15 @@ impl DataFrameSource {
 
 impl Source for DataFrameSource {
     fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
+        let idx_offset = get_source_index(0);
         let chunks = (&mut self.dfs)
             .map(|(chunk_index, data)| DataChunk {
-                chunk_index: chunk_index as IdxSize,
+                chunk_index: (chunk_index as u32 + idx_offset) as IdxSize,
                 data,
             })
             .take(self.n_threads)
             .collect::<Vec<_>>();
+        get_source_index(chunks.len() as u32);
 
         if chunks.is_empty() {
             Ok(SourceResult::Finished)
diff --git a/crates/polars-pipe/src/executors/sources/mod.rs b/crates/polars-pipe/src/executors/sources/mod.rs
index 72878b1911dc..8270685f23b6 100644
--- a/crates/polars-pipe/src/executors/sources/mod.rs
+++ b/crates/polars-pipe/src/executors/sources/mod.rs
@@ -7,6 +7,8 @@ mod parquet;
 mod reproject;
 mod union;
 
+use std::sync::atomic::{AtomicU32, Ordering};
+
 #[cfg(feature = "csv")]
 pub(crate) use csv::CsvSource;
 pub(crate) use frame::*;
@@ -18,3 +20,9 @@ pub(crate) use union::*;
 
 #[cfg(feature = "csv")]
 use super::*;
+
+static CHUNK_INDEX: AtomicU32 = AtomicU32::new(0);
+
+pub(super) fn get_source_index(add: u32) -> u32 {
+    CHUNK_INDEX.fetch_add(add, Ordering::Relaxed)
+}
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index 82c3f1858926..d12791137ca0 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -19,8 +19,10 @@ use polars_io::utils::check_projected_arrow_schema;
 use polars_io::{is_cloud_url, SerReader};
 use polars_plan::logical_plan::FileInfo;
 use polars_plan::prelude::{FileScanOptions, ParquetOptions};
+use polars_utils::iter::EnumerateIdxTrait;
 use polars_utils::IdxSize;
 
+use crate::executors::sources::get_source_index;
 use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
 use crate::pipeline::determine_chunk_size;
 
@@ -28,7 +30,6 @@ pub struct ParquetSource {
     batched_readers: VecDeque<BatchedParquetReader>,
     n_threads: usize,
     processed_paths: usize,
-    chunk_index: IdxSize,
     iter: Range<usize>,
     paths: Arc<[PathBuf]>,
     options: ParquetOptions,
@@ -209,7 +210,6 @@ impl ParquetSource {
         let mut source = ParquetSource {
             batched_readers: VecDeque::new(),
             n_threads,
-            chunk_index: 0,
             processed_paths: 0,
             options,
             file_options,
@@ -289,21 +289,25 @@ impl Source for ParquetSource {
                 return self.get_batches(_context);
             },
             Some(batches) => {
-                let result = SourceResult::GotMoreData(
-                    batches
-                        .into_iter()
-                        .map(|data| {
-                            // Keep the row limit updated so the next reader will have a correct limit.
-                            if let Some(n_rows) = &mut self.file_options.n_rows {
-                                *n_rows = n_rows.saturating_sub(data.height())
-                            }
-
-                            let chunk_index = self.chunk_index;
-                            self.chunk_index += 1;
-                            DataChunk { chunk_index, data }
-                        })
-                        .collect(),
-                );
+                let idx_offset = get_source_index(0);
+                let out = batches
+                    .into_iter()
+                    .enumerate_u32()
+                    .map(|(i, data)| {
+                        // Keep the row limit updated so the next reader will have a correct limit.
+                        if let Some(n_rows) = &mut self.file_options.n_rows {
+                            *n_rows = n_rows.saturating_sub(data.height())
+                        }
+
+                        DataChunk {
+                            chunk_index: (idx_offset + i) as IdxSize,
+                            data,
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                get_source_index(out.len() as u32);
+
+                let result = SourceResult::GotMoreData(out);
                 // We are not yet done with this reader.
                 // Ensure it is used in next iteration.
                 self.batched_readers.push_front(reader);
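
The patch replaces each source's private chunk_index counter with one process-wide atomic, so chunks emitted by different sources (CSV, parquet, in-memory frames, the out-of-core sort) receive globally unique, increasing indices and downstream operators can restore emission order. What follows is a minimal, self-contained sketch of that reserve-then-label pattern outside of polars; Chunk, label_batch, and the string payloads are hypothetical stand-ins for the engine's DataChunk and Source::get_batches, not actual polars API.

use std::sync::atomic::{AtomicU32, Ordering};

// Process-wide chunk counter, mirroring CHUNK_INDEX in the patch.
static CHUNK_INDEX: AtomicU32 = AtomicU32::new(0);

// Fetch-then-add, the same contract as get_source_index above:
// returns the counter's current value and advances it by `add`.
fn get_source_index(add: u32) -> u32 {
    CHUNK_INDEX.fetch_add(add, Ordering::Relaxed)
}

// Hypothetical stand-in for the pipeline's DataChunk.
#[derive(Debug)]
struct Chunk {
    chunk_index: u32,
    data: &'static str,
}

// Label one batch the way the CSV/parquet sources now do: read the
// current counter with get_source_index(0), tag item `i` with
// `offset + i`, then advance the counter by the batch length so the
// next batch (possibly from a different source) continues after it.
fn label_batch(payloads: Vec<&'static str>) -> Vec<Chunk> {
    let offset = get_source_index(0);
    let out: Vec<Chunk> = payloads
        .into_iter()
        .enumerate()
        .map(|(i, data)| Chunk {
            chunk_index: offset + i as u32,
            data,
        })
        .collect();
    get_source_index(out.len() as u32);
    out
}

fn main() {
    // Two "sources" feeding one pipeline: their indices never collide,
    // so a downstream sort on chunk_index recovers the emission order.
    let a = label_batch(vec!["csv-0", "csv-1"]);
    let b = label_batch(vec!["parquet-0", "parquet-1", "parquet-2"]);
    assert_eq!(a.iter().map(|c| c.chunk_index).collect::<Vec<_>>(), [0, 1]);
    assert_eq!(b.iter().map(|c| c.chunk_index).collect::<Vec<_>>(), [2, 3, 4]);
    println!("{a:?}");
    println!("{b:?}");
}

Note that, as in the patch, reading the counter (get_source_index(0)) and bumping it afterwards are two separate atomic operations; the scheme presumes batches are labeled one at a time, as when the streaming engine polls a single source per get_batches call, since interleaved labeling could hand out overlapping index ranges.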