
Commit

fix: ensure order is preserved if streaming from different sources (#…
ritchie46 committed Jan 23, 2024
1 parent 1bdac4e commit 8bd1872
Showing 5 changed files with 47 additions and 32 deletions.
3 changes: 2 additions & 1 deletion crates/polars-pipe/src/executors/sinks/sort/source.rs
@@ -7,6 +7,7 @@ use rayon::prelude::*;
 
 use crate::executors::sinks::sort::ooc::read_df;
 use crate::executors::sinks::sort::sink::sort_accumulated;
+use crate::executors::sources::get_source_index;
 use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
 
 pub struct SortSource {
@@ -41,7 +42,7 @@ impl SortSource {
             n_threads,
             sort_idx,
             descending,
-            chunk_offset: 0,
+            chunk_offset: get_source_index(1) as IdxSize,
             slice,
             finished: false,
         }
25 changes: 12 additions & 13 deletions crates/polars-pipe/src/executors/sources/csv.rs
@@ -7,6 +7,7 @@ use polars_io::csv::read_impl::{BatchedCsvReaderMmap, BatchedCsvReaderRead};
 use polars_io::csv::{CsvEncoding, CsvReader};
 use polars_plan::global::_set_n_rows_for_scan;
 use polars_plan::prelude::{CsvParserOptions, FileScanOptions};
+use polars_utils::iter::EnumerateIdxTrait;
 
 use super::*;
 use crate::pipeline::determine_chunk_size;
@@ -19,7 +20,6 @@ pub(crate) struct CsvSource {
     batched_reader:
         Option<Either<*mut BatchedCsvReaderMmap<'static>, *mut BatchedCsvReaderRead<'static>>>,
     n_threads: usize,
-    chunk_index: IdxSize,
     path: Option<PathBuf>,
     options: Option<CsvParserOptions>,
     file_options: Option<FileScanOptions>,
@@ -112,7 +112,6 @@ impl CsvSource {
             reader: None,
             batched_reader: None,
             n_threads: POOL.current_num_threads(),
-            chunk_index: 0,
             path: Some(path),
             options: Some(options),
             file_options: Some(file_options),
@@ -164,19 +163,19 @@ impl Source for CsvSource {
         };
         Ok(match batches {
             None => SourceResult::Finished,
-            Some(batches) => SourceResult::GotMoreData(
-                batches
+            Some(batches) => {
+                let index = get_source_index(0);
+                let out = batches
                     .into_iter()
-                    .map(|data| {
-                        let out = DataChunk {
-                            chunk_index: self.chunk_index,
-                            data,
-                        };
-                        self.chunk_index += 1;
-                        out
+                    .enumerate_u32()
+                    .map(|(i, data)| DataChunk {
+                        chunk_index: (index + i) as IdxSize,
+                        data,
                     })
-                    .collect(),
-            ),
+                    .collect::<Vec<_>>();
+                get_source_index(out.len() as u32);
+                SourceResult::GotMoreData(out)
+            },
         })
     }
     fn fmt(&self) -> &str {
5 changes: 4 additions & 1 deletion crates/polars-pipe/src/executors/sources/frame.rs
@@ -7,6 +7,7 @@ use polars_core::utils::split_df;
 use polars_core::POOL;
 use polars_utils::IdxSize;
 
+use crate::executors::sources::get_source_index;
 use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
 
 pub struct DataFrameSource {
@@ -25,13 +26,15 @@ impl DataFrameSource {
 
 impl Source for DataFrameSource {
     fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
+        let idx_offset = get_source_index(0);
         let chunks = (&mut self.dfs)
             .map(|(chunk_index, data)| DataChunk {
-                chunk_index: chunk_index as IdxSize,
+                chunk_index: (chunk_index as u32 + idx_offset) as IdxSize,
                 data,
             })
             .take(self.n_threads)
             .collect::<Vec<_>>();
+        get_source_index(chunks.len() as u32);
 
         if chunks.is_empty() {
             Ok(SourceResult::Finished)
8 changes: 8 additions & 0 deletions crates/polars-pipe/src/executors/sources/mod.rs
@@ -7,6 +7,8 @@ mod parquet;
 mod reproject;
 mod union;
 
+use std::sync::atomic::{AtomicU32, Ordering};
+
 #[cfg(feature = "csv")]
 pub(crate) use csv::CsvSource;
 pub(crate) use frame::*;
@@ -18,3 +20,9 @@ pub(crate) use union::*;
 
 #[cfg(feature = "csv")]
 use super::*;
+
+static CHUNK_INDEX: AtomicU32 = AtomicU32::new(0);
+
+pub(super) fn get_source_index(add: u32) -> u32 {
+    CHUNK_INDEX.fetch_add(add, Ordering::Relaxed)
+}
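
The whole fix hinges on this small helper: fetch_add returns the counter's value from before the addition, so get_source_index(0) reads the next free chunk index without claiming anything, while get_source_index(n) claims n consecutive indices. A minimal standalone sketch of those semantics (not Polars code, just the same pattern):

use std::sync::atomic::{AtomicU32, Ordering};

static CHUNK_INDEX: AtomicU32 = AtomicU32::new(0);

// Same shape as the helper above: return the previous value, then advance by `add`.
// `add == 0` is a pure peek at the next free index.
fn get_source_index(add: u32) -> u32 {
    CHUNK_INDEX.fetch_add(add, Ordering::Relaxed)
}

fn main() {
    assert_eq!(get_source_index(0), 0); // peek: nothing claimed yet
    assert_eq!(get_source_index(4), 0); // claim indices 0..=3; returns the old value
    assert_eq!(get_source_index(0), 4); // next free index is now 4
    assert_eq!(get_source_index(1), 4); // claim a single index, as SortSource does for chunk_offset
    assert_eq!(get_source_index(0), 5);
}

Because the counter is a process-wide static rather than a per-source field, two sources started one after the other can no longer both begin counting at zero.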
38 changes: 21 additions & 17 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -19,16 +19,17 @@ use polars_io::utils::check_projected_arrow_schema;
 use polars_io::{is_cloud_url, SerReader};
 use polars_plan::logical_plan::FileInfo;
 use polars_plan::prelude::{FileScanOptions, ParquetOptions};
+use polars_utils::iter::EnumerateIdxTrait;
 use polars_utils::IdxSize;
 
+use crate::executors::sources::get_source_index;
 use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
 use crate::pipeline::determine_chunk_size;
 
 pub struct ParquetSource {
     batched_readers: VecDeque<BatchedParquetReader>,
     n_threads: usize,
     processed_paths: usize,
-    chunk_index: IdxSize,
     iter: Range<usize>,
     paths: Arc<[PathBuf]>,
     options: ParquetOptions,
@@ -209,7 +210,6 @@ impl ParquetSource {
         let mut source = ParquetSource {
             batched_readers: VecDeque::new(),
             n_threads,
-            chunk_index: 0,
             processed_paths: 0,
             options,
             file_options,
@@ -289,21 +289,25 @@ impl Source for ParquetSource {
                 return self.get_batches(_context);
             },
             Some(batches) => {
-                let result = SourceResult::GotMoreData(
-                    batches
-                        .into_iter()
-                        .map(|data| {
-                            // Keep the row limit updated so the next reader will have a correct limit.
-                            if let Some(n_rows) = &mut self.file_options.n_rows {
-                                *n_rows = n_rows.saturating_sub(data.height())
-                            }
-
-                            let chunk_index = self.chunk_index;
-                            self.chunk_index += 1;
-                            DataChunk { chunk_index, data }
-                        })
-                        .collect(),
-                );
+                let idx_offset = get_source_index(0);
+                let out = batches
+                    .into_iter()
+                    .enumerate_u32()
+                    .map(|(i, data)| {
+                        // Keep the row limit updated so the next reader will have a correct limit.
+                        if let Some(n_rows) = &mut self.file_options.n_rows {
+                            *n_rows = n_rows.saturating_sub(data.height())
+                        }
+
+                        DataChunk {
+                            chunk_index: (idx_offset + i) as IdxSize,
+                            data,
+                        }
+                    })
+                    .collect::<Vec<_>>();
+                get_source_index(out.len() as u32);
+
+                let result = SourceResult::GotMoreData(out);
                 // We are not yet done with this reader.
                 // Ensure it is used in next iteration.
                 self.batched_readers.push_front(reader);
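
Taken together, the CSV, Parquet, and in-memory DataFrame sources now draw their chunk indices from the shared counter instead of each keeping a private chunk_index that restarts at zero, so when several sources feed one pipeline the stamped indices form a single global sequence that downstream, order-sensitive operators can rely on. A toy illustration of that end-to-end effect (standalone sketch; ChunkStub and emit are made up for the example, only the peek-then-advance pattern mirrors the sources above):

use std::sync::atomic::{AtomicU32, Ordering};

static CHUNK_INDEX: AtomicU32 = AtomicU32::new(0);

fn get_source_index(add: u32) -> u32 {
    CHUNK_INDEX.fetch_add(add, Ordering::Relaxed)
}

// Stand-in for a DataChunk: just the index and a label for the emitting source.
#[derive(Debug)]
struct ChunkStub {
    chunk_index: u32,
    source: &'static str,
}

// Emulates one `get_batches` call: peek at the offset, stamp the batch,
// then advance the counter by the number of chunks actually produced.
fn emit(source: &'static str, n_chunks: u32) -> Vec<ChunkStub> {
    let offset = get_source_index(0);
    let out: Vec<ChunkStub> = (0..n_chunks)
        .map(|i| ChunkStub { chunk_index: offset + i, source })
        .collect();
    get_source_index(out.len() as u32);
    out
}

fn main() {
    let mut all = emit("csv", 2);
    all.extend(emit("parquet", 3));
    all.sort_by_key(|c| c.chunk_index);
    // Prints csv:0, csv:1, parquet:2, parquet:3, parquet:4 -- one global order,
    // whereas two private counters would both have produced indices starting at 0.
    for c in &all {
        println!("{}:{}", c.source, c.chunk_index);
    }
}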
