Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for IO[bytes] and bytes in scan_{...} functions #18532

Merged
merged 27 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mod splitfields;
mod utils;

pub use options::{CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues};
pub use parser::count_rows;
pub use parser::{count_rows, count_rows_from_slice};
pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
pub use reader::CsvReader;
pub use schema_inference::infer_file_schema;
58 changes: 34 additions & 24 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::path::PathBuf;
use std::path::Path;

use memchr::memchr2_iter;
use num_traits::Pow;
use polars_core::prelude::*;
use polars_core::{config, POOL};
use polars_error::feature_gated;
use polars_utils::index::Bounded;
use polars_utils::slice::GetSaferUnchecked;
use rayon::prelude::*;
Expand All @@ -18,71 +19,80 @@ use crate::utils::maybe_decompress_bytes;
/// Read the number of rows without parsing columns
/// useful for count(*) queries
pub fn count_rows(
path: &PathBuf,
path: &Path,
separator: u8,
quote_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
eol_char: u8,
has_header: bool,
) -> PolarsResult<usize> {
let file = if is_cloud_url(path) || config::force_async() {
#[cfg(feature = "cloud")]
{
feature_gated!("cloud", {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by: this is the preferred way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, much nicer.

crate::file_cache::FILE_CACHE
.get_entry(path.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()?
}
#[cfg(not(feature = "cloud"))]
{
panic!("required feature `cloud` is not enabled")
}
})
} else {
polars_utils::open_file(path)?
};

let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let owned = &mut vec![];
let mut reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;

for _ in 0..reader_bytes.len() {
if reader_bytes[0] != eol_char {
count_rows_from_slice(
reader_bytes,
separator,
quote_char,
comment_prefix,
eol_char,
has_header,
)
}

/// Read the number of rows without parsing columns
/// useful for count(*) queries
pub fn count_rows_from_slice(
mut bytes: &[u8],
separator: u8,
quote_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
eol_char: u8,
has_header: bool,
) -> PolarsResult<usize> {
for _ in 0..bytes.len() {
if bytes[0] != eol_char {
break;
}

reader_bytes = &reader_bytes[1..];
bytes = &bytes[1..];
}

const MIN_ROWS_PER_THREAD: usize = 1024;
let max_threads = POOL.current_num_threads();

// Determine if parallelism is beneficial and how many threads
let n_threads = get_line_stats(
reader_bytes,
bytes,
MIN_ROWS_PER_THREAD,
eol_char,
None,
separator,
quote_char,
)
.map(|(mean, std)| {
let n_rows = (reader_bytes.len() as f32 / (mean - 0.01 * std)) as usize;
let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
(n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
})
.unwrap_or(1);

let file_chunks: Vec<(usize, usize)> = get_file_chunks(
reader_bytes,
n_threads,
None,
separator,
quote_char,
eol_char,
);
let file_chunks: Vec<(usize, usize)> =
get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);

let iter = file_chunks.into_par_iter().map(|(start, stop)| {
let local_bytes = &reader_bytes[start..stop];
let local_bytes = &bytes[start..stop];
let row_iterator = SplitLines::new(local_bytes, quote_char.unwrap_or(b'"'), eol_char);
if comment_prefix.is_some() {
Ok(row_iterator
Expand Down
4 changes: 1 addition & 3 deletions crates/polars-io/src/ipc/ipc_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ use crate::RowIndex;

#[derive(Clone, Debug, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions {
pub memory_map: bool,
}
pub struct IpcScanOptions;

/// Read Arrow's IPC format into a DataFrame
///
Expand Down
13 changes: 3 additions & 10 deletions crates/polars-io/src/ipc/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use arrow::io::ipc::read::{Dictionaries, FileMetadata};
use arrow::mmap::{mmap_dictionaries_unchecked, mmap_unchecked};
use arrow::record_batch::RecordBatch;
use polars_core::prelude::*;
use polars_utils::mmap::MMapSemaphore;

use super::ipc_file::IpcReader;
use crate::mmap::{MMapSemaphore, MmapBytesReader};
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::shared::{finish_reader, ArrowReader};
use crate::utils::{apply_projection, columns_to_projection};
Expand All @@ -15,17 +16,9 @@ impl<R: MmapBytesReader> IpcReader<R> {
&mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
) -> PolarsResult<DataFrame> {
#[cfg(target_family = "unix")]
use std::os::unix::fs::MetadataExt;
match self.reader.to_file() {
Some(file) => {
#[cfg(target_family = "unix")]
let metadata = file.metadata()?;
let mmap = unsafe { memmap::Mmap::map(file).unwrap() };
#[cfg(target_family = "unix")]
let semaphore = MMapSemaphore::new(metadata.dev(), metadata.ino(), mmap);
#[cfg(not(target_family = "unix"))]
let semaphore = MMapSemaphore::new(mmap);
let semaphore = MMapSemaphore::new_from_file(file)?;
let metadata =
read::read_file_metadata(&mut std::io::Cursor::new(semaphore.as_ref()))?;

Expand Down
89 changes: 10 additions & 79 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,9 @@
#[cfg(target_family = "unix")]
use std::collections::btree_map::Entry;
#[cfg(target_family = "unix")]
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, Cursor, Read, Seek};
use std::sync::Arc;
#[cfg(target_family = "unix")]
use std::sync::Mutex;

use memmap::Mmap;
#[cfg(target_family = "unix")]
use once_cell::sync::Lazy;
use polars_core::config::verbose;
#[cfg(target_family = "unix")]
use polars_error::polars_bail;
use polars_error::PolarsResult;
use polars_utils::mmap::MemSlice;

// Keep track of memory mapped files so we don't write to them while reading
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
// Write handle in Windows is exclusive, so this is only necessary in Unix.
//
// The key is the `(dev, ino)` pair passed to `MMapSemaphore::new`; the value is a
// counter that `MMapSemaphore`'s `Drop` impl decrements, removing the entry once it
// reaches zero. `ensure_not_mapped` only checks whether a key is present.
#[cfg(target_family = "unix")]
static MEMORY_MAPPED_FILES: Lazy<Mutex<BTreeMap<(u64, u64), u32>>> =
Lazy::new(|| Mutex::new(Default::default()));

/// A memory map bundled with the registry key of the file it was created from.
///
/// On unix the key is registered in `MEMORY_MAPPED_FILES` on construction and
/// released again when the value is dropped.
pub(crate) struct MMapSemaphore {
// `(dev, ino)` of the mapped file; used to find its entry in `MEMORY_MAPPED_FILES`.
#[cfg(target_family = "unix")]
key: (u64, u64),
// The mapping itself; exposed as bytes through `AsRef<[u8]>`.
mmap: Mmap,
}

impl MMapSemaphore {
    /// Wraps `mmap` and registers the file identified by `(dev, ino)` in
    /// `MEMORY_MAPPED_FILES`.
    ///
    /// Each live `MMapSemaphore` contributes exactly one to the per-file counter;
    /// the `Drop` impl gives that one back and removes the entry at zero.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    #[cfg(target_family = "unix")]
    pub(super) fn new(dev: u64, ino: u64, mmap: Mmap) -> Self {
        let key = (dev, ino);
        let mut guard = MEMORY_MAPPED_FILES.lock().unwrap();
        // Increment instead of overwriting with 1: if the same file is mapped more
        // than once, overwriting would let the first drop remove the entry while
        // another mapping is still alive, and `ensure_not_mapped` would then allow
        // writes to a file that is still memory mapped.
        *guard.entry(key).or_insert(0) += 1;
        Self { key, mmap }
    }

    /// Wraps `mmap` without any bookkeeping; no registry exists on non-unix targets.
    #[cfg(not(target_family = "unix"))]
    pub(super) fn new(mmap: Mmap) -> Self {
        Self { mmap }
    }
}

impl AsRef<[u8]> for MMapSemaphore {
    /// Borrows the mapped bytes.
    #[inline]
    fn as_ref(&self) -> &[u8] {
        let bytes: &[u8] = self.mmap.as_ref();
        bytes
    }
}

#[cfg(target_family = "unix")]
impl Drop for MMapSemaphore {
    /// Releases this mapping's share of the registry entry for its file and
    /// deletes the entry once the counter hits zero.
    fn drop(&mut self) {
        let mut registry = MEMORY_MAPPED_FILES.lock().unwrap();
        match registry.entry(self.key) {
            Entry::Occupied(mut slot) => {
                let remaining = slot.get_mut();
                *remaining -= 1;

                if *remaining == 0 {
                    slot.remove_entry();
                }
            },
            // Nothing registered under this key: nothing to release.
            Entry::Vacant(_) => {},
        }
    }
}

/// Returns an error if `file` currently has an entry in `MEMORY_MAPPED_FILES`.
///
/// The file is identified by its `(dev, ino)` metadata. On non-unix targets the
/// check is compiled out and the function always returns `Ok(())`.
///
/// # Errors
///
/// Fails with a `ComputeError` when the file is registered as memory mapped, or
/// with the underlying I/O error when the file metadata cannot be read.
pub fn ensure_not_mapped(#[allow(unused)] file: &File) -> PolarsResult<()> {
    #[cfg(target_family = "unix")]
    {
        use std::os::unix::fs::MetadataExt;

        let registry = MEMORY_MAPPED_FILES.lock().unwrap();
        let meta = file.metadata()?;
        let key = (meta.dev(), meta.ino());
        let is_mapped = registry.contains_key(&key);
        if is_mapped {
            polars_bail!(ComputeError: "cannot write to file: already memory mapped");
        }
    }
    Ok(())
}
use polars_utils::mmap::{MMapSemaphore, MemSlice};

/// Trait used to get hold of the file handle or the underlying bytes
/// without performing a `Read`.
Expand All @@ -104,6 +29,12 @@ impl MmapBytesReader for BufReader<File> {
}
}

impl MmapBytesReader for BufReader<&File> {
    /// Exposes the `File` that this buffered reader wraps by reference.
    fn to_file(&self) -> Option<&File> {
        let inner: &File = self.get_ref();
        Some(inner)
    }
}

impl<T> MmapBytesReader for Cursor<T>
where
T: AsRef<[u8]> + Send + Sync,
Expand Down Expand Up @@ -137,7 +68,7 @@ impl<T: MmapBytesReader> MmapBytesReader for &mut T {
pub enum ReaderBytes<'a> {
Borrowed(&'a [u8]),
Owned(Vec<u8>),
Mapped(memmap::Mmap, &'a File),
Mapped(MMapSemaphore, &'a File),
}

impl std::ops::Deref for ReaderBytes<'_> {
Expand All @@ -146,7 +77,7 @@ impl std::ops::Deref for ReaderBytes<'_> {
match self {
Self::Borrowed(ref_bytes) => ref_bytes,
Self::Owned(vec) => vec,
Self::Mapped(mmap, _) => mmap,
Self::Mapped(mmap, _) => mmap.as_ref(),
}
}
}
Expand Down Expand Up @@ -174,7 +105,7 @@ impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> {
None => {
if let Some(f) = m.to_file() {
let f = unsafe { std::mem::transmute::<&File, &'a File>(f) };
let mmap = unsafe { memmap::Mmap::map(f).unwrap() };
let mmap = MMapSemaphore::new_from_file(f).unwrap();
ReaderBytes::Mapped(mmap, f)
} else {
if verbose() {
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/path_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub fn expand_paths(
paths: &[PathBuf],
glob: bool,
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
) -> PolarsResult<Arc<Vec<PathBuf>>> {
) -> PolarsResult<Arc<[PathBuf]>> {
expand_paths_hive(paths, glob, cloud_options, false).map(|x| x.0)
}

Expand Down Expand Up @@ -129,7 +129,7 @@ pub fn expand_paths_hive(
glob: bool,
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
check_directory_level: bool,
) -> PolarsResult<(Arc<Vec<PathBuf>>, usize)> {
) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
let Some(first_path) = paths.first() else {
return Ok((vec![].into(), 0));
};
Expand Down Expand Up @@ -361,7 +361,7 @@ pub fn expand_paths_hive(
out_paths
};

Ok((Arc::new(out_paths), hive_idx_tracker.idx))
Ok((out_paths.into(), hive_idx_tracker.idx))
}

/// Ignores errors from `std::fs::create_dir_all` if the directory exists.
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-io/src/utils/byte_source.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops::Range;
use std::sync::Arc;

use polars_error::{to_compute_err, PolarsResult};
use polars_error::PolarsResult;
use polars_utils::_limit_path_len_io_err;
use polars_utils::mmap::MemSlice;

Expand Down Expand Up @@ -34,9 +34,8 @@ impl MemSliceByteSource {
.into_std()
.await,
);
let mmap = Arc::new(unsafe { memmap::Mmap::map(file.as_ref()) }.map_err(to_compute_err)?);

Ok(Self(MemSlice::from_mmap(mmap)))
Ok(Self(MemSlice::from_file(file.as_ref())?))
}
}

Expand Down
6 changes: 5 additions & 1 deletion crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::prelude::*;
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use polars_error::to_compute_err;
use polars_utils::mmap::MMapSemaphore;
use regex::{Regex, RegexBuilder};

use crate::mmap::{MmapBytesReader, ReaderBytes};
Expand All @@ -21,12 +22,15 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
.ok()
.and_then(|offset| Some((reader.to_file()?, offset)))
{
let mmap = unsafe { memmap::MmapOptions::new().offset(offset).map(file)? };
let mut options = memmap::MmapOptions::new();
options.offset(offset);

// The borrow checker somehow concludes that these borrows alias.
// Extending the lifetime is sound because `file` was already bound to 'a.
use std::fs::File;

let file = unsafe { std::mem::transmute::<&File, &'a File>(file) };
let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
Ok(ReaderBytes::Mapped(mmap, file))
} else {
// we can get the bytes for free
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub mod dsl;
pub mod frame;
pub mod physical_plan;
pub mod prelude;

mod scan;
#[cfg(test)]
mod tests;
Loading