refactor: Factor out logic for re-use by new streaming CSV source (#1…
nameexhaustion authored Nov 6, 2024
1 parent f12eff8 commit 750b371
Showing 16 changed files with 277 additions and 256 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/buffer.rs
@@ -172,7 +172,7 @@ impl Utf8Field {
 }
 
 #[inline]
-pub(super) fn validate_utf8(bytes: &[u8]) -> bool {
+pub fn validate_utf8(bytes: &[u8]) -> bool {
     simdutf8::basic::from_utf8(bytes).is_ok()
 }
 
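Note: `validate_utf8` is a thin wrapper over `simdutf8`, now exported so the streaming source can validate chunk boundaries itself. A minimal sketch, assuming the `_csv_read_internal` re-export added in mod.rs below:

use polars_io::csv::read::_csv_read_internal::validate_utf8;

fn utf8_demo() {
    assert!(validate_utf8("polars".as_bytes())); // well-formed UTF-8
    assert!(!validate_utf8(&[0xff, 0xfe])); // 0xFF can never occur in UTF-8
}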
8 changes: 8 additions & 0 deletions crates/polars-io/src/csv/read/mod.rs
@@ -30,3 +30,11 @@ pub use parser::{count_rows, count_rows_from_slice};
 pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
 pub use reader::CsvReader;
 pub use schema_inference::infer_file_schema;
+
+pub mod _csv_read_internal {
+    pub use super::buffer::validate_utf8;
+    pub use super::options::NullValuesCompiled;
+    pub use super::parser::CountLines;
+    pub use super::read_impl::{cast_columns, find_starting_point, read_chunk};
+    pub use super::reader::prepare_csv_schema;
+}
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/options.rs
@@ -371,7 +371,7 @@ pub enum NullValues {
 }
 
 impl NullValues {
-    pub(super) fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
+    pub fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
         Ok(match self {
             NullValues::AllColumnsSingle(v) => NullValuesCompiled::AllColumnsSingle(v),
             NullValues::AllColumns(v) => NullValuesCompiled::AllColumns(v),
@@ -388,7 +388,7 @@ impl NullValues {
 }
 
 #[derive(Debug, Clone)]
-pub(super) enum NullValuesCompiled {
+pub enum NullValuesCompiled {
     /// A single value that's used for all columns
     AllColumnsSingle(PlSmallStr),
     // Multiple null values that are null for all columns
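With `compile` and `NullValuesCompiled` now public, callers outside the module can pre-compile null values against a schema. A minimal sketch, assuming `Schema: Default` and `PlSmallStr: From<&str>` (only the named per-column variant actually consults the schema):

use polars_core::prelude::{PolarsResult, Schema};
use polars_io::csv::read::NullValues;

fn compile_nulls() -> PolarsResult<()> {
    // One sentinel ("NA") treated as null in every column.
    let nulls = NullValues::AllColumnsSingle("NA".into());
    let _compiled = nulls.compile(&Schema::default())?;
    Ok(())
}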
18 changes: 16 additions & 2 deletions crates/polars-io/src/csv/read/parser.rs
@@ -549,7 +549,7 @@ impl<'a> Iterator for SplitLines<'a> {
     }
 }
 
-pub(super) struct CountLines {
+pub struct CountLines {
     quote_char: u8,
     eol_char: u8,
     #[cfg(feature = "simd")]
@@ -560,7 +560,7 @@ pub(super) struct CountLines {
 }
 
 impl CountLines {
-    pub(super) fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
+    pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
         let quoting = quote_char.is_some();
         let quote_char = quote_char.unwrap_or(b'\"');
         #[cfg(feature = "simd")]
@@ -578,6 +578,20 @@ impl CountLines {
         }
     }
 
+    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
+        loop {
+            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
+
+            let (count, offset) = self.count(b);
+
+            if count > 0 || b.len() == bytes.len() {
+                return (count, offset);
+            }
+
+            *chunk_size *= 2;
+        }
+    }
+
     // Returns count and offset in slice
     #[cfg(feature = "simd")]
     pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
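The new `find_next` wraps `count` in a growth loop: it scans a `chunk_size`-byte window for line endings and doubles the window until at least one complete line is found or the whole buffer has been scanned. Writing the final size back through `&mut chunk_size` lets the caller reuse the estimate for the next chunk. A sketch of the intended call pattern, assuming the `_csv_read_internal` path added in mod.rs above:

use polars_io::csv::read::_csv_read_internal::CountLines;

fn find_next_demo() {
    let counter = CountLines::new(Some(b'"'), b'\n');
    let bytes: &[u8] = b"name,age\nalice,30\nbob,25\n";

    // Deliberately undersized; find_next doubles it until a full line fits.
    let mut chunk_size = 4;
    let (count, position) = counter.find_next(bytes, &mut chunk_size);

    assert!(count > 0);
    assert_eq!(bytes[position], b'\n'); // position indexes the last counted EOL
}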
186 changes: 112 additions & 74 deletions crates/polars-io/src/csv/read/read_impl.rs
@@ -17,6 +17,7 @@ use super::parser::{
     is_comment_line, parse_lines, skip_bom, skip_line_ending, skip_this_line, CountLines,
     SplitLines,
 };
+use super::reader::prepare_csv_schema;
 use super::schema_inference::{check_decimal_comma, infer_file_schema};
 #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
 use super::utils::decompress;
@@ -27,7 +28,7 @@ use crate::utils::compression::SupportedCompression;
 use crate::utils::update_row_counts2;
 use crate::RowIndex;
 
-pub(crate) fn cast_columns(
+pub fn cast_columns(
     df: &mut DataFrame,
     to_cast: &[Field],
     parallel: bool,
@@ -123,6 +124,8 @@ pub(crate) struct CoreReader<'a> {
     to_cast: Vec<Field>,
     row_index: Option<RowIndex>,
     truncate_ragged_lines: bool,
+    #[cfg_attr(not(feature = "dtype-categorical"), allow(unused))]
+    has_categorical: bool,
 }
 
 impl fmt::Debug for CoreReader<'_> {
@@ -159,7 +162,7 @@ impl<'a> CoreReader<'a> {
         null_values: Option<NullValues>,
         missing_is_null: bool,
         predicate: Option<Arc<dyn PhysicalIoExpr>>,
-        to_cast: Vec<Field>,
+        mut to_cast: Vec<Field>,
         skip_rows_after_header: usize,
         row_index: Option<RowIndex>,
         try_parse_dates: bool,
@@ -224,6 +227,8 @@ impl<'a> CoreReader<'a> {
             }
         }
 
+        let has_categorical = prepare_csv_schema(&mut schema, &mut to_cast)?;
+
         // Create a null value for every column
         let null_values = null_values.map(|nv| nv.compile(&schema)).transpose()?;
 
@@ -260,76 +265,28 @@ impl<'a> CoreReader<'a> {
             row_index,
             truncate_ragged_lines,
             decimal_comma,
+            has_categorical,
         })
     }
 
     fn find_starting_point<'b>(
         &self,
-        mut bytes: &'b [u8],
+        bytes: &'b [u8],
         quote_char: Option<u8>,
         eol_char: u8,
     ) -> PolarsResult<(&'b [u8], Option<usize>)> {
-        let starting_point_offset = bytes.as_ptr() as usize;
-
-        // Skip utf8 byte-order-mark (BOM)
-        bytes = skip_bom(bytes);
-
-        // \n\n can be a empty string row of a single column
-        // in other cases we skip it.
-        if self.schema.len() > 1 {
-            bytes = skip_line_ending(bytes, eol_char)
-        }
-
-        // skip 'n' leading rows
-        if self.skip_rows_before_header > 0 {
-            let mut split_lines = SplitLines::new(bytes, quote_char, eol_char);
-            let mut current_line = &bytes[..0];
-
-            for _ in 0..self.skip_rows_before_header {
-                current_line = split_lines
-                    .next()
-                    .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
-            }
-
-            current_line = split_lines
-                .next()
-                .unwrap_or(&current_line[current_line.len()..]);
-            bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
-        }
-
-        // skip lines that are comments
-        while is_comment_line(bytes, self.comment_prefix.as_ref()) {
-            bytes = skip_this_line(bytes, quote_char, eol_char);
-        }
-
-        // skip header row
-        if self.has_header {
-            bytes = skip_this_line(bytes, quote_char, eol_char);
-        }
-        // skip 'n' rows following the header
-        if self.skip_rows_after_header > 0 {
-            let mut split_lines = SplitLines::new(bytes, quote_char, eol_char);
-            let mut current_line = &bytes[..0];
-
-            for _ in 0..self.skip_rows_after_header {
-                current_line = split_lines
-                    .next()
-                    .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
-            }
-
-            current_line = split_lines
-                .next()
-                .unwrap_or(&current_line[current_line.len()..]);
-            bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
-        }
-
-        let starting_point_offset = if bytes.is_empty() {
-            None
-        } else {
-            Some(bytes.as_ptr() as usize - starting_point_offset)
-        };
+        let i = find_starting_point(
+            bytes,
+            quote_char,
+            eol_char,
+            self.schema.len(),
+            self.skip_rows_before_header,
+            self.skip_rows_after_header,
+            self.comment_prefix.as_ref(),
+            self.has_header,
+        )?;
 
-        Ok((bytes, starting_point_offset))
+        Ok((&bytes[i..], (i <= bytes.len()).then_some(i)))
     }
 
     fn get_projection(&mut self) -> PolarsResult<Vec<usize>> {
@@ -382,8 +339,7 @@ impl<'a> CoreReader<'a> {
     }
 
     fn parse_csv(&mut self, bytes: &[u8]) -> PolarsResult<DataFrame> {
-        let (bytes, starting_point_offset) =
-            self.find_starting_point(bytes, self.quote_char, self.eol_char)?;
+        let (bytes, _) = self.find_starting_point(bytes, self.quote_char, self.eol_char)?;
 
         let projection = self.get_projection()?;
 
@@ -451,16 +407,12 @@
 
         pool.scope(|s| {
             loop {
-                let b = unsafe {
-                    bytes.get_unchecked(
-                        total_offset..std::cmp::min(total_offset + chunk_size, bytes.len()),
-                    )
-                };
+                let b = unsafe { bytes.get_unchecked(total_offset..) };
                 if b.is_empty() {
                     break;
                 }
                 debug_assert!(total_offset == 0 || bytes[total_offset - 1] == self.eol_char);
-                let (count, position) = counter.count(b);
+                let (count, position) = counter.find_next(b, &mut chunk_size);
                 debug_assert!(count == 0 || b[position] == self.eol_char);
 
                 let (b, count) = if count == 0
@@ -498,7 +450,7 @@ impl<'a> CoreReader<'a> {
                 }
 
                 let result = slf
-                    .read_chunk(b, projection, 0, count, starting_point_offset, b.len())
+                    .read_chunk(b, projection, 0, count, Some(0), b.len())
                     .and_then(|mut df| {
                         debug_assert!(df.height() <= count);
 
@@ -554,7 +506,14 @@ impl<'a> CoreReader<'a> {
     }
 
     /// Read the csv into a DataFrame. The predicate can come from a lazy physical plan.
-    pub fn as_df(&mut self) -> PolarsResult<DataFrame> {
+    pub fn finish(mut self) -> PolarsResult<DataFrame> {
+        #[cfg(feature = "dtype-categorical")]
+        let mut _cat_lock = if self.has_categorical {
+            Some(polars_core::StringCacheHolder::hold())
+        } else {
+            None
+        };
+
         let reader_bytes = self.reader_bytes.take().unwrap();
 
         let mut df = self.parse_csv(&reader_bytes)?;
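`finish` now consumes the reader and, when any output column is categorical, holds the global string cache for the whole read so categorical columns built on different threads agree on one encoding. A hedged illustration of the guard pattern (the helper is hypothetical; `StringCacheHolder` is the type used above):

use polars_core::StringCacheHolder;

// Run `f` with the global string cache held; the guard releases it on drop.
fn with_string_cache<T>(f: impl FnOnce() -> T) -> T {
    let _hold = StringCacheHolder::hold();
    f()
}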
Expand All @@ -571,7 +530,7 @@ impl<'a> CoreReader<'a> {
}

#[allow(clippy::too_many_arguments)]
fn read_chunk(
pub fn read_chunk(
bytes: &[u8],
separator: u8,
schema: &Schema,
@@ -606,6 +565,8 @@ fn read_chunk(
         decimal_comma,
     )?;
 
+    debug_assert!(projection.is_sorted());
+
     let mut last_read = usize::MAX;
     loop {
         if read >= stop_at_nbytes || read == last_read {
@@ -640,3 +601,80 @@ fn read_chunk(
         .collect::<PolarsResult<Vec<_>>>()?;
     Ok(unsafe { DataFrame::new_no_checks_height_from_first(columns) })
 }
+
+#[allow(clippy::too_many_arguments)]
+pub fn find_starting_point(
+    mut bytes: &[u8],
+    quote_char: Option<u8>,
+    eol_char: u8,
+    schema_len: usize,
+    skip_rows_before_header: usize,
+    skip_rows_after_header: usize,
+    comment_prefix: Option<&CommentPrefix>,
+    has_header: bool,
+) -> PolarsResult<usize> {
+    let full_len = bytes.len();
+    let starting_point_offset = bytes.as_ptr() as usize;
+
+    // Skip utf8 byte-order-mark (BOM)
+    bytes = skip_bom(bytes);
+
+    // \n\n can be an empty string row of a single column
+    // in other cases we skip it.
+    if schema_len > 1 {
+        bytes = skip_line_ending(bytes, eol_char)
+    }
+
+    // skip 'n' leading rows
+    if skip_rows_before_header > 0 {
+        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char);
+        let mut current_line = &bytes[..0];
+
+        for _ in 0..skip_rows_before_header {
+            current_line = split_lines
+                .next()
+                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
+        }
+
+        current_line = split_lines
+            .next()
+            .unwrap_or(&current_line[current_line.len()..]);
+        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
+    }
+
+    // skip lines that are comments
+    while is_comment_line(bytes, comment_prefix) {
+        bytes = skip_this_line(bytes, quote_char, eol_char);
+    }
+
+    // skip header row
+    if has_header {
+        bytes = skip_this_line(bytes, quote_char, eol_char);
+    }
+    // skip 'n' rows following the header
+    if skip_rows_after_header > 0 {
+        let mut split_lines = SplitLines::new(bytes, quote_char, eol_char);
+        let mut current_line = &bytes[..0];
+
+        for _ in 0..skip_rows_after_header {
+            current_line = split_lines
+                .next()
+                .ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;
+        }
+
+        current_line = split_lines
+            .next()
+            .unwrap_or(&current_line[current_line.len()..]);
+        bytes = &bytes[current_line.as_ptr() as usize - bytes.as_ptr() as usize..];
+    }
+
+    Ok(
+        // Some of the functions we call may return `&'static []` instead of
+        // slices of `&bytes[..]`.
+        if bytes.is_empty() {
+            full_len
+        } else {
+            bytes.as_ptr() as usize - starting_point_offset
+        },
+    )
+}
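The free function returns a byte offset into the original buffer instead of a trimmed slice, so callers that own the buffer (such as the streaming source) can slice it themselves. A minimal sketch, assuming the signature above and the `_csv_read_internal` re-export:

use polars_core::prelude::PolarsResult;
use polars_io::csv::read::_csv_read_internal::find_starting_point;

fn header_offset() -> PolarsResult<()> {
    // Two-column file with a header row; nothing to skip, no comments.
    let csv: &[u8] = b"name,age\nalice,30\n";
    let offset = find_starting_point(
        csv,
        Some(b'"'), // quote_char
        b'\n',      // eol_char
        2,          // schema_len
        0,          // skip_rows_before_header
        0,          // skip_rows_after_header
        None,       // comment_prefix
        true,       // has_header
    )?;
    assert_eq!(offset, 9); // data begins right after "name,age\n"
    Ok(())
}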