feat(rust): Implement IpcReaderAsync #14984

Merged on Mar 13, 2024 (7 commits)
Changes from 1 commit
14 changes: 3 additions & 11 deletions crates/polars-io/src/ipc/ipc_file.rs
@@ -90,16 +90,6 @@ fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
 }
 
 impl<R: MmapBytesReader> IpcReader<R> {
-    #[doc(hidden)]
-    /// A very bad estimate of the number of rows
-    /// This estimation will be entirely off if the file is compressed.
-    /// And will be varying off depending on the data types.
-    pub fn _num_rows(&mut self) -> PolarsResult<usize> {
-        let metadata = self.get_metadata()?;
-        let n_cols = metadata.schema.fields.len();
-        // this magic number 10 is computed from the yellow trip dataset
-        Ok((metadata.size as usize) / n_cols / 10)
-    }
     fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
         if self.metadata.is_none() {
             let metadata = read::read_file_metadata(&mut self.reader)?;
@@ -164,7 +154,9 @@ impl<R: MmapBytesReader> IpcReader<R> {
         let rechunk = self.rechunk;
         let metadata = read::read_file_metadata(&mut self.reader)?;
 
-        // TODO: Replace this back with the assertion before merging https://github.com/pola-rs/polars/pull/14861.
+        // NOTE: For some code paths this already happened. See
+        // https://github.com/pola-rs/polars/pull/14984#discussion_r1520125000
+        // where this was introduced.
         if let Some(columns) = &self.columns {
             self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
         }
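For context on the deleted _num_rows heuristic: it estimated the row count as file size / column count / 10, a constant tuned on the yellow trip dataset. A worked example with illustrative numbers (not from a real file) shows why compression breaks it:

    // Illustrative values; metadata.size and the column count are made up.
    let size: usize = 1_000_000; // file size in bytes, per metadata.size
    let n_cols: usize = 5;
    let estimated_rows = size / n_cols / 10;
    assert_eq!(estimated_rows, 20_000);
    // At 4x compression the same logical data reports a quarter of the bytes,
    // so the estimate drops to 5_000 even though the row count is unchanged,
    // hence the removal here in favor of exact counting.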
19 changes: 10 additions & 9 deletions crates/polars-io/src/ipc/ipc_reader_async.rs
@@ -1,14 +1,14 @@
 use std::ops::Range;
 use std::sync::Arc;
 
-use arrow::io::ipc::read::FileMetadata;
+use arrow::io::ipc::read::{get_row_count, FileMetadata, OutOfSpecKind};
 use bytes::Bytes;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 use polars_core::datatypes::IDX_DTYPE;
 use polars_core::frame::DataFrame;
 use polars_core::schema::Schema;
-use polars_error::{to_compute_err, PolarsResult};
+use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};
 
 use crate::cloud::{build_object_store, CloudLocation, CloudOptions};
 use crate::pl_async::{
@@ -237,25 +237,26 @@ impl IpcReaderAsync {
         reader.finish_with_scan_ops(options.predicate, verbose)
     }
 
-    // TODO: Return type i64/u64/usize/alias?
     pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
-        unimplemented!(
-            "the row count specialization for the async IPC reader is not yet implemented"
-        )
+        // TODO: Only download what is needed rather than the entire file by
+        // making use of the projection, row limit, predicate and such.
+        let bytes = self.store.get(&self.path).await?;
+        get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
     }
 }
 
 const FOOTER_METADATA_SIZE: usize = 10;
 
+// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
+// sync and async readers.
 fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
     let footer_size: usize =
         i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
             .try_into()
-            .map_err(to_compute_err)?;
+            .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
 
-    // TODO: Move to polars-arrow and deduplicate parsing of footer metadata in sync and async readers.
     if &bytes[4..] != b"ARROW1" {
-        Err(to_compute_err("invalid ipc footer magic"))?
+        polars_bail!(oos = OutOfSpecKind::InvalidFooter);
     }
 
     Ok(footer_size)
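For reference, a minimal caller-side sketch of the new count_rows; the URI and the async wrapper are illustrative and not part of this diff:

    use polars_error::PolarsResult;
    use polars_io::ipc::IpcReaderAsync;

    async fn row_count_example() -> PolarsResult<i64> {
        // Illustrative URI; credentials would be supplied via CloudOptions.
        let reader = IpcReaderAsync::from_uri("s3://bucket/data.ipc", None).await?;
        // Downloads the object and counts rows from the record-batch metadata.
        reader.count_rows(None).await
    }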
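The 10-byte trailer that deserialize_footer_metadata parses is the fixed ending of every Arrow IPC file: a little-endian i32 footer length followed by the magic ARROW1. A hedged, std-only sketch of how a reader uses it to locate the footer (the helper name is ours, not a polars API):

    use std::io::{Read, Result, Seek, SeekFrom};

    fn read_footer_bytes<R: Read + Seek>(reader: &mut R) -> Result<Vec<u8>> {
        // Last 10 bytes of the file: [footer_size: i32 LE][b"ARROW1"].
        let mut trailer = [0u8; 10];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut trailer)?;
        assert_eq!(&trailer[4..], b"ARROW1", "invalid ipc footer magic");
        let footer_size = i32::from_le_bytes(trailer[0..4].try_into().unwrap()) as i64;
        // The footer flatbuffer sits immediately before the trailer.
        reader.seek(SeekFrom::End(-10 - footer_size))?;
        let mut footer = vec![0u8; footer_size as usize];
        reader.read_exact(&mut footer)?;
        Ok(footer)
    }

This is also the direction the count_rows TODO points toward: with the object size known from ObjectMeta, ObjectStore::get_range could fetch just these trailing bytes instead of the whole file.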
73 changes: 25 additions & 48 deletions crates/polars-plan/src/logical_plan/builder.rs
@@ -243,7 +243,7 @@ impl LogicalPlanBuilder {
 
         let path = path.into();
 
-        let (schema, reader_schema, num_rows, metadata) = if is_cloud_url(&path) {
+        let metadata = if is_cloud_url(&path) {
             #[cfg(not(feature = "cloud"))]
             panic!(
                 "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
@@ -253,60 +253,37 @@ impl LogicalPlanBuilder {
             {
                 let uri = path.to_string_lossy();
                 get_runtime().block_on(async {
-                    let reader =
-                        polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref())
-                            .await?;
-                    let metadata = reader.metadata().await?;
-                    let reader_schema = Arc::clone(&metadata.schema);
-
-                    let schema = prepare_schema((&reader_schema).into(), row_index.as_ref());
-                    PolarsResult::Ok((schema, reader_schema, None, metadata))
+                    polars_io::ipc::IpcReaderAsync::from_uri(&uri, cloud_options.as_ref())
+                        .await?
+                        .metadata()
+                        .await
                 })?
             }
         } else {
-            let mut file = polars_utils::open_file(&path)?;
-
-            let metadata = arrow::io::ipc::read::read_file_metadata(&mut file)?;
-            let reader_schema = Arc::clone(&metadata.schema);
-            let mut schema: Schema = (&reader_schema).into();
-            if let Some(rc) = &row_index {
-                let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
-            }
-
-            pub fn _num_rows(metadata: &arrow::io::ipc::read::FileMetadata) -> PolarsResult<usize> {
-                let n_cols = metadata.schema.fields.len();
-                // this magic number 10 is computed from the yellow trip dataset
-                Ok((metadata.size as usize) / n_cols / 10)
-            }
-
-            let num_rows = _num_rows(&metadata)?;
-
-            let schema = prepare_schema((&reader_schema).into(), row_index.as_ref());
-            (schema, reader_schema, Some(num_rows), metadata)
+            // NOTE: We do not really need IpcReader and all of its memoization
+            // to read the metadata...
+            arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(
+                polars_utils::open_file(&path)?,
+            ))?
         };
 
-        let file_info = FileInfo::new(
-            schema,
-            Some(reader_schema),
-            (num_rows, num_rows.unwrap_or_default()),
-        );
-
-        let file_options = FileScanOptions {
-            with_columns: None,
-            cache,
-            n_rows,
-            rechunk,
-            row_index,
-            file_counter: Default::default(),
-            // TODO! add
-            hive_partitioning: false,
-        };
         Ok(LogicalPlan::Scan {
             paths: Arc::new([path]),
-            file_info,
-            file_options,
+            file_info: FileInfo::new(
+                prepare_schema(metadata.schema.as_ref().into(), row_index.as_ref()),
+                Some(Arc::clone(&metadata.schema)),
+                (None, 0),
+            ),
+            file_options: FileScanOptions {
+                with_columns: None,
+                cache,
+                n_rows,
+                rechunk,
+                row_index,
+                file_counter: Default::default(),
+                // TODO: According to
+                // https://github.com/pola-rs/polars/pull/14984#discussion_r1521226321
+                // we need to re-design hive partitions.
+                hive_partitioning: false,
+            },
             predicate: None,
             scan_type: FileScan::Ipc {
                 options,
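The removed inline row-index handling is what prepare_schema is expected to encapsulate. A hedged sketch of that behavior, inferred from the deleted lines (the signature and import paths here are assumptions, not the actual polars-plan definition):

    use std::sync::Arc;

    use polars_core::datatypes::IDX_DTYPE;
    use polars_core::schema::{Schema, SchemaRef};
    use polars_io::RowIndex;

    // Assumed shape: prepend the row-index column when one is requested,
    // mirroring the schema.insert_at_index(0, ...) call removed above.
    fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> SchemaRef {
        if let Some(rc) = row_index {
            let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
        }
        Arc::new(schema)
    }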