Skip to content

Commit

Permalink
Implement read_ipc for ObjectStore
Browse files Browse the repository at this point in the history
  • Loading branch information
mickvangelderen committed Mar 6, 2024
1 parent 772a90c commit f28dc9a
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
5 changes: 5 additions & 0 deletions crates/polars-io/src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ pub use ipc_file::IpcReader;
#[cfg(feature = "ipc_streaming")]
pub use ipc_stream::*;
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption};

#[cfg(feature = "cloud")]
mod object_store_reader;
#[cfg(feature = "cloud")]
pub use object_store_reader::*;
87 changes: 87 additions & 0 deletions crates/polars-io/src/ipc/object_store_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use bytes::Bytes;
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;
use polars_core::frame::DataFrame;
use polars_error::{to_compute_err, PolarsResult};

use crate::pl_async::{get_runtime, with_concurrency_budget};
use crate::prelude::IpcReader;
use crate::SerReader;

/// Extension trait for reading Arrow IPC files directly from an [`ObjectStore`].
pub trait ObjectStoreIpc {
    /// Read the IPC file at `path` from this store into a [`DataFrame`].
    ///
    /// Blocks the calling thread until the download and decode complete.
    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame>;
}

impl<T> ObjectStoreIpc for T
where
    T: ObjectStore,
{
    /// Fetch the IPC file at `path` from this object store and decode it
    /// into a [`DataFrame`].
    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame> {
        // NOTE(review): this blocks the calling thread until the data has
        // been loaded, so no other (compute) work can run on it meanwhile.
        // The crate's overall work-scheduling strategy is still open: we do
        // not intend to make the whole library `async`, but it is unclear
        // what we do instead of blocking bridges like this one.
        let read_future = read_ipc_inner(self, path);
        get_runtime().block_on(read_future)
    }
}

/// Download the whole IPC file at `path` from `store` and decode it into a
/// [`DataFrame`].
///
/// The download consumes one unit of the crate's global concurrency budget so
/// that many simultaneous reads do not overwhelm the object store.
async fn read_ipc_inner<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<DataFrame> {
    // TODO: Load only what is needed, rather than everything.
    let file_bytes = with_concurrency_budget(1, || read_bytes(store, path)).await?;

    // `Cursor` gives the in-memory bytes the `Read + Seek` interface the
    // IPC reader requires; `Ok` is the idiomatic return (not
    // `PolarsResult::Ok`), and `finish` already yields a `PolarsResult`.
    IpcReader::new(std::io::Cursor::new(file_bytes)).finish()
}

/// Fetch the full contents of the object at `path` from `store` as raw bytes.
async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<Bytes> {
    // `path` is already a reference, so pass it directly rather than taking
    // another borrow (`&path` was `&&ObjectPath`, clippy `needless_borrow`).
    //
    // TODO: Is `to_compute_err` appropriate? It is used in the Parquet
    // reader as well but I am not sure it is what we want.
    let get_result = store.get(path).await.map_err(to_compute_err)?;

    // TODO: Perhaps use the streaming interface instead of buffering the
    // entire object in memory?
    get_result.bytes().await.map_err(to_compute_err)
}

#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use polars_core::df;

    use super::*;
    use crate::prelude::IpcWriter;
    use crate::SerWriter;

    /// Serialize `df` into an in-memory Arrow IPC byte buffer.
    fn to_ipc_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut buffer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
        IpcWriter::new(&mut buffer).finish(df).unwrap();
        buffer.into_inner()
    }

    /// Store `df` as an IPC file at `path` in `store`, blocking until done.
    fn write_ipc<T: ObjectStore>(store: &T, path: &ObjectPath, df: &mut DataFrame) {
        let payload = to_ipc_bytes(df).into();
        get_runtime().block_on(store.put(path, payload)).unwrap();
    }

    #[test]
    fn read_ipc() {
        // Round-trip: write a known frame into an in-memory store, read it
        // back through the trait under test, and compare.
        let mut expected_df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();

        let store = object_store::memory::InMemory::new();
        let path = ObjectPath::parse("data.ipc").unwrap();
        write_ipc(&store, &path, &mut expected_df);

        let actual_df = store.read_ipc(&path).unwrap();

        assert_eq!(actual_df.shape(), expected_df.shape());
        assert!(actual_df.equals(&expected_df));
    }
}

0 comments on commit f28dc9a

Please sign in to comment.