diff --git a/crates/polars-io/src/ipc/mod.rs b/crates/polars-io/src/ipc/mod.rs
index 1366aa84324fe..1b4c00b8adf45 100644
--- a/crates/polars-io/src/ipc/mod.rs
+++ b/crates/polars-io/src/ipc/mod.rs
@@ -16,3 +16,8 @@ pub use ipc_file::IpcReader;
 #[cfg(feature = "ipc_streaming")]
 pub use ipc_stream::*;
 pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption};
+
+#[cfg(feature = "cloud")]
+mod object_store_reader;
+#[cfg(feature = "cloud")]
+pub use object_store_reader::*;
diff --git a/crates/polars-io/src/ipc/object_store_reader.rs b/crates/polars-io/src/ipc/object_store_reader.rs
new file mode 100644
index 0000000000000..8a7a643609ebc
--- /dev/null
+++ b/crates/polars-io/src/ipc/object_store_reader.rs
@@ -0,0 +1,87 @@
+use bytes::Bytes;
+use object_store::path::Path as ObjectPath;
+use object_store::ObjectStore;
+use polars_core::frame::DataFrame;
+use polars_error::{to_compute_err, PolarsResult};
+
+use crate::pl_async::{get_runtime, with_concurrency_budget};
+use crate::prelude::IpcReader;
+use crate::SerReader;
+
+pub trait ObjectStoreIpc {
+    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame>;
+}
+
+impl<T> ObjectStoreIpc for T
+where
+    T: ObjectStore,
+{
+    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame> {
+        // TODO: This will block the current thread until the data has been
+        // loaded. No other (compute) work can be done on this thread.
+        //
+        // What is our work scheduling strategy? From what I have heard, we do
+        // not intend to make the entire library `async`. However, what do we do
+        // instead?
+        get_runtime().block_on(read_ipc_inner(self, path))
+    }
+}
+
+async fn read_ipc_inner<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<DataFrame> {
+    // TODO: Load only what is needed, rather than everything.
+    let file_bytes = with_concurrency_budget(1, || read_bytes(store, path)).await?;
+
+    let reader = IpcReader::new(std::io::Cursor::new(file_bytes));
+
+    let data = reader.finish()?;
+
+    PolarsResult::Ok(data)
+}
+
+async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<Bytes> {
+    // TODO: Is `to_compute_err` appropriate? It is used in the Parquet
+    // reader as well, but I am not sure it is what we want.
+    let get_result = store.get(path).await.map_err(to_compute_err)?;
+
+    // TODO: Perhaps use the streaming interface?
+    let file_bytes = get_result.bytes().await.map_err(to_compute_err)?;
+
+    PolarsResult::Ok(file_bytes)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Cursor;
+
+    use polars_core::df;
+
+    use super::*;
+    use crate::prelude::IpcWriter;
+    use crate::SerWriter;
+
+    fn to_ipc_bytes(df: &mut DataFrame) -> Vec<u8> {
+        let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
+        IpcWriter::new(&mut writer).finish(df).unwrap();
+        writer.into_inner()
+    }
+
+    fn write_ipc<T: ObjectStore>(store: &T, path: &ObjectPath, df: &mut DataFrame) {
+        get_runtime()
+            .block_on(store.put(path, to_ipc_bytes(df).into()))
+            .unwrap();
+    }
+
+    #[test]
+    fn read_ipc() {
+        let mut expected_df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
+
+        let store = object_store::memory::InMemory::new();
+        let path = ObjectPath::parse("data.ipc").unwrap();
+        write_ipc(&store, &path, &mut expected_df);
+
+        let actual_df = store.read_ipc(&path).unwrap();
+
+        assert_eq!(actual_df.shape(), expected_df.shape());
+        assert!(actual_df.equals(&expected_df));
+    }
+}
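
For context, a minimal sketch of how a caller could use the new blanket impl against a real cloud store, assuming the trait is reachable as `polars_io::ipc::ObjectStoreIpc` via the re-export added in `mod.rs`, and assuming `object_store` is built with its `aws` feature. The bucket name and object key here are hypothetical:

```rust
use object_store::aws::AmazonS3Builder;
use object_store::path::Path as ObjectPath;
use polars_error::{to_compute_err, PolarsResult};
// Assumed re-export path, per the `pub use object_store_reader::*;` above.
use polars_io::ipc::ObjectStoreIpc;

fn main() -> PolarsResult<()> {
    // Hypothetical bucket/key; credentials are picked up from the
    // environment by `from_env`.
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("my-bucket")
        .build()
        .map_err(to_compute_err)?;
    let path = ObjectPath::parse("data/frame.ipc").map_err(to_compute_err)?;

    // `read_ipc` blocks on the internal async runtime, so the call site
    // stays synchronous.
    let df = store.read_ipc(&path)?;
    println!("{df:?}");
    Ok(())
}
```

Because the impl is blanket over `T: ObjectStore`, the same call works unchanged against `InMemory` (as in the unit test), S3, GCS, or any other `ObjectStore` implementation.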
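
On the "Perhaps use the streaming interface?" TODO: a rough sketch of what a chunk-by-chunk read could look like, assuming `GetResult::into_stream` from the `object_store` crate. The function name is hypothetical, and this version still buffers the whole object, so the real benefit would only come once chunks feed an incremental IPC decoder:

```rust
use futures::TryStreamExt;
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;
use polars_error::{to_compute_err, PolarsResult};

// Hypothetical alternative to `read_bytes` above: consume the response as
// a stream of chunks instead of awaiting one large `bytes()` buffer.
async fn read_bytes_streaming<T: ObjectStore>(
    store: &T,
    path: &ObjectPath,
) -> PolarsResult<Vec<u8>> {
    let get_result = store.get(path).await.map_err(to_compute_err)?;
    let mut stream = get_result.into_stream();

    let mut buf = Vec::new();
    // Still collects everything; a real implementation could hand each
    // chunk to an incremental IPC decoder as it arrives.
    while let Some(chunk) = stream.try_next().await.map_err(to_compute_err)? {
        buf.extend_from_slice(&chunk);
    }
    Ok(buf)
}
```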