Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Ipc exec multiple paths #15040

Merged
merged 17 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub fn get_rg_prefetch_size() -> usize {
.unwrap_or_else(|_| std::cmp::max(get_file_prefetch_size(), 128))
}

pub fn env_force_async() -> bool {
/// Returns `true` when the `POLARS_FORCE_ASYNC` environment variable is set
/// to exactly `"1"`.
///
/// An unset variable, a non-unicode value, or any value other than `"1"`
/// yields `false`.
pub fn force_async() -> bool {
    matches!(std::env::var("POLARS_FORCE_ASYNC"), Ok(value) if value == "1")
}
Expand Down
5 changes: 1 addition & 4 deletions crates/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,7 @@ pub(crate) fn finish_reader<R: ArrowReader>(
}
};

match rechunk {
true => Ok(df.agg_chunks()),
false => Ok(df),
}
Ok(if rechunk { df.agg_chunks() } else { df })
}

static CLOUD_URL: Lazy<Regex> =
Expand Down
310 changes: 270 additions & 40 deletions crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;

use polars_core::config::env_force_async;
use polars_core::config;
use polars_core::utils::accumulate_dataframes_vertical;
#[cfg(feature = "cloud")]
use polars_io::cloud::CloudOptions;
use polars_io::is_cloud_url;
use polars_io::predicates::apply_predicate;
use polars_io::{is_cloud_url, RowIndex};
use rayon::prelude::*;

use super::*;

