Add IpcReadOptions
This is probably not enough. It is unclear how to represent a projection through
both column names and indices: the current IPC readers seem to prioritize names
over indices rather than taking the union.
mickvangelderen committed Mar 6, 2024
1 parent f28dc9a commit ed741ae
Showing 2 changed files with 89 additions and 14 deletions.
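
For orientation before the diffs: the sketch below shows the caller-side pattern this commit enables, mirroring the updated test at the bottom of object_store_reader.rs. It is a minimal sketch, not part of the commit; it assumes `IpcReadOptions` and `ObjectStoreIpc` are importable by the caller and that a file `data.ipc` has already been written to the store.

    use object_store::path::Path as ObjectPath;
    use polars_error::PolarsResult;

    // Sketch only: `read_ipc` blocks on the internal async runtime while the
    // bytes are fetched, then applies the requested options.
    fn example() -> PolarsResult<()> {
        let store = object_store::memory::InMemory::new();
        let path = ObjectPath::parse("data.ipc").unwrap();
        let df = store.read_ipc(
            &path,
            IpcReadOptions::default()
                .column_names(vec!["c".to_string(), "b".to_string()])
                .n_rows(100),
        )?;
        println!("{df:?}");
        Ok(())
    }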
crates/polars-io/src/ipc/ipc_file.rs: 4 additions, 0 deletions
@@ -164,6 +164,10 @@ impl<R: MmapBytesReader> IpcReader<R> {
         let rechunk = self.rechunk;
         let metadata = read::read_file_metadata(&mut self.reader)?;
 
+        if let Some(columns) = &self.columns {
+            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
+        }
+
         let schema = if let Some(projection) = &self.projection {
             Arc::new(apply_projection(&metadata.schema, projection))
         } else {
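
A note on the hunk above: `finish` now rebuilds `self.projection` from `self.columns` whenever column names were set, which is exactly the precedence the commit message flags. Names overwrite a previously supplied index projection rather than being unioned with it. A hypothetical fragment illustrating the consequence, assuming `file_bytes` holds IPC-encoded bytes:

    // Both a name selection and an index projection are supplied.
    let df = IpcReader::new(std::io::Cursor::new(file_bytes))
        .with_columns(Some(vec!["b".to_string()])) // stored in self.columns
        .with_projection(Some(vec![0]))            // stored in self.projection
        .finish()?;
    // With the change above, `finish` derives a projection from the names,
    // so only column "b" is read and the index projection [0] is discarded.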
crates/polars-io/src/ipc/object_store_reader.rs: 85 additions, 14 deletions
@@ -1,54 +1,118 @@
+use std::sync::Arc;
+
 use bytes::Bytes;
 use object_store::path::Path as ObjectPath;
 use object_store::ObjectStore;
 use polars_core::frame::DataFrame;
 use polars_error::{to_compute_err, PolarsResult};
 
 use crate::pl_async::{get_runtime, with_concurrency_budget};
+use crate::predicates::PhysicalIoExpr;
 use crate::prelude::IpcReader;
-use crate::SerReader;
+use crate::{RowIndex, SerReader};
 
+#[derive(Debug, Clone)]
+enum Columns {
+    Names(Vec<String>),
+    Indices(Vec<usize>),
+}
+
+#[derive(Default, Clone)]
+pub struct IpcReadOptions {
+    columns: Option<Columns>,
+    n_rows: Option<usize>,
+    row_index: Option<RowIndex>,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+}
+
+impl IpcReadOptions {
+    pub fn column_names(mut self, names: Vec<String>) -> Self {
+        self.columns = Some(Columns::Names(names));
+        self
+    }
+
+    pub fn column_indices(mut self, indices: Vec<usize>) -> Self {
+        self.columns = Some(Columns::Indices(indices));
+        self
+    }
+
+    pub fn n_rows(mut self, n_rows: usize) -> Self {
+        self.n_rows = Some(n_rows);
+        self
+    }
+
+    pub fn row_index(mut self, row_index: RowIndex) -> Self {
+        self.row_index = Some(row_index);
+        self
+    }
+
+    pub fn predicate(mut self, predicate: Arc<dyn PhysicalIoExpr>) -> Self {
+        self.predicate = Some(predicate);
+        self
+    }
+}
+
 pub trait ObjectStoreIpc {
-    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame>;
+    fn read_ipc(&self, path: &ObjectPath, options: IpcReadOptions) -> PolarsResult<DataFrame>;
 }
 
 impl<T> ObjectStoreIpc for T
 where
     T: ObjectStore,
 {
-    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame> {
+    fn read_ipc(&self, path: &ObjectPath, options: IpcReadOptions) -> PolarsResult<DataFrame> {
         // TODO: This will block the current thread until the data has been
         // loaded. No other (compute) work can be done on this thread.
         //
         // What is our work scheduling strategy? From what I have heard, we do
         // not intend to make the entire library `async`. However, what do we do
         // instead?
-        get_runtime().block_on(read_ipc_inner(self, path))
+        get_runtime().block_on(read_ipc_inner(self, path, options))
     }
 }
 
-async fn read_ipc_inner<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<DataFrame> {
+async fn read_ipc_inner<T: ObjectStore>(
+    store: &T,
+    path: &ObjectPath,
+    options: IpcReadOptions,
+) -> PolarsResult<DataFrame> {
     // TODO: Load only what is needed, rather than everything.
     let file_bytes = with_concurrency_budget(1, || read_bytes(store, path)).await?;
 
-    let reader = IpcReader::new(std::io::Cursor::new(file_bytes));
+    let mut reader = IpcReader::new(std::io::Cursor::new(file_bytes));
 
-    let data = reader.finish()?;
+    if let Some(columns) = options.columns {
+        reader = match columns {
+            Columns::Names(names) => reader.with_columns(Some(names)),
+            Columns::Indices(indices) => reader.with_projection(Some(indices)),
+        };
+    }
+
+    let data = reader
+        .with_n_rows(options.n_rows)
+        .with_row_index(options.row_index)
+        .finish_with_scan_ops(options.predicate, false)?;
 
     PolarsResult::Ok(data)
 }
 
 async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<Bytes> {
     // TODO: Is `to_compute_err` appropriate? It is used in the Parquet
     // reader as well but I am not sure it is what we want.
-    let get_result = store.get(&path).await.map_err(to_compute_err)?;
+    let get_result = store.get(path).await.map_err(to_compute_err)?;
 
     // TODO: Perhaps use the streaming interface?
     let file_bytes = get_result.bytes().await.map_err(to_compute_err)?;
 
     PolarsResult::Ok(file_bytes)
 }
 
+/*
+methods that the IpcReader has (with columns, with projection, ...)
+*/
+
 #[cfg(test)]
 mod tests {
     use std::io::Cursor;
@@ -73,15 +137,22 @@ mod tests {

     #[test]
     fn read_ipc() {
-        let mut expected_df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
+        let mut df = df!("a" => [1], "b" => [2], "c" => [3]).unwrap();
 
         let store = object_store::memory::InMemory::new();
         let path = ObjectPath::parse("data.ipc").unwrap();
-        write_ipc(&store, &path, &mut expected_df);
+        write_ipc(&store, &path, &mut df);
 
-        let actual_df = store.read_ipc(&path).unwrap();
-
-        assert_eq!(actual_df.shape(), expected_df.shape());
-        assert!(actual_df.equals(&expected_df));
+        let actual_df = store
+            .read_ipc(
+                &path,
+                IpcReadOptions::default().column_names(vec!["c".to_string(), "b".to_string()]),
+            )
+            .unwrap();
+        let expected_df = df!("c" => [3], "b" => [2]).unwrap();
+        assert!(
+            actual_df.equals(&expected_df),
+            "expected {actual_df:?}\nto equal {expected_df:?}"
+        );
     }
 }
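
The new test exercises only the `column_names` path. A companion sketch for the `column_indices` variant, hypothetical rather than part of the commit, reusing the store, path, and one-row frame from the test above:

    // Select the first and third columns by index; `read_ipc_inner` routes
    // this through `with_projection` instead of `with_columns`.
    let actual_df = store
        .read_ipc(&path, IpcReadOptions::default().column_indices(vec![0, 2]))
        .unwrap();
    // Projecting indices [0, 2] should yield columns "a" and "c".
    let expected_df = df!("a" => [1], "c" => [3]).unwrap();
    assert!(actual_df.equals(&expected_df));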
