Integrate IpcReaderAsync with execution engine
mickvangelderen committed Mar 11, 2024
1 parent 975d7a0 commit 0d2de57
Showing 13 changed files with 414 additions and 81 deletions.
8 changes: 4 additions & 4 deletions crates/polars-io/src/ipc/ipc_file.rs
@@ -164,10 +164,10 @@ impl<R: MmapBytesReader> IpcReader<R> {
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;

debug_assert!(
self.columns.is_none(),
"column names must have already been converted into indices",
);
// TODO: Replace this back with the assertion before merging https://github.com/pola-rs/polars/pull/14861.
if let Some(columns) = &self.columns {
self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
}

let schema = if let Some(projection) = &self.projection {
Arc::new(apply_projection(&metadata.schema, projection))
170 changes: 121 additions & 49 deletions crates/polars-io/src/ipc/ipc_reader_async.rs
@@ -5,39 +5,32 @@ use arrow::io::ipc::read::FileMetadata;
use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use polars_core::datatypes::IDX_DTYPE;
use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_error::{to_compute_err, PolarsResult};
use tokio::sync::{MappedMutexGuard, Mutex, MutexGuard};

use crate::cloud::{build_object_store, CloudLocation, CloudOptions};
use crate::pl_async::{
tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};
use crate::prelude::IpcReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::{materialize_projection, IpcReader};
use crate::RowIndex;

/// Polars-specific wrapper for Arc<dyn ObjectStore> that limits the number of
/// concurrent requests for the entire application.
#[derive(Debug, Clone)]
pub struct PolarsObjectStore(Arc<dyn ObjectStore>);

impl<O> From<O> for PolarsObjectStore
where
O: ObjectStore,
{
fn from(value: O) -> Self {
Self::new(Arc::new(value))
}
}

impl PolarsObjectStore {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self(store)
}

pub async fn get(&self, path: &Path) -> PolarsResult<Bytes> {
tune_with_concurrency_budget(1, || async {
self
.0
self.0
.get(path)
.await
.map_err(to_compute_err)?
@@ -79,8 +72,45 @@ impl PolarsObjectStore {
pub struct IpcReaderAsync {
store: PolarsObjectStore,
path: Path,
object_metadata: Mutex<Option<ObjectMeta>>,
ipc_metadata: Mutex<Option<FileMetadata>>,
// object_metadata: Mutex<Option<ObjectMeta>>,
// ipc_metadata: Mutex<Option<FileMetadata>>,
}

#[derive(Default, Clone)]
pub struct IpcReadOptions {
// Names of the columns to include in the output.
projection: Option<Vec<String>>,

// The maximum number of rows to include in the output.
row_limit: Option<usize>,

// Include a column with the row number under the provided name starting at the provided index.
row_index: Option<RowIndex>,

// Only include rows that pass this predicate.
predicate: Option<Arc<dyn PhysicalIoExpr>>,
}

impl IpcReadOptions {
pub fn with_projection(mut self, indices: impl Into<Option<Vec<String>>>) -> Self {
self.projection = indices.into();
self
}

pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
self.row_limit = row_limit.into();
self
}

pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
self.row_index = row_index.into();
self
}

pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
self.predicate = predicate.into();
self
}
}
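
Since each builder method above accepts impl Into<Option<T>>, callers can pass either a bare value or None without wrapping. A minimal usage sketch (the column names and row limit are illustrative, not taken from this commit):

let options = IpcReadOptions::default()
    .with_projection(vec!["a".to_string(), "b".to_string()])
    .with_row_limit(1_000usize);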

impl IpcReaderAsync {
@@ -103,34 +133,34 @@ impl IpcReaderAsync {
debug_assert!(expansion.is_none(), "path should not contain wildcards");
Path::from_url_path(prefix).map_err(to_compute_err)?
},
object_metadata: Default::default(),
ipc_metadata: Default::default(),
// object_metadata: Default::default(),
// ipc_metadata: Default::default(),
})
}

async fn fetch_object_metadata(&self) -> PolarsResult<ObjectMeta> {
async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
self.store.head(&self.path).await
}

async fn object_metadata(&self) -> PolarsResult<MappedMutexGuard<'_, ObjectMeta>> {
// NOTE: We have to be careful of deadlocks because we wait here for a
// `head` request to finish which waits for a semaphore. If that
// semaphore is also used in places that call this function, we can get
// a deadlock.
let mut object_metadata = self.object_metadata.lock().await;
if object_metadata.is_none() {
*object_metadata = Some(self.fetch_object_metadata().await?);
}
Ok(MutexGuard::map(object_metadata, |x| {
x.as_mut().unwrap_or_else(|| unreachable!())
}))
}
// async fn object_metadata(&self) -> PolarsResult<MappedMutexGuard<'_, ObjectMeta>> {
// // NOTE: We have to be careful of deadlocks because we wait here for a
// // `head` request to finish which waits for a semaphore. If that
// // semaphore is also used in places that call this function, we can get
// // a deadlock.
// let mut object_metadata = self.object_metadata.lock().await;
// if object_metadata.is_none() {
// *object_metadata = Some(self.fetch_object_metadata().await?);
// }
// Ok(MutexGuard::map(object_metadata, |x| {
// x.as_mut().unwrap_or_else(|| unreachable!())
// }))
// }

async fn file_size(&self) -> PolarsResult<usize> {
Ok(self.object_metadata().await?.size)
}

async fn fetch_ipc_metadata(&self) -> PolarsResult<FileMetadata> {
pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
let file_size = self.file_size().await?;

// TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.
@@ -169,27 +199,69 @@ impl IpcReaderAsync {
)
}

pub async fn metadata(&self) -> PolarsResult<MappedMutexGuard<'_, FileMetadata>> {
let mut ipc_metadata = self.ipc_metadata.lock().await;
if ipc_metadata.is_none() {
*ipc_metadata = Some(self.fetch_ipc_metadata().await?)
}
Ok(MutexGuard::map(ipc_metadata, |x| {
x.as_mut().unwrap_or_else(|| unreachable!())
}))
}
// pub async fn metadata(&self) -> PolarsResult<MappedMutexGuard<'_, FileMetadata>> {
// let mut ipc_metadata = self.ipc_metadata.lock().await;
// if ipc_metadata.is_none() {
// *ipc_metadata = Some(self.fetch_ipc_metadata().await?)
// }
// Ok(MutexGuard::map(ipc_metadata, |x| {
// x.as_mut().unwrap_or_else(|| unreachable!())
// }))
// }

pub async fn data(&self) -> PolarsResult<DataFrame> {
// TODO: Utilize previously obtained metadata.
pub async fn data(
&self,
metadata: Option<&FileMetadata>,
options: IpcReadOptions,
verbose: bool,
) -> PolarsResult<DataFrame> {
let bytes = self.store.get(&self.path).await?;

let projection = match options.projection.as_deref() {
Some(projection) => {
fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
if let Some(rc) = row_index {
let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
}
schema
}

// Retrieve the metadata for the schema so we can map column names to indices.
let fetched_metadata;
let metadata = if let Some(metadata) = metadata {
metadata
} else {
// This branch is taken when metadata is None, which can happen if we deserialize the execution plan.
fetched_metadata = self.metadata().await?;
&fetched_metadata
};

let schema = prepare_schema((&metadata.schema).into(), options.row_index.as_ref());

let hive_partitions = None; // TODO
materialize_projection(
Some(projection),
&schema,
hive_partitions,
options.row_index.is_some(),
)
},
None => None,
};

let reader =
<IpcReader<_> as crate::SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
.with_row_index(None) // TODO
.with_n_rows(None) // TODO
.with_columns(None) // TODO
.with_projection(None); // TODO
let predicate = None; // TODO
let verbose = false; // TODO
reader.finish_with_scan_ops(predicate, verbose)
.with_row_index(options.row_index)
.with_n_rows(options.row_limit)
.with_projection(projection);
reader.finish_with_scan_ops(options.predicate, verbose)
}

// TODO: Return type i64/u64/usize/alias?
// TODO: Utilize previously obtained metadata.
pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
todo!();
}
}
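
Taken together, the new API lets a caller resolve the object store once, fetch the IPC footer, and reuse that metadata for the actual read. A hedged end-to-end sketch, assuming from_uri accepts a URI string and optional cloud options as used by read_async below; the URI and row limit are placeholders:

async fn read_remote_ipc() -> PolarsResult<DataFrame> {
    // Resolve the object store from the URI (placeholder path, no cloud options).
    let reader = IpcReaderAsync::from_uri("s3://bucket/data.arrow", None).await?;
    // Fetch the IPC footer once so the schema does not have to be re-read.
    let metadata = reader.metadata().await?;
    // Reuse the footer for the data request itself.
    reader
        .data(
            Some(&metadata),
            IpcReadOptions::default().with_row_limit(10_000usize),
            /* verbose */ false,
        )
        .await
}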

58 changes: 58 additions & 0 deletions crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -1,5 +1,10 @@
use std::path::PathBuf;

use polars_core::config::env_force_async;
#[cfg(feature = "cloud")]
use polars_io::cloud::CloudOptions;
use polars_io::is_cloud_url;

use super::*;

pub struct IpcExec {
@@ -8,10 +13,43 @@ pub struct IpcExec {
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
pub(crate) options: IpcScanOptions,
pub(crate) file_options: FileScanOptions,
#[cfg(feature = "cloud")]
pub(crate) cloud_options: Option<CloudOptions>,
pub(crate) metadata: Option<arrow::io::ipc::read::FileMetadata>,
}

impl IpcExec {
fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let is_cloud = is_cloud_url(&self.path);
let force_async = env_force_async();

let mut out = if is_cloud || force_async {
#[cfg(not(feature = "cloud"))]
{
panic!("activate cloud feature")
}

#[cfg(feature = "cloud")]
{
if !is_cloud && verbose {
eprintln!("ASYNC READING FORCED");
}

polars_io::pl_async::get_runtime()
.block_on_potential_spawn(self.read_async(verbose))?
}
} else {
self.read_sync(verbose)?
};

if self.file_options.rechunk {
out.as_single_chunk_par();
}

Ok(out)
}

fn read_sync(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let file = std::fs::File::open(&self.path)?;
let (projection, predicate) = prepare_scan_args(
self.predicate.clone(),
@@ -28,6 +66,26 @@ impl IpcExec {
.memory_mapped(self.options.memmap)
.finish_with_scan_ops(predicate, verbose)
}

#[cfg(feature = "cloud")]
async fn read_async(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);

let reader =
IpcReaderAsync::from_uri(self.path.to_str().unwrap(), self.cloud_options.as_ref())
.await?;
reader
.data(
self.metadata.as_ref(),
IpcReadOptions::default()
.with_row_limit(self.file_options.n_rows)
.with_row_index(self.file_options.row_index.clone())
.with_projection(self.file_options.with_columns.as_deref().cloned())
.with_predicate(predicate),
verbose,
)
.await
}
}
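
The read dispatch above falls back to the async path either for cloud URLs or when async reading is forced through the environment. A sketch of exercising the forced-async branch on a local file; this assumes env_force_async reads the POLARS_FORCE_ASYNC variable, and the file path is a placeholder:

// Force the async code path even for a local file (assumed env var name).
std::env::set_var("POLARS_FORCE_ASYNC", "1");
let df = LazyFrame::scan_ipc("data/example.arrow", ScanArgsIpc::default())?.collect()?;
std::env::remove_var("POLARS_FORCE_ASYNC");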

impl Executor for IpcExec {
10 changes: 9 additions & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
@@ -232,7 +232,12 @@ pub fn create_physical_plan(
}))
},
#[cfg(feature = "ipc")]
FileScan::Ipc { options } => {
FileScan::Ipc {
options,
#[cfg(feature = "cloud")]
cloud_options,
metadata,
} => {
assert_eq!(paths.len(), 1);
let path = paths[0].clone();
Ok(Box::new(executors::IpcExec {
Expand All @@ -241,6 +246,9 @@ pub fn create_physical_plan(
predicate,
options,
file_options,
#[cfg(feature = "cloud")]
cloud_options,
metadata,
}))
},
#[cfg(feature = "parquet")]
5 changes: 5 additions & 0 deletions crates/polars-lazy/src/scan/ipc.rs
@@ -1,6 +1,7 @@
use std::path::{Path, PathBuf};

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::RowIndex;

use crate::prelude::*;
@@ -12,6 +13,8 @@ pub struct ScanArgsIpc {
pub rechunk: bool,
pub row_index: Option<RowIndex>,
pub memmap: bool,
#[cfg(feature = "cloud")]
pub cloud_options: Option<CloudOptions>,
}

impl Default for ScanArgsIpc {
@@ -22,6 +25,7 @@ impl Default for ScanArgsIpc {
rechunk: false,
row_index: None,
memmap: true,
cloud_options: Default::default(),
}
}
}
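
With the new cloud_options field, credentials and related settings for remote stores travel with the scan arguments. A hedged sketch, assuming the cloud feature is enabled and that CloudOptions implements Default; the URI is a placeholder:

let args = ScanArgsIpc {
    // Placeholder: real code would configure credentials/region here.
    cloud_options: Some(CloudOptions::default()),
    ..Default::default()
};
let lf = LazyFrame::scan_ipc("s3://bucket/data.arrow", args)?;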
@@ -58,6 +62,7 @@ impl LazyFileListReader for LazyIpcReader {
args.cache,
args.row_index.clone(),
args.rechunk,
args.cloud_options,
)?
.build()
.into();
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/tests/io.rs
@@ -416,6 +416,8 @@ fn test_ipc_globbing() -> PolarsResult<()> {
rechunk: false,
row_index: None,
memmap: true,
#[cfg(feature = "cloud")]
cloud_options: None,
},
)?
.collect()?;