feat(rust): Implement IpcReaderAsync (#14984)
mickvangelderen authored Mar 13, 2024
1 parent 37d14e7 commit bbaf341
Showing 16 changed files with 560 additions and 56 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/io/ipc/read/file.rs
@@ -215,7 +215,7 @@ fn deserialize_footer_blocks(
    Ok((footer, blocks))
}

-pub(super) fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
+pub fn deserialize_footer(footer_data: &[u8], size: u64) -> PolarsResult<FileMetadata> {
    let (footer, blocks) = deserialize_footer_blocks(footer_data)?;

    let ipc_schema = footer
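This visibility change is what lets the new async reader (added in `ipc_reader_async.rs` below) parse a footer it fetched itself via ranged reads. As a hedged illustration of what a caller outside the module can now do — the `parse_footer` wrapper is an assumption for illustration, not part of this commit:

use arrow::io::ipc::read::deserialize_footer;
use polars_error::PolarsResult;

// Hypothetical caller: `footer_bytes` holds the serialized footer; the
// arguments mirror the call the new async reader makes below.
fn parse_footer(footer_bytes: &[u8], size: u64) -> PolarsResult<()> {
    let metadata = deserialize_footer(footer_bytes, size)?;
    println!("schema has {} fields", metadata.schema.fields.len());
    Ok(())
}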
4 changes: 3 additions & 1 deletion crates/polars-arrow/src/io/ipc/read/mod.rs
@@ -30,7 +30,9 @@ pub mod file_async;
pub(crate) use common::first_dict_field;
#[cfg(feature = "io_flight")]
pub(crate) use common::{read_dictionary, read_record_batch};
-pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata};
+pub use file::{
+    deserialize_footer, read_batch, read_file_dictionaries, read_file_metadata, FileMetadata,
+};
use polars_utils::aliases::PlHashMap;
pub use reader::FileReader;
pub use schema::deserialize_schema;
20 changes: 6 additions & 14 deletions crates/polars-io/src/ipc/ipc_file.rs
@@ -90,16 +90,6 @@ fn check_mmap_err(err: PolarsError) -> PolarsResult<()> {
}

impl<R: MmapBytesReader> IpcReader<R> {
-    #[doc(hidden)]
-    /// A very bad estimate of the number of rows
-    /// This estimation will be entirely off if the file is compressed.
-    /// And will be varying off depending on the data types.
-    pub fn _num_rows(&mut self) -> PolarsResult<usize> {
-        let metadata = self.get_metadata()?;
-        let n_cols = metadata.schema.fields.len();
-        // this magic number 10 is computed from the yellow trip dataset
-        Ok((metadata.size as usize) / n_cols / 10)
-    }
    fn get_metadata(&mut self) -> PolarsResult<&read::FileMetadata> {
        if self.metadata.is_none() {
            let metadata = read::read_file_metadata(&mut self.reader)?;
@@ -164,10 +154,12 @@ impl<R: MmapBytesReader> IpcReader<R> {
        let rechunk = self.rechunk;
        let metadata = read::read_file_metadata(&mut self.reader)?;

-        debug_assert!(
-            self.columns.is_none(),
-            "column names must have already been converted into indices",
-        );
+        // NOTE: For some code paths this already happened. See
+        // https://github.com/pola-rs/polars/pull/14984#discussion_r1520125000
+        // where this was introduced.
+        if let Some(columns) = &self.columns {
+            self.projection = Some(columns_to_projection(columns, &metadata.schema)?);
+        }

        let schema = if let Some(projection) = &self.projection {
            Arc::new(apply_projection(&metadata.schema, projection))
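With the assertion gone, `IpcReader` now converts any requested column names into positional indices itself via `columns_to_projection`. Conceptually the mapping looks like the following sketch — an illustration only, not the polars-io implementation, and it omits the error handling the real helper does:

// Map requested column names to their positions in the schema; `None`
// signals that a requested column does not exist.
fn names_to_indices(columns: &[String], field_names: &[&str]) -> Option<Vec<usize>> {
    columns
        .iter()
        .map(|name| field_names.iter().position(|field| *field == name.as_str()))
        .collect()
}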
260 changes: 260 additions & 0 deletions crates/polars-io/src/ipc/ipc_reader_async.rs
@@ -0,0 +1,260 @@
use std::ops::Range;
use std::sync::Arc;

use arrow::io::ipc::read::{get_row_count, FileMetadata, OutOfSpecKind};
use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use polars_core::datatypes::IDX_DTYPE;
use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};

use crate::cloud::{build_object_store, CloudLocation, CloudOptions};
use crate::pl_async::{
    tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};
use crate::predicates::PhysicalIoExpr;
use crate::prelude::{materialize_projection, IpcReader};
use crate::RowIndex;

/// Polars-specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
/// concurrent requests for the entire application.
#[derive(Debug, Clone)]
pub struct PolarsObjectStore(Arc<dyn ObjectStore>);

impl PolarsObjectStore {
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self(store)
    }

    pub async fn get(&self, path: &Path) -> PolarsResult<Bytes> {
        tune_with_concurrency_budget(1, || async {
            self.0
                .get(path)
                .await
                .map_err(to_compute_err)?
                .bytes()
                .await
                .map_err(to_compute_err)
        })
        .await
    }

    pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Bytes> {
        tune_with_concurrency_budget(1, || self.0.get_range(path, range))
            .await
            .map_err(to_compute_err)
    }

    pub async fn get_ranges(
        &self,
        path: &Path,
        ranges: &[Range<usize>],
    ) -> PolarsResult<Vec<Bytes>> {
        tune_with_concurrency_budget(
            (ranges.len() as u32).clamp(0, MAX_BUDGET_PER_REQUEST as u32),
            || self.0.get_ranges(path, ranges),
        )
        .await
        .map_err(to_compute_err)
    }

    /// Fetch the metadata of the IPC file, do not memoize it.
    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
        with_concurrency_budget(1, || self.0.head(path))
            .await
            .map_err(to_compute_err)
    }
}

/// An Arrow IPC reader implemented on top of PolarsObjectStore.
pub struct IpcReaderAsync {
    store: PolarsObjectStore,
    path: Path,
}

#[derive(Default, Clone)]
pub struct IpcReadOptions {
    // Names of the columns to include in the output.
    projection: Option<Vec<String>>,

    // The maximum number of rows to include in the output.
    row_limit: Option<usize>,

    // Include a column with the row number under the provided name starting at the provided index.
    row_index: Option<RowIndex>,

    // Only include rows that pass this predicate.
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
}

impl IpcReadOptions {
    pub fn with_projection(mut self, indices: impl Into<Option<Vec<String>>>) -> Self {
        self.projection = indices.into();
        self
    }

    pub fn with_row_limit(mut self, row_limit: impl Into<Option<usize>>) -> Self {
        self.row_limit = row_limit.into();
        self
    }

    pub fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
        self.row_index = row_index.into();
        self
    }

    pub fn with_predicate(mut self, predicate: impl Into<Option<Arc<dyn PhysicalIoExpr>>>) -> Self {
        self.predicate = predicate.into();
        self
    }
}

impl IpcReaderAsync {
    pub async fn from_uri(
        uri: &str,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<IpcReaderAsync> {
        let (
            CloudLocation {
                prefix, expansion, ..
            },
            store,
        ) = build_object_store(uri, cloud_options).await?;

        let path = {
            // Any wildcards should already have been resolved here. Without this assertion they would
            // be ignored.
            debug_assert!(expansion.is_none(), "path should not contain wildcards");
            Path::from_url_path(prefix).map_err(to_compute_err)?
        };

        Ok(Self {
            store: PolarsObjectStore::new(store),
            path,
        })
    }

    async fn object_metadata(&self) -> PolarsResult<ObjectMeta> {
        self.store.head(&self.path).await
    }

    async fn file_size(&self) -> PolarsResult<usize> {
        Ok(self.object_metadata().await?.size)
    }

    pub async fn metadata(&self) -> PolarsResult<FileMetadata> {
        let file_size = self.file_size().await?;

        // TODO: Do a larger request and hope that the entire footer is contained within it to save one round-trip.
        let footer_metadata =
            self.store
                .get_range(
                    &self.path,
                    file_size.checked_sub(FOOTER_METADATA_SIZE).ok_or_else(|| {
                        to_compute_err("ipc file size is smaller than the minimum")
                    })?..file_size,
                )
                .await?;

        let footer_size = deserialize_footer_metadata(
            footer_metadata
                .as_ref()
                .try_into()
                .map_err(to_compute_err)?,
        )?;

        let footer = self
            .store
            .get_range(
                &self.path,
                file_size
                    .checked_sub(FOOTER_METADATA_SIZE + footer_size)
                    .ok_or_else(|| {
                        to_compute_err("invalid ipc footer metadata: footer size too large")
                    })?..file_size,
            )
            .await?;

        arrow::io::ipc::read::deserialize_footer(
            footer.as_ref(),
            footer_size.try_into().map_err(to_compute_err)?,
        )
    }

    pub async fn data(
        &self,
        metadata: Option<&FileMetadata>,
        options: IpcReadOptions,
        verbose: bool,
    ) -> PolarsResult<DataFrame> {
        // TODO: Only download what is needed rather than the entire file by
        // making use of the projection, row limit, predicate and such.
        let bytes = self.store.get(&self.path).await?;

        let projection = match options.projection.as_deref() {
            Some(projection) => {
                fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> Schema {
                    if let Some(rc) = row_index {
                        let _ = schema.insert_at_index(0, rc.name.as_str().into(), IDX_DTYPE);
                    }
                    schema
                }

                // Retrieve the metadata for the schema so we can map column names to indices.
                let fetched_metadata;
                let metadata = if let Some(metadata) = metadata {
                    metadata
                } else {
                    // This branch happens when `metadata` is `None`, which can happen if we deserialize the execution plan.
                    fetched_metadata = self.metadata().await?;
                    &fetched_metadata
                };

                let schema = prepare_schema((&metadata.schema).into(), options.row_index.as_ref());

                let hive_partitions = None;

                materialize_projection(
                    Some(projection),
                    &schema,
                    hive_partitions,
                    options.row_index.is_some(),
                )
            },
            None => None,
        };

        let reader =
            <IpcReader<_> as crate::SerReader<_>>::new(std::io::Cursor::new(bytes.as_ref()))
                .with_row_index(options.row_index)
                .with_n_rows(options.row_limit)
                .with_projection(projection);
        reader.finish_with_scan_ops(options.predicate, verbose)
    }

    pub async fn count_rows(&self, _metadata: Option<&FileMetadata>) -> PolarsResult<i64> {
        // TODO: Only download what is needed rather than the entire file by
        // making use of the projection, row limit, predicate and such.
        let bytes = self.store.get(&self.path).await?;
        get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
    }
}

const FOOTER_METADATA_SIZE: usize = 10;

// TODO: Move to polars-arrow and deduplicate parsing of footer metadata in
// sync and async readers.
fn deserialize_footer_metadata(bytes: [u8; FOOTER_METADATA_SIZE]) -> PolarsResult<usize> {
    let footer_size: usize =
        i32::from_le_bytes(bytes[0..4].try_into().unwrap_or_else(|_| unreachable!()))
            .try_into()
            .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;

    if &bytes[4..] != b"ARROW1" {
        polars_bail!(oos = OutOfSpecKind::InvalidFooter);
    }

    Ok(footer_size)
}
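The 10-byte trailer parsed here is how every Arrow IPC file ends: a 4-byte little-endian footer length followed by the 6-byte magic `ARROW1`. As a small sketch of the inverse operation, for illustration only (`encode_trailer` is not part of this commit):

// Build the trailer that `deserialize_footer_metadata` expects:
// bytes 0..4 are the footer length, bytes 4..10 are the magic.
fn encode_trailer(footer_size: i32) -> [u8; 10] {
    let mut trailer = [0u8; 10];
    trailer[0..4].copy_from_slice(&footer_size.to_le_bytes());
    trailer[4..10].copy_from_slice(b"ARROW1");
    trailer
}

With this, `deserialize_footer_metadata(encode_trailer(64))` returns `Ok(64)`, while a negative length or a corrupted magic surfaces as the corresponding `OutOfSpecKind` error.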
5 changes: 5 additions & 0 deletions crates/polars-io/src/ipc/mod.rs
@@ -16,3 +16,8 @@ pub use ipc_file::IpcReader;
#[cfg(feature = "ipc_streaming")]
pub use ipc_stream::*;
pub use write::{BatchedWriter, IpcCompression, IpcWriter, IpcWriterOption};
+
+#[cfg(feature = "cloud")]
+mod ipc_reader_async;
+#[cfg(feature = "cloud")]
+pub use ipc_reader_async::*;
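With the module wired up behind the `cloud` feature, an end-to-end read could look like this hedged sketch (the URI and column name are assumptions, not part of the commit):

use polars_core::frame::DataFrame;
use polars_error::PolarsResult;
use polars_io::ipc::{IpcReadOptions, IpcReaderAsync};

// Hypothetical flow: resolve the object store from the URI, fetch the
// footer metadata with two small ranged requests, then download and
// decode the data with a projection applied.
async fn read_ipc_from_cloud() -> PolarsResult<DataFrame> {
    let reader = IpcReaderAsync::from_uri("s3://my-bucket/data.ipc", None).await?;
    let metadata = reader.metadata().await?;
    reader
        .data(
            Some(&metadata),
            IpcReadOptions::default().with_projection(vec!["a".to_string()]),
            false,
        )
        .await
}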