Skip to content

Commit

Permalink
perf: Push down slice with non-zero offset to Parquet (#17972)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Aug 1, 2024
1 parent ed0f313 commit a3db898
Show file tree
Hide file tree
Showing 27 changed files with 529 additions and 278 deletions.
19 changes: 7 additions & 12 deletions crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use tokio::sync::Mutex;

use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use super::read_impl::compute_row_group_range;
use crate::cloud::{
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
Expand Down Expand Up @@ -262,8 +261,8 @@ impl FetchRowGroupsFromObjectStore {
schema: ArrowSchemaRef,
projection: Option<&[usize]>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_group_range: Range<usize>,
row_groups: &[RowGroupMetaData],
limit: usize,
) -> PolarsResult<Self> {
let projected_fields: Option<Arc<[SmartString]>> = projection.map(|projection| {
projection
Expand All @@ -272,26 +271,22 @@ impl FetchRowGroupsFromObjectStore {
.collect()
});

let row_groups_end = compute_row_group_range(0, row_groups.len(), limit, row_groups);
let row_groups = &row_groups[0..row_groups_end];

let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

let mut row_groups = if let Some(pred) = predicate.as_deref() {
row_groups
.iter()
.enumerate()
.filter(|(i, rg)| {
row_group_range
.filter_map(|i| {
let rg = &row_groups[i];
let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
prefetched.insert(*i, Default::default());
prefetched.insert(i, Default::default());
}
should_be_read

should_be_read.then(|| (i, rg.clone()))
})
.map(|(i, rg)| (i, rg.clone()))
.collect::<Vec<_>>()
} else {
row_groups.iter().cloned().enumerate().collect()
Expand Down
Loading

0 comments on commit a3db898

Please sign in to comment.