-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f603531
commit 66301f9
Showing
2 changed files
with
91 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
use bytes::Bytes; | ||
use object_store::path::Path as ObjectPath; | ||
use object_store::ObjectStore; | ||
use polars_core::frame::DataFrame; | ||
use polars_error::{to_compute_err, PolarsResult}; | ||
|
||
use crate::pl_async::{get_runtime, with_concurrency_budget}; | ||
use crate::prelude::IpcReader; | ||
use crate::SerReader; | ||
|
||
/// Extension trait for reading an IPC (Arrow/Feather) file that lives in an
/// [`ObjectStore`] into an in-memory [`DataFrame`].
///
/// Implemented blanket-style for every `ObjectStore` below, so any store
/// (memory, local FS, S3, ...) gains `read_ipc` automatically.
pub trait ObjectStoreIpc {
    /// Read the IPC file at `path` and decode it into a `DataFrame`.
    ///
    /// # Errors
    /// Returns a `PolarsError` if the object cannot be fetched or if the
    /// bytes are not a valid IPC file.
    fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame>;
}
|
||
impl<T> ObjectStoreIpc for T | ||
where | ||
T: ObjectStore, | ||
{ | ||
fn read_ipc(&self, path: &ObjectPath) -> PolarsResult<DataFrame> { | ||
// TODO: This will block the current thread until the data has been | ||
// loaded. No other (compute) work can be done on this thread. | ||
// | ||
// What is our work scheduling strategy? From what I have heard, we do | ||
// not intend to make the entire library `async`. However, what do we do | ||
// instead? | ||
get_runtime().block_on(read_ipc_inner(self, path)) | ||
} | ||
} | ||
|
||
async fn read_ipc_inner<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<DataFrame> { | ||
// TODO: Load only what is needed, rather than everything. | ||
let file_bytes = with_concurrency_budget(1, || read_bytes(store, path)) | ||
.await?; | ||
|
||
let reader = IpcReader::new(std::io::Cursor::new(file_bytes)); | ||
|
||
let data = reader.finish()?; | ||
|
||
PolarsResult::Ok(data) | ||
} | ||
|
||
async fn read_bytes<T: ObjectStore>(store: &T, path: &ObjectPath) -> PolarsResult<Bytes> { | ||
// TODO: Is `to_compute_err` appropriate? It is used in the Parquet | ||
// reader as well but I am not sure it is what we want. | ||
let get_result = store.get(&path).await.map_err(to_compute_err)?; | ||
|
||
// TODO: Perhaps use the streaming interface? | ||
let file_bytes = get_result.bytes().await.map_err(to_compute_err)?; | ||
|
||
PolarsResult::Ok(file_bytes) | ||
} | ||
|
||
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use polars_core::df;

    use crate::{prelude::IpcWriter, SerWriter};

    use super::*;

    /// Serialize `df` into an in-memory IPC byte buffer.
    fn to_ipc_bytes(df: &mut DataFrame) -> Vec<u8> {
        let mut buffer = Cursor::new(Vec::new());
        IpcWriter::new(&mut buffer).finish(df).unwrap();
        buffer.into_inner()
    }

    /// Upload `df` as an IPC file to `store` under `path`, blocking until done.
    fn write_ipc<T: ObjectStore>(store: &T, path: &ObjectPath, df: &mut DataFrame) {
        let payload = to_ipc_bytes(df).into();
        get_runtime().block_on(store.put(path, payload)).unwrap();
    }

    #[test]
    fn read_ipc() {
        // Round-trip: write a small frame into an in-memory store, read it
        // back through the blanket `ObjectStoreIpc` impl, and compare.
        let mut expected_df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();

        let store = object_store::memory::InMemory::new();
        let path = ObjectPath::parse("data.ipc").unwrap();
        write_ipc(&store, &path, &mut expected_df);

        let actual_df = store.read_ipc(&path).unwrap();

        assert_eq!(actual_df.shape(), expected_df.shape());
        assert!(actual_df.equals(&expected_df));
    }
}