diff --git a/crates/polars-io/src/ipc/ipc_file.rs b/crates/polars-io/src/ipc/ipc_file.rs index 671c3cf00ae93..e0f4226a4c0a2 100644 --- a/crates/polars-io/src/ipc/ipc_file.rs +++ b/crates/polars-io/src/ipc/ipc_file.rs @@ -164,6 +164,10 @@ impl IpcReader { let rechunk = self.rechunk; let metadata = read::read_file_metadata(&mut self.reader)?; + if let Some(columns) = &self.columns { + self.projection = Some(columns_to_projection(columns, &metadata.schema)?); + } + let schema = if let Some(projection) = &self.projection { Arc::new(apply_projection(&metadata.schema, projection)) } else { diff --git a/crates/polars-io/src/ipc/object_store_reader.rs b/crates/polars-io/src/ipc/object_store_reader.rs index 8a7a643609ebc..458cf51a77125 100644 --- a/crates/polars-io/src/ipc/object_store_reader.rs +++ b/crates/polars-io/src/ipc/object_store_reader.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use bytes::Bytes; use object_store::path::Path as ObjectPath; use object_store::ObjectStore; @@ -5,35 +7,91 @@ use polars_core::frame::DataFrame; use polars_error::{to_compute_err, PolarsResult}; use crate::pl_async::{get_runtime, with_concurrency_budget}; +use crate::predicates::PhysicalIoExpr; use crate::prelude::IpcReader; -use crate::SerReader; +use crate::{RowIndex, SerReader}; + +#[derive(Debug, Clone)] +enum Columns { + Names(Vec<String>), + Indices(Vec<usize>), +} + +#[derive(Default, Clone)] +pub struct IpcReadOptions { + columns: Option<Columns>, + n_rows: Option<usize>, + row_index: Option<RowIndex>, + predicate: Option<Arc<dyn PhysicalIoExpr>>, +} + +impl IpcReadOptions { + pub fn column_names(mut self, names: Vec<String>) -> Self { + self.columns = Some(Columns::Names(names)); + self + } + + pub fn column_indices(mut self, indices: Vec<usize>) -> Self { + self.columns = Some(Columns::Indices(indices)); + self + } + + pub fn n_rows(mut self, n_rows: usize) -> Self { + self.n_rows = Some(n_rows); + self + } + + pub fn row_index(mut self, row_index: RowIndex) -> Self { + self.row_index = Some(row_index); + self + } + + pub fn predicate(mut self, predicate: 
Arc<dyn PhysicalIoExpr>) -> Self { + self.predicate = Some(predicate); + self + } +} pub trait ObjectStoreIpc { - fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame>; + fn read_ipc(&self, path: &ObjectPath, options: IpcReadOptions) -> PolarsResult<DataFrame>; } impl<T> ObjectStoreIpc for T where T: ObjectStore, { - fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame> { + fn read_ipc(&self, path: &ObjectPath, options: IpcReadOptions) -> PolarsResult<DataFrame> { // TODO: This will block the current thread until the data has been // loaded. No other (compute) work can be done on this thread. // // What is our work scheduling strategy? From what I have heard, we do // not intend to make the entire library `async`. However, what do we do // instead? - get_runtime().block_on(read_ipc_inner(self, path)) + get_runtime().block_on(read_ipc_inner(self, path, options)) } } -async fn read_ipc_inner(store: &T, path: &ObjectPath) -> PolarsResult<DataFrame> { +async fn read_ipc_inner<T: ObjectStore>( + store: &T, + path: &ObjectPath, + options: IpcReadOptions, +) -> PolarsResult<DataFrame> { // TODO: Load only what is needed, rather than everything. let file_bytes = with_concurrency_budget(1, || read_bytes(store, path)).await?; - let reader = IpcReader::new(std::io::Cursor::new(file_bytes)); + let mut reader = IpcReader::new(std::io::Cursor::new(file_bytes)); + + if let Some(columns) = options.columns { + reader = match columns { + Columns::Names(names) => reader.with_columns(Some(names)), + Columns::Indices(indices) => reader.with_projection(Some(indices)), + }; + } - let data = reader.finish()?; + let data = reader + .with_n_rows(options.n_rows) + .with_row_index(options.row_index) + .finish_with_scan_ops(options.predicate, false)?; PolarsResult::Ok(data) } @@ -41,7 +99,7 @@ async fn read_ipc_inner<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsR async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<Bytes> { // TODO: Is `to_compute_err` appropriate? It is used in the Parquet // reader as well but I am not sure it is what we want. 
- let get_result = store.get(&path).await.map_err(to_compute_err)?; + let get_result = store.get(path).await.map_err(to_compute_err)?; // TODO: Perhaps use the streaming interface? let file_bytes = get_result.bytes().await.map_err(to_compute_err)?; @@ -49,6 +107,12 @@ async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResul PolarsResult::Ok(file_bytes) } +/* + +methods that the ipc reader has (with columns, with projection, ...) + + */ + #[cfg(test)] mod tests { use std::io::Cursor; @@ -73,15 +137,22 @@ mod tests { #[test] fn read_ipc() { - let mut expected_df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap(); + let mut df = df!("a" => [1], "b" => [2], "c" => [3]).unwrap(); let store = object_store::memory::InMemory::new(); let path = ObjectPath::parse("data.ipc").unwrap(); - write_ipc(&store, &path, &mut expected_df); + write_ipc(&store, &path, &mut df); - let actual_df = store.read_ipc(&path).unwrap(); - - assert_eq!(actual_df.shape(), expected_df.shape()); - assert!(actual_df.equals(&expected_df)); + let actual_df = store + .read_ipc( + &path, + IpcReadOptions::default().column_names(vec!["c".to_string(), "b".to_string()]), + ) + .unwrap(); + let expected_df = df!("c" => [3], "b" => [2]).unwrap(); + assert!( + actual_df.equals(&expected_df), + "expected {actual_df:?}\nto equal {expected_df:?}" + ); } }