From 8099aef101ac2a636b38d68db80927248860eb8e Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 11 Nov 2024 19:34:50 +1100 Subject: [PATCH 1/3] c --- .../src/cloud/polars_object_store.rs | 465 ++++++++++++++++-- .../polars-io/src/file_cache/file_fetcher.rs | 9 +- .../polars-io/src/parquet/read/async_impl.rs | 63 +-- crates/polars-io/src/pl_async.rs | 25 +- crates/polars-io/src/utils/byte_source.rs | 29 +- .../parquet_source/row_group_data_fetch.rs | 46 +- crates/polars-utils/src/mmap.rs | 6 + 7 files changed, 506 insertions(+), 137 deletions(-) diff --git a/crates/polars-io/src/cloud/polars_object_store.rs b/crates/polars-io/src/cloud/polars_object_store.rs index 9738e0cbdbe4..70b522e7ec0c 100644 --- a/crates/polars-io/src/cloud/polars_object_store.rs +++ b/crates/polars-io/src/cloud/polars_object_store.rs @@ -2,14 +2,16 @@ use std::ops::Range; use std::sync::Arc; use bytes::Bytes; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; -use polars_error::{to_compute_err, PolarsResult}; +use polars_core::prelude::{InitHashMaps, PlHashMap}; +use polars_error::{to_compute_err, PolarsError, PolarsResult}; use tokio::io::AsyncWriteExt; use crate::pl_async::{ - self, tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST, + self, get_concurrency_limit, get_download_chunk_size, tune_with_concurrency_budget, + with_concurrency_budget, MAX_BUDGET_PER_REQUEST, }; /// Polars specific wrapper for `Arc` that limits the number of @@ -23,63 +25,197 @@ impl PolarsObjectStore { Self(store) } - pub async fn get(&self, path: &Path) -> PolarsResult { - tune_with_concurrency_budget(1, || async { - self.0 - .get(path) - .await - .map_err(to_compute_err)? - .bytes() - .await - .map_err(to_compute_err) - }) - .await + /// Returns a buffered stream that downloads concurrently up to the concurrency limit. + fn get_buffered_ranges_stream<'a, T: Iterator>>( + &'a self, + path: &'a Path, + ranges: T, + ) -> impl StreamExt> + + TryStreamExt> + + use<'a, T> { + futures::stream::iter( + ranges + .map(|range| async { self.0.get_range(path, range).await.map_err(to_compute_err) }), + ) + // Add a limit locally as this gets run inside a single `tune_with_concurrency_budget`. + .buffered(get_concurrency_limit() as usize) } pub async fn get_range(&self, path: &Path, range: Range) -> PolarsResult { - tune_with_concurrency_budget(1, || self.0.get_range(path, range)) - .await - .map_err(to_compute_err) + let parts = split_range(range.clone()); + + if parts.len() == 1 { + tune_with_concurrency_budget(1, || self.0.get_range(path, range)) + .await + .map_err(to_compute_err) + } else { + dbg!(&range); + dbg!(parts.len()); + dbg!(parts.clone().take(10).collect::>()); + + let parts = tune_with_concurrency_budget( + parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32, + || { + self.get_buffered_ranges_stream(path, parts) + .try_collect::>() + }, + ) + .await?; + + let mut combined = Vec::with_capacity(range.len()); + + for part in parts { + combined.extend_from_slice(&part) + } + + assert_eq!(combined.len(), range.len()); + + PolarsResult::Ok(Bytes::from(combined)) + } } - pub async fn get_ranges( + /// Fetch byte ranges into a HashMap keyed by the range start. This will mutably sort the + /// `ranges` slice for coalescing. + /// + /// # Panics + /// Panics if the same range start is used by more than 1 range. 
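+    ///
+    /// For example, fetching ranges `[0..4, 4..8]` yields a map with keys `0` and `4`, each
+    /// mapped to the bytes of its own range, regardless of whether the two requests were
+    /// coalesced into a single download internally.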
+ pub async fn get_ranges_sort< + K: TryFrom + std::hash::Hash + Eq, + T: From, + >( &self, path: &Path, - ranges: &[Range], - ) -> PolarsResult> { + ranges: &mut [Range], + ) -> PolarsResult> { + if ranges.is_empty() { + return Ok(Default::default()); + } + + let mut out = PlHashMap::with_capacity(ranges.len()); + + ranges.sort_unstable_by_key(|x| x.start); + + let (merged_ranges, merged_ends): (Vec<_>, Vec<_>) = merge_ranges(ranges).unzip(); + + dbg!(ranges.len()); + dbg!(ranges.iter().take(10).collect::>()); + dbg!(merged_ranges.len()); + dbg!(merged_ranges + .iter() + .zip(merged_ends.iter()) + .take(10) + .collect::>()); + + let mut stream = self.get_buffered_ranges_stream(path, merged_ranges.iter().cloned()); + tune_with_concurrency_budget( - (ranges.len() as u32).clamp(0, MAX_BUDGET_PER_REQUEST as u32), - || self.0.get_ranges(path, ranges), + merged_ranges.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32, + || async { + let mut len = 0; + let mut current_offset = 0; + let mut ends_iter = merged_ends.iter(); + + let mut splitted_parts = vec![]; + + while let Some(bytes) = stream.try_next().await? { + len += bytes.len(); + let end = *ends_iter.next().unwrap(); + + if end == 0 { + splitted_parts.push(bytes); + continue; + } + + let full_range = ranges[current_offset..end] + .iter() + .cloned() + .reduce(|l, r| l.start.min(r.start)..l.end.max(r.end)) + .unwrap(); + + let bytes = if splitted_parts.is_empty() { + bytes + } else { + let mut out = Vec::with_capacity(full_range.len()); + + for x in splitted_parts.drain(..) { + out.extend_from_slice(&x); + } + + out.extend_from_slice(&bytes); + Bytes::from(out) + }; + + assert_eq!(bytes.len(), full_range.len()); + + for range in &ranges[current_offset..end] { + let v = out.insert( + K::try_from(range.start).unwrap(), + T::from(bytes.slice( + range.start - full_range.start..range.end - full_range.start, + )), + ); + + assert!(v.is_none()); // duplicate range start + } + + current_offset = end; + } + + assert!(splitted_parts.is_empty()); + + PolarsResult::Ok(pl_async::Size::from(len as u64)) + }, ) - .await - .map_err(to_compute_err) + .await?; + + Ok(out) } - pub async fn download( - &self, - path: &Path, - file: &mut F, - ) -> PolarsResult<()> { - tune_with_concurrency_budget(1, || async { - let mut stream = self - .0 - .get(path) - .await - .map_err(to_compute_err)? - .into_stream(); - - let mut len = 0; - while let Some(bytes) = stream.next().await { - let bytes = bytes.map_err(to_compute_err)?; - len += bytes.len(); - file.write_all(bytes.as_ref()) + pub async fn download(&self, path: &Path, file: &mut tokio::fs::File) -> PolarsResult<()> { + let opt_size = self.head(path).await.ok().map(|x| x.size); + let parts = opt_size.map(|x| split_range(0..x)).filter(|x| x.len() > 1); + + if let Some(parts) = parts { + tune_with_concurrency_budget( + parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32, + || async { + let mut stream = self.get_buffered_ranges_stream(path, parts); + let mut len = 0; + while let Some(bytes) = stream.try_next().await? { + len += bytes.len(); + file.write_all(&bytes).await.map_err(to_compute_err)?; + } + + assert_eq!(len, opt_size.unwrap()); + + PolarsResult::Ok(pl_async::Size::from(len as u64)) + }, + ) + .await? + } else { + tune_with_concurrency_budget(1, || async { + let mut stream = self + .0 + .get(path) .await - .map_err(to_compute_err)?; - } + .map_err(to_compute_err)? + .into_stream(); + + let mut len = 0; + while let Some(bytes) = stream.try_next().await? 
{ + len += bytes.len(); + file.write_all(&bytes).await.map_err(to_compute_err)?; + } + + PolarsResult::Ok(pl_async::Size::from(len as u64)) + }) + .await? + }; + + // Dropping is delayed for tokio async files so we need to explicitly + // flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451). + file.sync_all().await.map_err(PolarsError::from)?; - PolarsResult::Ok(pl_async::Size::from(len as u64)) - }) - .await?; Ok(()) } @@ -113,3 +249,238 @@ impl PolarsObjectStore { .map_err(to_compute_err) } } + +/// Splits a single range into multiple smaller ranges, which can be downloaded concurrently for +/// much higher throughput. +fn split_range(range: Range) -> impl ExactSizeIterator> + Clone { + let chunk_size = get_download_chunk_size(); + + // Calculate n_parts such that we are as close as possible to the `chunk_size`. + let n_parts = [ + (range.len().div_ceil(chunk_size)).max(1), + (range.len() / chunk_size).max(1), + ] + .into_iter() + .min_by_key(|x| (range.len() / *x).abs_diff(chunk_size)) + .unwrap(); + + let chunk_size = (range.len() / n_parts).max(1); + + assert_eq!(n_parts, (range.len() / chunk_size).max(1)); + let bytes_rem = range.len() % chunk_size; + + (0..n_parts).map(move |part_no| { + let (start, end) = if part_no == 0 { + // Download remainder length in the first chunk since it starts downloading first. + let end = range.start + chunk_size + bytes_rem; + let end = if end > range.end { range.end } else { end }; + (range.start, end) + } else { + let start = bytes_rem + range.start + part_no * chunk_size; + (start, start + chunk_size) + }; + + start..end + }) +} + +/// Note: For optimal performance, `ranges` should be sorted. More generally, +/// ranges placed next to each other should also be close in range value. +/// +/// # Returns +/// `[(range1, end1), (range2, end2)]`, where: +/// * `range1` contains bytes for the ranges from `ranges[0..end1]` +/// * `range2` contains bytes for the ranges from `ranges[end1..end2]` +/// * etc.. +/// +/// Note that if an end value is 0, it means the range is a splitted part and should be combined. +fn merge_ranges(ranges: &[Range]) -> impl Iterator, usize)> + '_ { + let chunk_size = get_download_chunk_size(); + + let mut current_merged_range = ranges.first().map_or(0..0, Clone::clone); + // Number of fetched bytes excluding excess. + let mut current_n_bytes = 0; + + (0..ranges.len()) + .filter_map(move |current_idx| { + let current_idx = 1 + current_idx; + + if current_idx == ranges.len() { + // No more items - flush current state. + Some((current_merged_range.clone(), current_idx)) + } else { + let range = ranges[current_idx].clone(); + + let new_merged = current_merged_range.start.min(range.start) + ..current_merged_range.end.max(range.end); + + // E.g.: + // |--------| + // oo // range1 + // oo // range2 + // ^^^ // distance = 3, is_overlapping = false + // E.g.: + // |--------| + // ooooo // range1 + // ooooo // range2 + // ^^ // distance = 2, is_overlapping = true + let (distance, is_overlapping) = { + let l = current_merged_range.end.min(range.end); + let r = current_merged_range.start.max(range.start); + + (r.abs_diff(l), r < l) + }; + + #[rustfmt::skip] + let should_merge = + is_overlapping // Always merge if overlapping + || ( + // Don't merge if the result size is not closer to the `chunk_size` + new_merged.len().abs_diff(chunk_size) < current_merged_range.len().abs_diff(chunk_size) + && ( + // Either the gap is less than 1MiB.. 
+ distance <= 1024 * 1024 + || ( + // ..or, the gap is less than 12.5% of the largest between `current_n_bytes` + // and the new `range`, capped at 8MiB. + distance <= current_n_bytes.max(range.len()) / 8 + && distance <= 8 * 1024 * 1024 + ) + ) + ); + + if should_merge { + // Merge to existing range + current_merged_range = new_merged; + current_n_bytes += if is_overlapping { + range.len() - distance + } else { + range.len() + }; + None + } else { + let v = current_merged_range.clone(); + current_merged_range = range; + current_n_bytes = 0; + Some((v, current_idx)) + } + } + }) + .flat_map(|x| { + // Split large individual ranges within the list of ranges. + let (range, end) = x; + let split = split_range(range.clone()); + let len = split.len(); + + split + .enumerate() + .map(move |(i, range)| (range, if 1 + i == len { end } else { 0 })) + }) +} + +#[cfg(test)] +mod tests { + + #[test] + fn test_split_range() { + use super::{get_download_chunk_size, split_range}; + + let chunk_size = get_download_chunk_size(); + + assert_eq!(chunk_size, 64 * 1024 * 1024); + + #[allow(clippy::single_range_in_vec_init)] + { + // Round-trip empty ranges. + assert_eq!(split_range(0..0).collect::>(), [0..0]); + assert_eq!(split_range(3..3).collect::>(), [3..3]); + } + + // Threshold to start splitting to 2 ranges + // + // n - chunk_size == chunk_size - n / 2 + // n + n / 2 == 2 * chunk_size + // 3 * n == 4 * chunk_size + // n = 4 * chunk_size / 3 + let n = 4 * chunk_size / 3; + + #[allow(clippy::single_range_in_vec_init)] + { + assert_eq!(split_range(0..n).collect::>(), [0..89478485]); + } + + assert_eq!( + split_range(0..n + 1).collect::>(), + [0..44739243, 44739243..89478486] + ); + + // Threshold to start splitting to 3 ranges + // + // n / 2 - chunk_size == chunk_size - n / 3 + // n / 2 + n / 3 == 2 * chunk_size + // 5 * n == 12 * chunk_size + // n == 12 * chunk_size / 5 + let n = 12 * chunk_size / 5; + + assert_eq!( + split_range(0..n).collect::>(), + [0..80530637, 80530637..161061273] + ); + + assert_eq!( + split_range(0..n + 1).collect::>(), + [0..53687092, 53687092..107374183, 107374183..161061274] + ); + } + + #[test] + fn test_merge_ranges() { + use super::{get_download_chunk_size, merge_ranges}; + + let chunk_size = get_download_chunk_size(); + + assert_eq!(chunk_size, 64 * 1024 * 1024); + + // Round-trip empty slice + assert_eq!(merge_ranges(&[]).collect::>(), []); + + // We have 1 tiny request followed by 1 huge request. They are combined as it reduces the + // `abs_diff()` to the `chunk_size`, but afterwards they are split to 2 evenly sized + // requests. 
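+        // (The 127 MiB merged range is itself split into two 63.5 MiB downloads; the `0` in the
+        // first tuple marks a partial chunk that `get_ranges_sort` recombines with the part that
+        // follows it.)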
+ assert_eq!( + merge_ranges(&[0..1, 1..127 * 1024 * 1024]).collect::>(), + [(0..66584576, 0), (66584576..133169152, 2)] + ); + + // <= 1MiB gap, merge + assert_eq!( + merge_ranges(&[0..1, 1024 * 1024 + 1..1024 * 1024 + 2]).collect::>(), + [(0..1048578, 2)] + ); + + // > 1MiB gap, do not merge + assert_eq!( + merge_ranges(&[0..1, 1024 * 1024 + 2..1024 * 1024 + 3]).collect::>(), + [(0..1, 1), (1048578..1048579, 2)] + ); + + // <= 12.5% gap, merge + assert_eq!( + merge_ranges(&[0..8, 10..11]).collect::>(), + [(0..11, 2)] + ); + + // <= 12.5% gap relative to RHS, merge + assert_eq!( + merge_ranges(&[0..1, 3..11]).collect::>(), + [(0..11, 2)] + ); + + // Overlapping range, merge + assert_eq!( + merge_ranges(&[0..80 * 1024 * 1024, 10 * 1024 * 1024..70 * 1024 * 1024]) + .collect::>(), + [(0..80 * 1024 * 1024, 2)] + ); + } +} diff --git a/crates/polars-io/src/file_cache/file_fetcher.rs b/crates/polars-io/src/file_cache/file_fetcher.rs index bd16dff7fda4..3d712ba955fc 100644 --- a/crates/polars-io/src/file_cache/file_fetcher.rs +++ b/crates/polars-io/src/file_cache/file_fetcher.rs @@ -116,12 +116,7 @@ impl FileFetcher for CloudFileFetcher { .await .map_err(PolarsError::from)?; - self.object_store.download(&self.cloud_path, file).await?; - // Dropping is delayed for tokio async files so we need to explicitly - // flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451). - file.sync_all().await.map_err(PolarsError::from)?; - PolarsResult::Ok(()) - })?; - Ok(()) + self.object_store.download(&self.cloud_path, file).await + }) } } diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs index da50364855da..053aad67464a 100644 --- a/crates/polars-io/src/parquet/read/async_impl.rs +++ b/crates/polars-io/src/parquet/read/async_impl.rs @@ -21,7 +21,7 @@ use crate::parquet::metadata::FileMetadataRef; use crate::pl_async::get_runtime; use crate::predicates::PhysicalIoExpr; -type DownloadedRowGroup = Vec<(u64, Bytes)>; +type DownloadedRowGroup = PlHashMap; type QueuePayload = (usize, DownloadedRowGroup); type QueueSend = Arc>>; @@ -49,14 +49,8 @@ impl ParquetObjectStore { }) } - async fn get_range(&self, start: usize, length: usize) -> PolarsResult { - self.store - .get_range(&self.path, start..start + length) - .await - } - - async fn get_ranges(&self, ranges: &[Range]) -> PolarsResult> { - self.store.get_ranges(&self.path, ranges).await + async fn get_ranges(&self, ranges: &mut [Range]) -> PolarsResult> { + self.store.get_ranges_sort(&self.path, ranges).await } /// Initialize the length property of the object, unless it has already been fetched. 
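For reference, a minimal caller-side sketch of the `get_ranges_sort` contract used by the reader above (not part of the patch; the function, `store` and the ranges are hypothetical, and `PolarsObjectStore` is assumed to be in scope):

    async fn fetch_ranges(
        store: &PolarsObjectStore,
        path: &object_store::path::Path,
    ) -> polars_error::PolarsResult<()> {
        // Hypothetical column byte ranges within an object.
        let mut ranges = vec![0usize..1024, 4096..8192];

        // Keys are the original `start` offsets; values are the bytes of each requested range.
        let bytes_map: polars_core::prelude::PlHashMap<usize, bytes::Bytes> =
            store.get_ranges_sort(path, &mut ranges).await?;

        for range in &ranges {
            // Each entry holds exactly the requested range, even when several ranges
            // were coalesced into a single download under the hood.
            assert_eq!(bytes_map[&range.start].len(), range.len());
        }

        Ok(())
    }

Because the map is keyed by offset, the parquet reader no longer needs to zip the returned buffers with a separate list of offsets, as the next hunk shows.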
@@ -194,16 +188,10 @@ async fn download_projection( } }); - let result = async_reader.get_ranges(&ranges).await.map(|bytes| { - ( - rg_index, - bytes - .into_iter() - .zip(offsets) - .map(|(bytes, offset)| (offset, bytes)) - .collect::>(), - ) - }); + let result = async_reader + .get_ranges(&mut ranges) + .await + .map(|bytes_map| (rg_index, bytes_map)); sender.send(result).await.is_ok() } @@ -217,33 +205,20 @@ async fn download_row_group( return true; } - let full_byte_range = rg.full_byte_range(); - let full_byte_range = full_byte_range.start as usize..full_byte_range.end as usize; - - let result = async_reader - .get_range( - full_byte_range.start, - full_byte_range.end - full_byte_range.start, + let mut ranges = rg + .byte_ranges_iter() + .map(|x| x.start as usize..x.end as usize) + .collect::>(); + + sender + .send( + async_reader + .get_ranges(&mut ranges) + .await + .map(|bytes_map| (rg_index, bytes_map)), ) .await - .map(|bytes| { - ( - rg_index, - rg.byte_ranges_iter() - .map(|range| { - ( - range.start, - bytes.slice( - range.start as usize - full_byte_range.start - ..range.end as usize - full_byte_range.start, - ), - ) - }) - .collect::(), - ) - }); - - sender.send(result).await.is_ok() + .is_ok() } pub struct FetchRowGroupsFromObjectStore { diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs index cc43a908cda3..4c95c96f7733 100644 --- a/crates/polars-io/src/pl_async.rs +++ b/crates/polars-io/src/pl_async.rs @@ -4,7 +4,7 @@ use std::ops::Deref; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; use once_cell::sync::Lazy; -use polars_core::config::verbose; +use polars_core::config::{self, verbose}; use polars_core::POOL; use tokio::runtime::{Builder, Runtime}; use tokio::sync::Semaphore; @@ -12,6 +12,25 @@ use tokio::sync::Semaphore; static CONCURRENCY_BUDGET: std::sync::OnceLock<(Semaphore, u32)> = std::sync::OnceLock::new(); pub(super) const MAX_BUDGET_PER_REQUEST: usize = 10; +/// Used to determine chunks when splitting large ranges, or combining small +/// ranges. +pub(super) static DOWNLOAD_CHUNK_SIZE: Lazy = Lazy::new(|| { + let v: usize = std::env::var("POLARS_DOWNLOAD_CHUNK_SIZE") + .as_deref() + .map(|x| x.parse().expect("integer")) + .unwrap_or(64 * 1024 * 1024); + + if config::verbose() { + eprintln!("async download_chunk_size: {}", v) + } + + v +}); + +pub(super) fn get_download_chunk_size() -> usize { + *DOWNLOAD_CHUNK_SIZE +} + pub trait GetSize { fn size(&self) -> u64; } @@ -158,6 +177,10 @@ fn get_semaphore() -> &'static (Semaphore, u32) { }) } +pub(crate) fn get_concurrency_limit() -> u32 { + get_semaphore().1 +} + pub async fn tune_with_concurrency_budget(requested_budget: u32, callable: F) -> Fut::Output where F: FnOnce() -> Fut, diff --git a/crates/polars-io/src/utils/byte_source.rs b/crates/polars-io/src/utils/byte_source.rs index e2dd3e876c2a..af37d32b36da 100644 --- a/crates/polars-io/src/utils/byte_source.rs +++ b/crates/polars-io/src/utils/byte_source.rs @@ -1,6 +1,7 @@ use std::ops::Range; use std::sync::Arc; +use polars_core::prelude::PlHashMap; use polars_error::PolarsResult; use polars_utils::_limit_path_len_io_err; use polars_utils::mmap::MemSlice; @@ -16,7 +17,11 @@ pub trait ByteSource: Send + Sync { /// # Panics /// Panics if `range` is not in bounds. async fn get_range(&self, range: Range) -> PolarsResult; - async fn get_ranges(&self, ranges: &[Range]) -> PolarsResult>; + /// Note: This will mutably sort ranges for coalescing. 
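+    /// The returned map is keyed by each range's `start` offset.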
+ async fn get_ranges( + &self, + ranges: &mut [Range], + ) -> PolarsResult>; } /// Byte source backed by a `MemSlice`, which can potentially be memory-mapped. @@ -49,11 +54,14 @@ impl ByteSource for MemSliceByteSource { Ok(out) } - async fn get_ranges(&self, ranges: &[Range]) -> PolarsResult> { + async fn get_ranges( + &self, + ranges: &mut [Range], + ) -> PolarsResult> { Ok(ranges .iter() - .map(|x| self.0.slice(x.clone())) - .collect::>()) + .map(|x| (x.start, self.0.slice(x.clone()))) + .collect()) } } @@ -88,9 +96,11 @@ impl ByteSource for ObjectStoreByteSource { Ok(mem_slice) } - async fn get_ranges(&self, ranges: &[Range]) -> PolarsResult> { - let ranges = self.store.get_ranges(&self.path, ranges).await?; - Ok(ranges.into_iter().map(MemSlice::from_bytes).collect()) + async fn get_ranges( + &self, + ranges: &mut [Range], + ) -> PolarsResult> { + self.store.get_ranges_sort(&self.path, ranges).await } } @@ -130,7 +140,10 @@ impl ByteSource for DynByteSource { } } - async fn get_ranges(&self, ranges: &[Range]) -> PolarsResult> { + async fn get_ranges( + &self, + ranges: &mut [Range], + ) -> PolarsResult> { match self { Self::MemSlice(v) => v.get_ranges(ranges).await, Self::Cloud(v) => v.get_ranges(ranges).await, diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs index 9a87f0f91b7c..a99d7535d970 100644 --- a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs +++ b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::sync::Arc; -use polars_core::prelude::{ArrowSchema, InitHashMaps, PlHashMap}; +use polars_core::prelude::{ArrowSchema, PlHashMap}; use polars_core::series::IsSorted; use polars_core::utils::operation_exceeded_idxsize_msg; use polars_error::{polars_err, PolarsResult}; @@ -197,46 +197,32 @@ impl RowGroupDataFetcher { mem_slice, } } else if let Some(columns) = projection.as_ref() { - let ranges = get_row_group_byte_ranges_for_projection( + let mut ranges = get_row_group_byte_ranges_for_projection( &row_group_metadata, columns.as_ref(), ) .collect::>(); - let bytes = current_byte_source.get_ranges(ranges.as_ref()).await?; + let n_ranges = ranges.len(); - assert_eq!(bytes.len(), ranges.len()); + let bytes_map = current_byte_source.get_ranges(&mut ranges).await?; - let mut bytes_map = PlHashMap::with_capacity(ranges.len()); - - for (range, bytes) in ranges.iter().zip(bytes) { - memory_prefetch_func(bytes.as_ref()); - let v = bytes_map.insert(range.start, bytes); - debug_assert!(v.is_none(), "duplicate range start {}", range.start); - } + assert_eq!(bytes_map.len(), n_ranges); FetchedBytes::BytesMap(bytes_map) } else { - // We have a dedicated code-path for a full projection that performs a - // single range request for the entire row group. During testing this - // provided much higher throughput from cloud than making multiple range - // request with `get_ranges()`. - let full_range = row_group_metadata.full_byte_range(); - let full_range = full_range.start as usize..full_range.end as usize; - - let mem_slice = { - let full_range_2 = full_range.clone(); - task_handles_ext::AbortOnDropHandle(io_runtime.spawn(async move { - current_byte_source.get_range(full_range_2).await - })) - .await - .unwrap()? 
- }; + let mut ranges = row_group_metadata + .byte_ranges_iter() + .map(|x| x.start as usize..x.end as usize) + .collect::>(); - FetchedBytes::MemSlice { - offset: full_range.start, - mem_slice, - } + let n_ranges = ranges.len(); + + let bytes_map = current_byte_source.get_ranges(&mut ranges).await?; + + assert_eq!(bytes_map.len(), n_ranges); + + FetchedBytes::BytesMap(bytes_map) }; PolarsResult::Ok(RowGroupData { diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs index 0ac1a643d93d..ef07714d591f 100644 --- a/crates/polars-utils/src/mmap.rs +++ b/crates/polars-utils/src/mmap.rs @@ -130,6 +130,12 @@ mod private { out } } + + impl From for MemSlice { + fn from(value: bytes::Bytes) -> Self { + Self::from_bytes(value) + } + } } use memmap::MmapOptions; From 29eed4373757f102eb44d6d6a342d1adcfceefd3 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 11 Nov 2024 20:11:41 +1100 Subject: [PATCH 2/3] c --- crates/polars-io/src/cloud/polars_object_store.rs | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/crates/polars-io/src/cloud/polars_object_store.rs b/crates/polars-io/src/cloud/polars_object_store.rs index 70b522e7ec0c..cbb804198c12 100644 --- a/crates/polars-io/src/cloud/polars_object_store.rs +++ b/crates/polars-io/src/cloud/polars_object_store.rs @@ -49,10 +49,6 @@ impl PolarsObjectStore { .await .map_err(to_compute_err) } else { - dbg!(&range); - dbg!(parts.len()); - dbg!(parts.clone().take(10).collect::>()); - let parts = tune_with_concurrency_budget( parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32, || { @@ -97,15 +93,6 @@ impl PolarsObjectStore { let (merged_ranges, merged_ends): (Vec<_>, Vec<_>) = merge_ranges(ranges).unzip(); - dbg!(ranges.len()); - dbg!(ranges.iter().take(10).collect::>()); - dbg!(merged_ranges.len()); - dbg!(merged_ranges - .iter() - .zip(merged_ends.iter()) - .take(10) - .collect::>()); - let mut stream = self.get_buffered_ranges_stream(path, merged_ranges.iter().cloned()); tune_with_concurrency_budget( @@ -252,7 +239,7 @@ impl PolarsObjectStore { /// Splits a single range into multiple smaller ranges, which can be downloaded concurrently for /// much higher throughput. -fn split_range(range: Range) -> impl ExactSizeIterator> + Clone { +fn split_range(range: Range) -> impl ExactSizeIterator> { let chunk_size = get_download_chunk_size(); // Calculate n_parts such that we are as close as possible to the `chunk_size`. From 8a6da81a06dc1b2d306b51c98cc0be781b837433 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Mon, 11 Nov 2024 20:27:43 +1100 Subject: [PATCH 3/3] c --- .../src/nodes/parquet_source/row_group_data_fetch.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs index a99d7535d970..bf2e7e60ea6e 100644 --- a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs +++ b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs @@ -211,6 +211,11 @@ impl RowGroupDataFetcher { FetchedBytes::BytesMap(bytes_map) } else { + // We still prefer `get_ranges()` over a single `get_range()` for downloading + // the entire row group, as it can have less memory-copying. A single `get_range()` + // would naively concatenate the memory blocks of the entire row group, while + // `get_ranges()` can skip concatenation since the downloaded blocks are + // aligned to the columns. 
let mut ranges = row_group_metadata .byte_ranges_iter() .map(|x| x.start as usize..x.end as usize)
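The splitting and coalescing heuristics above are all driven by the download chunk size introduced in `pl_async.rs` (default 64 MiB). A small sketch of tuning it, assuming the variable is set before the first download is issued, since the value is read once into a `Lazy` static:

    fn main() {
        // Must happen before any cloud request: `DOWNLOAD_CHUNK_SIZE` is initialized lazily
        // on first use and never re-read. 16 MiB here is an arbitrary example value.
        std::env::set_var("POLARS_DOWNLOAD_CHUNK_SIZE", (16 * 1024 * 1024).to_string());

        // ... issue scans / downloads afterwards ...
    }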