From ed741ae700994c213b7dda6f481a5e7e01d7a655 Mon Sep 17 00:00:00 2001
From: Mick van Gelderen
Date: Wed, 6 Mar 2024 16:16:39 +0100
Subject: [PATCH] Add IpcReadOptions

This is probably not enough. It is not clear how to represent projection
through both column names and indices; the current IPC readers appear to
prioritize names over indices and do not take the union.
---
 crates/polars-io/src/ipc/ipc_file.rs          |  4 ++++
 .../polars-io/src/ipc/object_store_reader.rs  | 99 ++++++++++++++++---
 2 files changed, 89 insertions(+), 14 deletions(-)

diff --git a/crates/polars-io/src/ipc/ipc_file.rs b/crates/polars-io/src/ipc/ipc_file.rs
index 671c3cf00ae93..e0f4226a4c0a2 100644
--- a/crates/polars-io/src/ipc/ipc_file.rs
+++ b/crates/polars-io/src/ipc/ipc_file.rs
@@ -164,6 +164,10 @@ impl<R: MmapBytesReader> IpcReader<R> {
         let rechunk = self.rechunk;
         let metadata = read::read_file_metadata(&mut self.reader)?;
 
+        if let Some(columns) = &self.columns {
+            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
+        }
+
         let schema = if let Some(projection) = &self.projection {
             Arc::new(apply_projection(&metadata.schema, projection))
         } else {
diff --git a/crates/polars-io/src/ipc/object_store_reader.rs b/crates/polars-io/src/ipc/object_store_reader.rs
index 8a7a643609ebc..458cf51a77125 100644
--- a/crates/polars-io/src/ipc/object_store_reader.rs
+++ b/crates/polars-io/src/ipc/object_store_reader.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use bytes::Bytes;
 use object_store::path::Path as ObjectPath;
 use object_store::ObjectStore;
@@ -5,35 +7,91 @@ use polars_core::frame::DataFrame;
 use polars_error::{to_compute_err, PolarsResult};
 
 use crate::pl_async::{get_runtime, with_concurrency_budget};
+use crate::predicates::PhysicalIoExpr;
 use crate::prelude::IpcReader;
-use crate::SerReader;
+use crate::{RowIndex, SerReader};
+
+#[derive(Debug, Clone)]
+enum Columns {
+    Names(Vec<String>),
+    Indices(Vec<usize>),
+}
+
+#[derive(Default, Clone)]
+pub struct IpcReadOptions {
+    columns: Option<Columns>,
+    n_rows: Option<usize>,
+    row_index: Option<RowIndex>,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+}
+
+impl IpcReadOptions {
+    pub fn column_names(mut self, names: Vec<String>) -> Self {
+        self.columns = Some(Columns::Names(names));
+        self
+    }
+
+    pub fn column_indices(mut self, indices: Vec<usize>) -> Self {
+        self.columns = Some(Columns::Indices(indices));
+        self
+    }
+
+    pub fn n_rows(mut self, n_rows: usize) -> Self {
+        self.n_rows = Some(n_rows);
+        self
+    }
+
+    pub fn row_index(mut self, row_index: RowIndex) -> Self {
+        self.row_index = Some(row_index);
+        self
+    }
+
+    pub fn predicate(mut self, predicate: Arc<dyn PhysicalIoExpr>) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
+}
 
 pub trait ObjectStoreIpc {
-    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame>;
+    fn read_ipc(&self, path: &ObjectPath, options: IpcReadOptions) -> PolarsResult<DataFrame>;
 }
 
 impl<T> ObjectStoreIpc for T
 where
     T: ObjectStore,
 {
-    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame> {
+    fn read_ipc(&self, path: &ObjectPath, options: IpcReadOptions) -> PolarsResult<DataFrame> {
         // TODO: This will block the current thread until the data has been
         // loaded. No other (compute) work can be done on this thread.
         //
         // What is our work scheduling strategy? From what I have heard, we do
         // not intend to make the entire library `async`. However, what do we do
        // instead?
-        get_runtime().block_on(read_ipc_inner(self, path))
+        get_runtime().block_on(read_ipc_inner(self, path, options))
     }
 }
 
-async fn read_ipc_inner<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<DataFrame> {
+async fn read_ipc_inner<T: ObjectStore>(
+    store: &T,
+    path: &ObjectPath,
+    options: IpcReadOptions,
+) -> PolarsResult<DataFrame> {
     // TODO: Load only what is needed, rather than everything.
     let file_bytes = with_concurrency_budget(1, || read_bytes(store, path)).await?;
 
-    let reader = IpcReader::new(std::io::Cursor::new(file_bytes));
+    let mut reader = IpcReader::new(std::io::Cursor::new(file_bytes));
+
+    if let Some(columns) = options.columns {
+        reader = match columns {
+            Columns::Names(names) => reader.with_columns(Some(names)),
+            Columns::Indices(indices) => reader.with_projection(Some(indices)),
+        };
+    }
 
-    let data = reader.finish()?;
+    let data = reader
+        .with_n_rows(options.n_rows)
+        .with_row_index(options.row_index)
+        .finish_with_scan_ops(options.predicate, false)?;
 
     PolarsResult::Ok(data)
 }
@@ -41,7 +99,7 @@ async fn read_ipc_inner<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<DataFrame> {
 async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<Bytes> {
     // TODO: Is `to_compute_err` appropriate? It is used in the Parquet
     // reader as well but I am not sure it is what we want.
-    let get_result = store.get(&path).await.map_err(to_compute_err)?;
+    let get_result = store.get(path).await.map_err(to_compute_err)?;
 
     // TODO: Perhaps use the streaming interface?
     let file_bytes = get_result.bytes().await.map_err(to_compute_err)?;
@@ -49,6 +107,12 @@ async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<Bytes> {
     PolarsResult::Ok(file_bytes)
 }
 
+/*
+
+TODO: mirror the remaining methods that the IpcReader has (with_columns, with_projection, ...).
+
+ */
+
 #[cfg(test)]
 mod tests {
     use std::io::Cursor;
@@ -73,15 +137,22 @@ mod tests {
     #[test]
     fn read_ipc() {
-        let mut expected_df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
+        let mut df = df!("a" => [1], "b" => [2], "c" => [3]).unwrap();
 
         let store = object_store::memory::InMemory::new();
         let path = ObjectPath::parse("data.ipc").unwrap();
 
-        write_ipc(&store, &path, &mut expected_df);
+        write_ipc(&store, &path, &mut df);
 
-        let actual_df = store.read_ipc(&path).unwrap();
-
-        assert_eq!(actual_df.shape(), expected_df.shape());
-        assert!(actual_df.equals(&expected_df));
+        let actual_df = store
+            .read_ipc(
+                &path,
+                IpcReadOptions::default().column_names(vec!["c".to_string(), "b".to_string()]),
+            )
+            .unwrap();
+        let expected_df = df!("c" => [3], "b" => [2]).unwrap();
+        assert!(
+            actual_df.equals(&expected_df),
+            "expected {actual_df:?}\nto equal {expected_df:?}"
+        );
     }
 }
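
For reviewers, a minimal usage sketch of the API this patch adds. It assumes
`IpcReadOptions` and the `ObjectStoreIpc` trait end up re-exported from
`polars_io::ipc` (the export path is not settled by this patch) and that an
IPC file has already been written to the store under `data.ipc`:

    use object_store::path::Path as ObjectPath;
    use polars_error::PolarsResult;
    // Assumed export path; the patch does not pin down where these types
    // are re-exported from.
    use polars_io::ipc::{IpcReadOptions, ObjectStoreIpc};

    fn read_two_columns() -> PolarsResult<()> {
        // Any `ObjectStore` implementation works; the unit test above uses
        // the in-memory store.
        let store = object_store::memory::InMemory::new();
        let path = ObjectPath::parse("data.ipc").unwrap();

        // ... assume an IPC file was written to `store` at `path` ...

        // Project two columns by name and cap the number of rows read.
        // Note that `read_ipc` blocks the calling thread on the internal
        // async runtime.
        let df = store.read_ipc(
            &path,
            IpcReadOptions::default()
                .column_names(vec!["c".to_string(), "b".to_string()])
                .n_rows(100),
        )?;
        println!("{df:?}");
        Ok(())
    }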