Skip to content

Commit

Permalink
fix streaming read
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed May 14, 2024
1 parent 7504c1c commit f2da805
Showing 1 changed file with 91 additions and 102 deletions.
193 changes: 91 additions & 102 deletions crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,119 +49,108 @@ impl CsvExec {

let verbose = verbose();

let mut df =
if n_rows.is_some() || (predicate.is_some() && self.file_options.row_index.is_some()) {
// Basic sequential read
// predicate must be done after n_rows and row_index, so we must read sequentially
if verbose {
eprintln!("read per-file to apply n_rows or (predicate + row_index)");
}
let mut df = if n_rows.is_some()
|| (predicate.is_some() && self.file_options.row_index.is_some())
{
// Basic sequential read
// predicate must be done after n_rows and row_index, so we must read sequentially
if verbose {
eprintln!("read per-file to apply n_rows or (predicate + row_index)");
}

let mut n_rows_read = 0usize;
let mut out = Vec::with_capacity(self.paths.len());
// If we have n_rows or row_index then we need to count how many rows we read, so we need
// to delay applying the predicate.
let predicate_during_read = predicate
.clone()
.filter(|_| n_rows.is_none() && self.file_options.row_index.is_none());

for i in 0..self.paths.len() {
let path = &self.paths[i];

let df = options_base
.clone()
.with_row_index(self.file_options.row_index.clone().map(|mut ri| {
ri.offset += n_rows_read as IdxSize;
ri
}))
.with_n_rows(n_rows.map(|n| n - n_rows_read))
.try_into_reader_with_file_path(Some(path.clone()))
.unwrap()
._with_predicate(predicate_during_read.clone())
.finish()?;

n_rows_read = n_rows_read.saturating_add(df.height());

let df = if predicate.is_some() && predicate_during_read.is_none() {
let predicate = predicate.clone().unwrap();

concat_df(
POOL.install(|| {
df.split_chunks()
.collect::<Vec<_>>()
.into_par_iter()
.map(|df| {
let s = predicate.evaluate_io(&df)?;
let mask = s
.bool()
.expect("filter predicates was not of type boolean");
df._filter_seq(mask)
})
.collect::<PolarsResult<Vec<_>>>()
})?
.iter(),
)?
} else {
df
};

out.push(df);

if n_rows.is_some() && n_rows_read == n_rows.unwrap() {
if verbose {
eprintln!(
"reached n_rows = {} at file {} / {}",
n_rows.unwrap(),
1 + i,
self.paths.len()
)
}
break;
}
}
let mut n_rows_read = 0usize;
let mut out = Vec::with_capacity(self.paths.len());
// We can only give this to the reader if we don't have n_rows or row_index
let predicate_during_read = predicate
.clone()
.filter(|_| n_rows.is_none() && self.file_options.row_index.is_none());

for i in 0..self.paths.len() {
let path = &self.paths[i];

concat_df(out.iter())?
} else {
// Basic parallel read
assert!(
n_rows.is_none()
&& !(
// We can do either but not both because we are doing them
// out-of-order for parallel.
predicate.is_some() && self.file_options.row_index.is_some()
let df = options_base
.clone()
.with_row_index(self.file_options.row_index.clone().map(|mut ri| {
ri.offset += n_rows_read as IdxSize;
ri
}))
.with_n_rows(n_rows.map(|n| n - n_rows_read))
.try_into_reader_with_file_path(Some(path.clone()))
.unwrap()
._with_predicate(predicate_during_read.clone())
.finish()?;

n_rows_read = n_rows_read.saturating_add(df.height());
out.push(df);

if n_rows.is_some() && n_rows_read == n_rows.unwrap() {
if verbose {
eprintln!(
"reached n_rows = {} at file {} / {}",
n_rows.unwrap(),
1 + i,
self.paths.len()
)
);
if verbose {
eprintln!("read files in parallel")
}
break;
}
}

let dfs = POOL.install(|| {
self.paths
.chunks(std::cmp::min(POOL.current_num_threads(), 128))
.map(|paths| {
paths
.into_par_iter()
.map(|path| {
options_base
.clone()
.try_into_reader_with_file_path(Some(path.clone()))
.unwrap()
._with_predicate(predicate.clone())
.finish()
})
.collect::<PolarsResult<Vec<_>>>()
if predicate.is_some() && predicate_during_read.is_none() {
let predicate = predicate.unwrap();
out = POOL.install(|| {
out.into_par_iter()
.map(|df| {
let s = predicate.evaluate_io(&df)?;
let mask = s.bool().expect("filter predicates was not of type boolean");
df._filter_seq(mask)
})
.collect::<PolarsResult<Vec<_>>>()
})?;
}

let mut df = concat_df(dfs.iter().flat_map(|dfs| dfs.iter()))?;
concat_df(out.iter())?
} else {
// Basic parallel read
assert!(
n_rows.is_none()
&& !(
// We can do either but not both because we don't do
// them in the correct order for parallel.
predicate.is_some() && self.file_options.row_index.is_some()
)
);
if verbose {
eprintln!("read files in parallel")
}

if let Some(row_index) = self.file_options.row_index.clone() {
df.with_row_index_mut(row_index.name.as_ref(), Some(row_index.offset));
}
let dfs = POOL.install(|| {
self.paths
.chunks(std::cmp::min(POOL.current_num_threads(), 128))
.map(|paths| {
paths
.into_par_iter()
.map(|path| {
options_base
.clone()
.try_into_reader_with_file_path(Some(path.clone()))
.unwrap()
._with_predicate(predicate.clone())
.finish()
})
.collect::<PolarsResult<Vec<_>>>()
})
.collect::<PolarsResult<Vec<_>>>()
})?;

let mut df = concat_df(dfs.iter().flat_map(|dfs| dfs.iter()))?;

if let Some(row_index) = self.file_options.row_index.clone() {
df.with_row_index_mut(row_index.name.as_ref(), Some(row_index.offset));
}

df
};
df
};

if self.file_options.rechunk {
df.as_single_chunk_par();
Expand Down

0 comments on commit f2da805

Please sign in to comment.