Skip to content

Commit

Permalink
refactor(rust): Add parquet source node to new streaming engine (#18152)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Aug 23, 2024
1 parent 41d3048 commit ffb66aa
Show file tree
Hide file tree
Showing 29 changed files with 2,640 additions and 154 deletions.
297 changes: 193 additions & 104 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/polars-core/src/datatypes/_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! We could use [serde_1712](https://github.com/serde-rs/serde/issues/1712), but that gave problems caused by
//! [rust_96956](https://github.com/rust-lang/rust/issues/96956), so we make a dummy type without static

#[cfg(feature = "dtype-categorical")]
use serde::de::SeqAccess;
use serde::{Deserialize, Serialize};

Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/serde/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::fmt::Formatter;

use serde::de::{Error as DeError, MapAccess, Visitor};
#[cfg(feature = "object")]
use serde::ser::Error as SerError;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

Expand Down
16 changes: 16 additions & 0 deletions crates/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,22 @@ pub fn coalesce_nulls_series(a: &Series, b: &Series) -> (Series, Series) {
}
}

/// Builds the error message used when `operation` would produce more rows
/// than `IdxSize` can address.
///
/// When indices are 32-bit (the default build), the message additionally
/// suggests the 'polars-u64-idx' build, which raises the limit.
pub fn operation_exceeded_idxsize_msg(operation: &str) -> String {
    // Only the 32-bit index build has a larger alternative to recommend.
    let is_u32_idx = core::mem::size_of::<IdxSize>() == core::mem::size_of::<u32>();
    let hint = if is_u32_idx {
        " Consider installing 'polars-u64-idx'."
    } else {
        ""
    };
    format!(
        "{} exceeded the maximum supported limit of {} rows.{}",
        operation,
        IdxSize::MAX,
        hint,
    )
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl PolarsError {
}
}

fn wrap_msg<F: FnOnce(&str) -> String>(&self, func: F) -> Self {
pub fn wrap_msg<F: FnOnce(&str) -> String>(&self, func: F) -> Self {
use PolarsError::*;
match self {
ColumnNotFound(msg) => ColumnNotFound(func(msg).into()),
Expand Down
1 change: 1 addition & 0 deletions crates/polars-io/src/cloud/polars_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::pl_async::{
/// concurrent requests for the entire application.
#[derive(Debug, Clone)]
pub struct PolarsObjectStore(Arc<dyn ObjectStore>);
pub type ObjectStorePath = object_store::path::Path;

impl PolarsObjectStore {
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn _mmap_single_column<'a>(

// similar to arrow2 serializer, except this accepts a slice instead of a vec.
// this allows us to memory map
pub(super) fn to_deserializer(
pub fn to_deserializer(
columns: Vec<(&ColumnChunkMetaData, MemSlice)>,
field: Field,
filter: Option<Filter>,
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-io/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ use polars_error::{ErrString, PolarsError};
pub use reader::ParquetAsyncReader;
pub use reader::{BatchedParquetReader, ParquetReader};
pub use utils::materialize_empty_df;

/// Re-exports of otherwise-private parquet reader internals for use by other
/// polars crates.
///
/// NOTE(review): the leading underscore signals this is not a stable public
/// API — presumably consumed by the new streaming engine; confirm before
/// depending on it externally.
pub mod _internal {
    pub use super::mmap::to_deserializer;
    pub use super::predicates::read_this_row_group;
}
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) fn collect_statistics(
)))
}

pub(super) fn read_this_row_group(
pub fn read_this_row_group(
predicate: Option<&dyn PhysicalIoExpr>,
md: &RowGroupMetaData,
schema: &ArrowSchemaRef,
Expand Down
176 changes: 176 additions & 0 deletions crates/polars-io/src/utils/byte_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::ops::Range;
use std::sync::Arc;

use polars_error::{to_compute_err, PolarsResult};
use polars_utils::_limit_path_len_io_err;
use polars_utils::mmap::MemSlice;

use crate::cloud::{
build_object_store, object_path_from_str, CloudLocation, CloudOptions, ObjectStorePath,
PolarsObjectStore,
};

/// Async abstraction over reading byte ranges from some backing source
/// (in-memory buffer / memory-mapped file / object store).
#[allow(async_fn_in_trait)]
pub trait ByteSource: Send + Sync {
    /// Total size of the source in bytes.
    async fn get_size(&self) -> PolarsResult<usize>;
    /// # Panics
    /// Panics if `range` is not in bounds.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice>;
    /// Fetch multiple byte ranges; the output is ordered like `ranges`.
    async fn get_ranges(&self, ranges: &[Range<usize>]) -> PolarsResult<Vec<MemSlice>>;
}

/// Byte source backed by a `MemSlice`, which can potentially be memory-mapped.
pub struct MemSliceByteSource(pub MemSlice);

impl MemSliceByteSource {
    /// Opens the file at `path` and memory-maps its entire contents.
    ///
    /// `_cloud_options` is accepted only for signature parity with the
    /// object-store constructor; it is unused for local mmapped files.
    async fn try_new_mmap_from_path(
        path: &str,
        _cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<Self> {
        // Open via tokio, then convert to a std File handle for mmap.
        let file = Arc::new(
            tokio::fs::File::open(path)
                .await
                .map_err(|err| _limit_path_len_io_err(path.as_ref(), err))?
                .into_std()
                .await,
        );
        // NOTE(review): no SAFETY comment in the original. Mapping is only
        // sound if the file is not truncated/modified while mapped — confirm
        // that invariant holds for all callers.
        let mmap = Arc::new(unsafe { memmap::Mmap::map(file.as_ref()) }.map_err(to_compute_err)?);

        Ok(Self(MemSlice::from_mmap(mmap)))
    }
}

impl ByteSource for MemSliceByteSource {
    /// Length of the underlying buffer.
    async fn get_size(&self) -> PolarsResult<usize> {
        let len = self.0.as_ref().len();
        Ok(len)
    }

    /// Slice a single sub-range out of the buffer.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
        Ok(self.0.slice(range))
    }

    /// Slice every requested sub-range out of the buffer.
    async fn get_ranges(&self, ranges: &[Range<usize>]) -> PolarsResult<Vec<MemSlice>> {
        let mut out = Vec::with_capacity(ranges.len());
        for range in ranges {
            out.push(self.0.slice(range.clone()));
        }
        Ok(out)
    }
}

/// Byte source that reads ranges through a `PolarsObjectStore` at a fixed
/// in-store path (per the builder docs below, this covers both cloud and
/// local files).
pub struct ObjectStoreByteSource {
    store: PolarsObjectStore,
    path: ObjectStorePath,
}

impl ObjectStoreByteSource {
    /// Resolves `path` into an object store plus the path within that store.
    async fn try_new_from_path(
        path: &str,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<Self> {
        let (CloudLocation { prefix, .. }, store) =
            build_object_store(path, cloud_options, false).await?;

        let path = object_path_from_str(&prefix)?;
        Ok(Self {
            store: PolarsObjectStore::new(store),
            path,
        })
    }
}

impl ByteSource for ObjectStoreByteSource {
    /// Size in bytes, obtained from a HEAD request on the object.
    async fn get_size(&self) -> PolarsResult<usize> {
        let meta = self.store.head(&self.path).await?;
        Ok(meta.size)
    }

    /// Downloads a single byte range from the store.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
        Ok(MemSlice::from_bytes(
            self.store.get_range(&self.path, range).await?,
        ))
    }

    /// Downloads multiple byte ranges from the store.
    async fn get_ranges(&self, ranges: &[Range<usize>]) -> PolarsResult<Vec<MemSlice>> {
        let fetched = self.store.get_ranges(&self.path, ranges).await?;
        let out: Vec<MemSlice> = fetched.into_iter().map(MemSlice::from_bytes).collect();
        Ok(out)
    }
}

/// Dynamic dispatch to async functions: a `ByteSource` that forwards to one
/// of the concrete sources via an enum instead of a trait object.
pub enum DynByteSource {
    /// In-memory (possibly memory-mapped) local source.
    MemSlice(MemSliceByteSource),
    /// Object-store backed source.
    Cloud(ObjectStoreByteSource),
}

impl DynByteSource {
    /// Name of the active variant, intended for logging / debugging.
    ///
    /// Returns `&'static str` rather than a borrow tied to `&self`: the
    /// strings are literals, and the `'static` lifetime is strictly more
    /// flexible for callers while remaining backward compatible.
    pub fn variant_name(&self) -> &'static str {
        match self {
            Self::MemSlice(_) => "MemSlice",
            Self::Cloud(_) => "Cloud",
        }
    }
}

impl Default for DynByteSource {
    /// An empty in-memory slice is the cheapest possible placeholder.
    fn default() -> Self {
        MemSliceByteSource(MemSlice::default()).into()
    }
}

/// Pure delegation: each call is forwarded to whichever variant is active.
impl ByteSource for DynByteSource {
    async fn get_size(&self) -> PolarsResult<usize> {
        match self {
            Self::MemSlice(v) => v.get_size().await,
            Self::Cloud(v) => v.get_size().await,
        }
    }

    async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
        match self {
            Self::MemSlice(v) => v.get_range(range).await,
            Self::Cloud(v) => v.get_range(range).await,
        }
    }

    async fn get_ranges(&self, ranges: &[Range<usize>]) -> PolarsResult<Vec<MemSlice>> {
        match self {
            Self::MemSlice(v) => v.get_ranges(ranges).await,
            Self::Cloud(v) => v.get_ranges(ranges).await,
        }
    }
}

// Allow `.into()` conversion from the concrete in-memory source.
impl From<MemSliceByteSource> for DynByteSource {
    fn from(value: MemSliceByteSource) -> Self {
        Self::MemSlice(value)
    }
}

// Allow `.into()` conversion from the concrete object-store source.
impl From<ObjectStoreByteSource> for DynByteSource {
    fn from(value: ObjectStoreByteSource) -> Self {
        Self::Cloud(value)
    }
}

/// Selects which kind of `DynByteSource` to construct for a given path.
#[derive(Clone, Debug)]
pub enum DynByteSourceBuilder {
    /// Memory-map a local file.
    Mmap,
    /// Supports both cloud and local files.
    ObjectStore,
}

impl DynByteSourceBuilder {
pub async fn try_build_from_path(
&self,
path: &str,
cloud_options: Option<&CloudOptions>,
) -> PolarsResult<DynByteSource> {
Ok(match self {
Self::Mmap => MemSliceByteSource::try_new_mmap_from_path(path, cloud_options)
.await?
.into(),
Self::ObjectStore => ObjectStoreByteSource::try_new_from_path(path, cloud_options)
.await?
.into(),
})
}
}
2 changes: 2 additions & 0 deletions crates/polars-io/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ mod other;

pub use compression::is_compressed;
pub use other::*;
#[cfg(feature = "cloud")]
pub mod byte_source;
pub mod slice;

pub const URL_ENCODE_CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
Expand Down
71 changes: 48 additions & 23 deletions crates/polars-io/src/utils/slice.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,58 @@
/// Given a `slice` that is relative to the start of a list of files, calculate the slice to apply
/// at a file with a row offset of `current_row_offset`.
pub fn split_slice_at_file(
current_row_offset: &mut usize,
current_row_offset_ref: &mut usize,
n_rows_this_file: usize,
global_slice_start: usize,
global_slice_end: usize,
) -> (usize, usize) {
let next_file_offset = *current_row_offset + n_rows_this_file;
// e.g.
// slice: (start: 1, end: 2)
// files:
// 0: (1 row): current_offset: 0, next_file_offset: 1
// 1: (1 row): current_offset: 1, next_file_offset: 2
// 2: (1 row): current_offset: 2, next_file_offset: 3
// in this example we want to include only file 1.
let has_overlap_with_slice =
*current_row_offset < global_slice_end && next_file_offset > global_slice_start;
let current_row_offset = *current_row_offset_ref;
*current_row_offset_ref += n_rows_this_file;
match SplitSlicePosition::split_slice_at_file(
current_row_offset,
n_rows_this_file,
global_slice_start..global_slice_end,
) {
SplitSlicePosition::Overlapping(offset, len) => (offset, len),
SplitSlicePosition::Before | SplitSlicePosition::After => (0, 0),
}
}

/// Where a file's rows fall relative to a global slice of rows.
#[derive(Debug)]
pub enum SplitSlicePosition {
    /// The file ends at or before the slice start.
    Before,
    /// The file overlaps the slice: `(rows to skip within the file, number
    /// of rows to take)`.
    Overlapping(usize, usize),
    /// The file begins at or after the slice end.
    After,
}

impl SplitSlicePosition {
pub fn split_slice_at_file(
current_row_offset: usize,
n_rows_this_file: usize,
global_slice: std::ops::Range<usize>,
) -> Self {
// e.g.
// slice: (start: 1, end: 2)
// files:
// 0: (1 row): current_offset: 0, next_file_offset: 1
// 1: (1 row): current_offset: 1, next_file_offset: 2
// 2: (1 row): current_offset: 2, next_file_offset: 3
// in this example we want to include only file 1.

let next_row_offset = current_row_offset + n_rows_this_file;

let (rel_start, slice_len) = if !has_overlap_with_slice {
(0, 0)
} else {
let n_rows_to_skip = global_slice_start.saturating_sub(*current_row_offset);
let n_excess_rows = next_file_offset.saturating_sub(global_slice_end);
(
n_rows_to_skip,
n_rows_this_file - n_rows_to_skip - n_excess_rows,
)
};
if next_row_offset <= global_slice.start {
Self::Before
} else if current_row_offset >= global_slice.end {
Self::After
} else {
let n_rows_to_skip = global_slice.start.saturating_sub(current_row_offset);
let n_excess_rows = next_row_offset.saturating_sub(global_slice.end);

*current_row_offset = next_file_offset;
(rel_start, slice_len)
Self::Overlapping(
n_rows_to_skip,
n_rows_this_file - n_rows_to_skip - n_excess_rows,
)
}
}
}
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/scans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn prepare_schemas(mut schema: Schema, row_index: Option<&RowIndex>) -> (SchemaR
pub(super) fn parquet_file_info(
paths: &[PathBuf],
file_options: &FileScanOptions,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
#[allow(unused)] cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<(FileInfo, Option<FileMetaDataRef>)> {
let path = get_first_path(paths)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/functions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub fn count_rows(paths: &Arc<Vec<PathBuf>>, scan_type: &FileScan) -> PolarsResu
#[cfg(feature = "parquet")]
pub(super) fn count_rows_parquet(
paths: &Arc<Vec<PathBuf>>,
cloud_options: Option<&CloudOptions>,
#[allow(unused)] cloud_options: Option<&CloudOptions>,
) -> PolarsResult<usize> {
if paths.is_empty() {
return Ok(0);
Expand Down
9 changes: 6 additions & 3 deletions crates/polars-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ description = "Private crate for the streaming execution engine for the Polars D
atomic-waker = { workspace = true }
crossbeam-deque = { workspace = true }
crossbeam-utils = { workspace = true }
futures = { workspace = true }
memmap = { workspace = true }
parking_lot = { workspace = true }
pin-project-lite = { workspace = true }
polars-io = { workspace = true, features = ["async"] }
polars-io = { workspace = true, features = ["async", "cloud", "aws"] }
polars-utils = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
Expand All @@ -25,8 +27,9 @@ tokio = { workspace = true }
polars-core = { workspace = true }
polars-error = { workspace = true }
polars-expr = { workspace = true }
polars-mem-engine = { workspace = true }
polars-plan = { workspace = true }
polars-mem-engine = { workspace = true, features = ["parquet"] }
polars-parquet = { workspace = true }
polars-plan = { workspace = true, features = ["parquet"] }

[build-dependencies]
version_check = { workspace = true }
Expand Down
Loading

0 comments on commit ffb66aa

Please sign in to comment.