Skip to content

Commit

Permalink
refactor(rust): Reduce memcopy in parquet (#19350)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Oct 21, 2024
1 parent 01e801f commit 81154ed
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 37 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<'a> CoreReader<'a> {
if let Some(b) =
decompress(&reader_bytes, total_n_rows, separator, quote_char, eol_char)
{
reader_bytes = ReaderBytes::Owned(b);
reader_bytes = ReaderBytes::Owned(b.into());
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/schema_inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ fn infer_file_schema_inner(
buf.push(eol_char);

return infer_file_schema_inner(
&ReaderBytes::Owned(buf),
&ReaderBytes::Owned(buf.into()),
separator,
max_read_rows,
has_header,
Expand Down Expand Up @@ -481,7 +481,7 @@ fn infer_file_schema_inner(
rb.extend_from_slice(reader_bytes);
rb.push(eol_char);
return infer_file_schema_inner(
&ReaderBytes::Owned(rb),
&ReaderBytes::Owned(rb.into()),
separator,
max_read_rows,
has_header,
Expand Down
26 changes: 12 additions & 14 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::fs::File;
use std::io::{BufReader, Cursor, Read, Seek};
use std::sync::Arc;

use polars_core::config::verbose;
use polars_utils::mmap::{MMapSemaphore, MemSlice};
use polars_utils::mmap::MemSlice;

/// Trait used to get hold of the file handle or the underlying bytes
/// without performing a `Read`.
Expand Down Expand Up @@ -67,8 +66,7 @@ impl<T: MmapBytesReader> MmapBytesReader for &mut T {
// Handle various forms of input bytes
pub enum ReaderBytes<'a> {
Borrowed(&'a [u8]),
Owned(Vec<u8>),
Mapped(MMapSemaphore, &'a File),
Owned(MemSlice),
}

impl std::ops::Deref for ReaderBytes<'_> {
Expand All @@ -77,19 +75,21 @@ impl std::ops::Deref for ReaderBytes<'_> {
match self {
Self::Borrowed(ref_bytes) => ref_bytes,
Self::Owned(vec) => vec,
Self::Mapped(mmap, _) => mmap.as_ref(),
}
}
}

/// Require 'static to force the caller to do any transmute as it's usually much
/// clearer to see there whether it's sound.
/// There are some places that perform manual lifetime management after transmuting `ReaderBytes`
/// to have a `'static` inner lifetime. The advantage to doing this is that it lets you construct a
/// `MemSlice` from the `ReaderBytes` in a zero-copy manner regardless of the underlying enum
/// variant.
impl ReaderBytes<'static> {
pub fn into_mem_slice(self) -> MemSlice {
/// Construct a `MemSlice` in a zero-copy manner from the underlying bytes, with the assumption
/// that the underlying bytes have a `'static` lifetime.
pub fn to_memslice(&self) -> MemSlice {
match self {
ReaderBytes::Borrowed(v) => MemSlice::from_static(v),
ReaderBytes::Owned(v) => MemSlice::from_vec(v),
ReaderBytes::Mapped(v, _) => MemSlice::from_mmap(Arc::new(v)),
ReaderBytes::Owned(v) => v.clone(),
}
}
}
Expand All @@ -104,16 +104,14 @@ impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> {
},
None => {
if let Some(f) = m.to_file() {
let f = unsafe { std::mem::transmute::<&File, &'a File>(f) };
let mmap = MMapSemaphore::new_from_file(f).unwrap();
ReaderBytes::Mapped(mmap, f)
ReaderBytes::Owned(MemSlice::from_file(f).unwrap())
} else {
if verbose() {
eprintln!("could not memory map file; read to buffer.")
}
let mut buf = vec![];
m.read_to_end(&mut buf).expect("could not read");
ReaderBytes::Owned(buf)
ReaderBytes::Owned(MemSlice::from_vec(buf))
}
},
}
Expand Down
12 changes: 4 additions & 8 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use polars_parquet::parquet::statistics::Statistics;
use polars_parquet::read::{
self, ColumnChunkMetadata, FileMetadata, Filter, PhysicalType, RowGroupMetadata,
};
use polars_utils::mmap::MemSlice;
use rayon::prelude::*;

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -908,10 +907,9 @@ pub fn read_parquet<R: MmapBytesReader>(
}

let reader = ReaderBytes::from(&mut reader);
let store = mmap::ColumnStore::Local(
unsafe { std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader) }
.into_mem_slice(),
);
let store = mmap::ColumnStore::Local(unsafe {
std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
});

let dfs = rg_to_dfs(
&store,
Expand Down Expand Up @@ -959,9 +957,7 @@ impl FetchRowGroupsFromMmapReader {

fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
// @TODO: we can do something smarter here with mmap
Ok(mmap::ColumnStore::Local(MemSlice::from_vec(
self.0.deref().to_vec(),
)))
Ok(mmap::ColumnStore::Local(self.0.to_memslice()))
}
}

Expand Down
18 changes: 6 additions & 12 deletions crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use once_cell::sync::Lazy;
use polars_core::prelude::*;
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use polars_utils::mmap::MMapSemaphore;
use polars_utils::mmap::{MMapSemaphore, MemSlice};
use regex::{Regex, RegexBuilder};

use crate::mmap::{MmapBytesReader, ReaderBytes};

pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
reader: &'a mut R,
) -> PolarsResult<ReaderBytes<'a>> {
pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
reader: &mut R,
) -> PolarsResult<ReaderBytes<'_>> {
// we have a file so we can mmap
// only seekable files are mmap-able
if let Some((file, offset)) = reader
Expand All @@ -23,14 +23,8 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
{
let mut options = memmap::MmapOptions::new();
options.offset(offset);

// somehow bck thinks borrows alias
// this is sound as file was already bound to 'a
use std::fs::File;

let file = unsafe { std::mem::transmute::<&File, &'a File>(file) };
let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
Ok(ReaderBytes::Mapped(mmap, file))
Ok(ReaderBytes::Owned(MemSlice::from_mmap(Arc::new(mmap))))
} else {
// we can get the bytes for free
if reader.to_bytes().is_some() {
Expand All @@ -40,7 +34,7 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
// we have to read to an owned buffer to get the bytes.
let mut bytes = Vec::with_capacity(1024 * 128);
reader.read_to_end(&mut bytes)?;
Ok(ReaderBytes::Owned(bytes))
Ok(ReaderBytes::Owned(bytes.into()))
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-utils/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ mod private {
}
}

impl From<Vec<u8>> for MemSlice {
fn from(value: Vec<u8>) -> Self {
Self::from_vec(value)
}
}

impl MemSlice {
pub const EMPTY: Self = Self::from_static(&[]);

Expand Down

0 comments on commit 81154ed

Please sign in to comment.