pub struct IpcExec {
pub(crate) path: PathBuf,
pub(crate) paths: Arc<[PathBuf]>,
pub(crate) schema: SchemaRef,
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
pub(crate) options: IpcScanOptions,
Expand All @@ -20,10 +25,8 @@ pub struct IpcExec {

impl IpcExec {
fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let is_cloud = is_cloud_url(&self.path);
let force_async = env_force_async();

let mut out = if is_cloud || force_async {
let is_cloud = self.paths.iter().any(is_cloud_url);
let mut out = if is_cloud || config::force_async() {
#[cfg(not(feature = "cloud"))]
{
panic!("activate cloud feature")
Expand All @@ -39,7 +42,7 @@ impl IpcExec {
.block_on_potential_spawn(self.read_async(verbose))?
}
} else {
self.read_sync(verbose)?
self.read_sync()?
};

if self.file_options.rechunk {
Expand All @@ -49,49 +52,226 @@ impl IpcExec {
Ok(out)
}

fn read_sync(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let file = std::fs::File::open(&self.path)?;
let (projection, predicate) = prepare_scan_args(
self.predicate.clone(),
&mut self.file_options.with_columns,
&mut self.schema,
self.file_options.row_index.is_some(),
/// Reads all `self.paths` from the local filesystem in parallel (via rayon)
/// and vertically concatenates the results in path order.
///
/// Row-limit accounting: `row_counter` tracks how many rows have been
/// produced by the files that are *consecutive from index 0*; once that
/// running total reaches the limit, later files short-circuit to an empty
/// `DataFrame`. Because reads race, a file may still read more rows than
/// needed — `finish_index_and_dfs` trims the excess afterwards.
fn read_sync(&mut self) -> PolarsResult<DataFrame> {
if config::verbose() {
// NOTE(review): `predicate = {:?}` prints `self.predicate.is_some()`,
// i.e. a bool, not the predicate expression itself.
eprintln!("executing ipc read sync with row_index = {:?}, n_rows = {:?}, predicate = {:?} for paths {:?}",
self.file_options.row_index.as_ref(),
self.file_options.n_rows.as_ref(),
self.predicate.is_some(),
self.paths
);
}

// Resolve the column projection once up front; it is cloned into every
// per-file reader below.
let projection = materialize_projection(
self.file_options
.with_columns
.as_deref()
.map(|cols| cols.deref()),
&self.schema,
None,
self.file_options.row_index.is_some(),
);

// Total row limit as `IdxSize`; `None` means unlimited, represented by
// the `IdxSize::MAX` sentinel in `row_limit`.
let n_rows = self
.file_options
.n_rows
.map(|n| IdxSize::try_from(n).unwrap());

let row_limit = n_rows.unwrap_or(IdxSize::MAX);

// Used to determine the next file to open. This guarantees the order:
// each rayon task claims a distinct path index via `fetch_add`.
let path_index = AtomicUsize::new(0);
let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len()));

let index_and_dfs = (0..self.paths.len())
.into_par_iter()
.map(|_| -> PolarsResult<(usize, DataFrame)> {
let index = path_index.fetch_add(1, Ordering::Relaxed);
let path = &self.paths[index];

// Skip this file entirely if the consecutive files before it have
// already satisfied the row limit. An empty frame keeps the
// (index, df) pairing intact for the later ordered merge.
let already_read_in_sequence = row_counter.read().unwrap().sum();
if already_read_in_sequence >= row_limit {
return Ok((index, Default::default()));
}

let file = std::fs::File::open(path)?;

let df = IpcReader::new(file)
.with_n_rows(
// NOTE: If there is any file that by itself exceeds the
// row limit, passing the total row limit to each
// individual reader helps.
n_rows.map(|n| {
n.saturating_sub(already_read_in_sequence)
.try_into()
.unwrap()
}),
)
// The same row-index start is used for every file here; the
// per-file offsets are corrected in `finish_index_and_dfs`.
.with_row_index(self.file_options.row_index.clone())
.with_projection(projection.clone())
.memory_mapped(self.options.memmap)
.finish()?;

// Publish this file's height so later tasks can observe the
// running consecutive total.
row_counter
.write()
.unwrap()
.write(index, df.height().try_into().unwrap());

Ok((index, df))
})
.collect::<PolarsResult<Vec<_>>>()?;

finish_index_and_dfs(
index_and_dfs,
row_counter.into_inner().unwrap(),
self.file_options.row_index.as_ref(),
row_limit,
self.predicate.as_ref(),
)
}

#[cfg(feature = "cloud")]
async fn read_async(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);

let reader =
IpcReaderAsync::from_uri(self.path.to_str().unwrap(), self.cloud_options.as_ref())
.await?;
reader
.data(
self.metadata.as_ref(),
IpcReadOptions::default()
.with_row_limit(self.file_options.n_rows)
.with_row_index(self.file_options.row_index.clone())
.with_projection(self.file_options.with_columns.as_deref().cloned())
.with_predicate(predicate),
verbose,
)
.await
use futures::stream::{self, StreamExt};
use futures::TryStreamExt;

/// See https://users.rust-lang.org/t/implementation-of-fnonce-is-not-general-enough-with-async-block/83427/3.
trait AssertSend {
fn assert_send<R>(self) -> impl Send + stream::Stream<Item = R>
where
Self: Send + stream::Stream<Item = R> + Sized,
{
self
}
}

impl<T: Send + stream::Stream + Sized> AssertSend for T {}

let n_rows = self
.file_options
.n_rows
.map(|limit| limit.try_into().unwrap());

let row_limit = n_rows.unwrap_or(IdxSize::MAX);

let row_counter = RwLock::new(ConsecutiveCountState::new(self.paths.len()));

let index_and_dfs = stream::iter(&*self.paths)
.enumerate()
.map(|(index, path)| {
let this = &*self;
let row_counter = &row_counter;
async move {
let already_read_in_sequence = row_counter.read().unwrap().sum();
if already_read_in_sequence >= row_limit {
return Ok((index, Default::default()));
}

let reader = IpcReaderAsync::from_uri(
path.to_str().unwrap(),
this.cloud_options.as_ref(),
)
.await?;
let df = reader
.data(
this.metadata.as_ref(),
IpcReadOptions::default()
.with_row_limit(
// NOTE: If there is any file that by itself
// exceeds the row limit, passing the total
// row limit to each individual reader
// helps.
n_rows.map(|n| {
n.saturating_sub(already_read_in_sequence)
.try_into()
.unwrap()
}),
)
.with_row_index(this.file_options.row_index.clone())
.with_projection(
this.file_options.with_columns.as_deref().cloned(),
),
verbose,
)
.await?;

row_counter
.write()
.unwrap()
.write(index, df.height().try_into().unwrap());

PolarsResult::Ok((index, df))
}
})
.assert_send()
.buffer_unordered(config::get_file_prefetch_size())
.try_collect::<Vec<_>>()
.await?;

finish_index_and_dfs(
index_and_dfs,
row_counter.into_inner().unwrap(),
self.file_options.row_index.as_ref(),
row_limit,
self.predicate.as_ref(),
)
}
}

/// Sorts the per-file frames back into path order, enforces the global row
/// limit, fixes up row-index offsets, applies the predicate, and vertically
/// concatenates the result.
///
/// `row_counter.counts()` yields `None` for files that were never read
/// (the row limit was already reached), and those align with the empty
/// placeholder frames, which `filter_map` drops. Note the `?` inside
/// `filter_map` only skips the *current* frame; iteration continues, but
/// every later frame is skipped the same way once `offset` exceeds
/// `row_limit`.
fn finish_index_and_dfs(
mut index_and_dfs: Vec<(usize, DataFrame)>,
row_counter: ConsecutiveCountState,
row_index: Option<&RowIndex>,
row_limit: IdxSize,
predicate: Option<&Arc<dyn PhysicalExpr>>,
) -> PolarsResult<DataFrame> {
// Frames were collected in task-completion order; restore path order.
index_and_dfs.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));

debug_assert!(
index_and_dfs.iter().enumerate().all(|(a, &(b, _))| a == b),
"expected dataframe indices in order from 0 to len"
);

debug_assert_eq!(index_and_dfs.len(), row_counter.counts.len());
// Running count of rows kept so far, in path order.
let mut offset = 0;
let mut df = accumulate_dataframes_vertical(
index_and_dfs
.into_iter()
.zip(row_counter.counts())
.filter_map(|((_, mut df), count)| {
let count = count?;

// `None` (skip) once the limit is already consumed.
let remaining = row_limit.checked_sub(offset)?;

// If necessary, correct having read too much from a single file:
// trim this frame down to the rows still allowed.
if remaining < count {
df = df.slice(0, remaining.try_into().unwrap());
}

// If necessary, correct row indices now that we know the offset:
// every file was read with the same row-index start, so shift
// this frame's index column by the rows that precede it.
if let Some(row_index) = row_index {
df.apply(&row_index.name, |series| {
series.idx().expect("index column should be of index type") + offset
})
.expect("index column should exist");
}

offset += count;

Some(df)
}),
)?;

// Apply the pushed-down predicate once over the concatenated result.
let predicate = predicate.cloned().map(phys_expr_to_io_expr);
apply_predicate(&mut df, predicate.as_deref(), true)?;

Ok(df)
}

impl Executor for IpcExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
let finger_print = FileFingerPrint {
paths: Arc::new([self.path.clone()]),
paths: Arc::clone(&self.paths),
#[allow(clippy::useless_asref)]
predicate: self
.predicate
Expand All @@ -101,7 +281,7 @@ impl Executor for IpcExec {
};

let profile_name = if state.has_node_timer() {
let mut ids = vec![self.path.to_string_lossy().into()];
let mut ids = vec![self.paths[0].to_string_lossy().into()];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken from the parquet implementation. Should we just map each path instead?

if self.predicate.is_some() {
ids.push("predicate".into())
}
Expand All @@ -123,3 +303,53 @@ impl Executor for IpcExec {
)
}
}

/// Tracks the sum of consecutive values in a dynamically sized array where
/// the values can be written in any order.
///
/// `IdxSize::MAX` serves as the "not yet written" sentinel, so `MAX` is not
/// a valid count value (see `write`).
struct ConsecutiveCountState {
/// Per-slot counts; `IdxSize::MAX` marks a slot that has not been written.
counts: Box<[IdxSize]>,
/// First index whose count has not yet been folded into `sum`.
next_index: usize,
/// Sum of `counts[..next_index]`, i.e. of the consecutive prefix.
sum: IdxSize,
}

impl ConsecutiveCountState {
    /// Creates state for `len` slots, all initially unwritten
    /// (filled with the `IdxSize::MAX` sentinel).
    fn new(len: usize) -> Self {
        let counts = std::iter::repeat(IdxSize::MAX).take(len).collect();
        Self {
            counts,
            next_index: 0,
            sum: 0,
        }
    }

    /// Sum of all consecutive counts, i.e. of the written prefix starting
    /// at index 0.
    fn sum(&self) -> IdxSize {
        self.sum
    }

    /// Write count at index. Each index may be written at most once, and
    /// `IdxSize::MAX` is reserved as the unwritten sentinel.
    fn write(&mut self, index: usize, count: IdxSize) {
        debug_assert!(
            self.counts[index] == IdxSize::MAX,
            "second write to same index"
        );
        debug_assert!(count != IdxSize::MAX, "count can not be IdxSize::MAX");

        self.counts[index] = count;

        // Fold every newly-consecutive slot into the running sum, stopping
        // at the first slot that is still unwritten.
        while let Some(&filled) = self.counts.get(self.next_index) {
            if filled == IdxSize::MAX {
                break;
            }
            self.sum += filled;
            self.next_index += 1;
        }
    }

    /// Iterates all slots, yielding `None` for slots never written.
    fn counts(&self) -> impl Iterator<Item = Option<IdxSize>> + '_ {
        self.counts.iter().map(|&count| {
            if count == IdxSize::MAX {
                None
            } else {
                Some(count)
            }
        })
    }
}
Loading
Loading