Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement support for multi-character comments in read_csv #12519

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use polars_core::prelude::*;
use polars_time::prelude::*;
#[cfg(feature = "temporal")]
use rayon::prelude::*;
pub use read::{CsvEncoding, CsvReader, NullValues};
pub use read::{CommentPrefix, CsvEncoding, CsvReader, NullValues};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub use write::{BatchedWriter, CsvWriter, QuoteStyle};
Expand Down
25 changes: 17 additions & 8 deletions crates/polars-io/src/csv/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::prelude::*;
use super::buffer::*;
use crate::csv::read::NullValuesCompiled;
use crate::csv::splitfields::SplitFields;
use crate::csv::CommentPrefix;

/// Skip the utf-8 Byte Order Mark.
/// credits to csv-core
Expand All @@ -16,6 +17,17 @@ pub(crate) fn skip_bom(input: &[u8]) -> &[u8] {
}
}

/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
///
/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
pub(crate) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
match comment_prefix {
Some(CommentPrefix::Single(c)) => line.starts_with(&[*c]),
Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
None => false,
}
}

/// Find the nearest next line position.
/// Does not check for new line characters embedded in String fields.
pub(crate) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
Expand Down Expand Up @@ -351,7 +363,7 @@ pub(super) fn parse_lines<'a>(
mut bytes: &'a [u8],
offset: usize,
separator: u8,
comment_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
missing_is_null: bool,
Expand Down Expand Up @@ -400,13 +412,10 @@ pub(super) fn parse_lines<'a>(
}

// deal with comments
if let Some(c) = comment_char {
// line is a comment -> skip
if bytes[0] == c {
let bytes_rem = skip_this_line(bytes, quote_char, eol_char);
bytes = bytes_rem;
continue;
}
if is_comment_line(bytes, comment_prefix) {
let bytes_rem = skip_this_line(bytes, quote_char, eol_char);
bytes = bytes_rem;
continue;
}

// Every line we only need to parse the columns that are projected.
Expand Down
55 changes: 47 additions & 8 deletions crates/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@ pub enum NullValues {
Named(Vec<(String, String)>),
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CommentPrefix {
/// A single byte character that indicates the start of a comment line.
Single(u8),
/// A string that indicates the start of a comment line.
/// This allows for multiple characters to be used as a comment identifier.
Multi(String),
}

impl CommentPrefix {
/// Creates a new `CommentPrefix` for the `Single` variant.
pub fn new_single(c: u8) -> Self {
CommentPrefix::Single(c)
}

/// Creates a new `CommentPrefix`. If `Multi` variant is used and the string is longer
/// than 5 characters, it will return `None`.
pub fn new_multi(s: String) -> Option<Self> {
if s.len() <= 5 {
Some(CommentPrefix::Multi(s))
} else {
None
}
}
}

pub(super) enum NullValuesCompiled {
/// A single value that's used for all columns
AllColumnsSingle(String),
Expand Down Expand Up @@ -118,7 +145,7 @@ where
dtype_overwrite: Option<&'a [DataType]>,
sample_size: usize,
chunk_size: usize,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
null_values: Option<NullValues>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
quote_char: Option<u8>,
Expand Down Expand Up @@ -210,9 +237,21 @@ where
self
}

/// Set the comment character. Lines starting with this character will be ignored.
pub fn with_comment_char(mut self, comment_char: Option<u8>) -> Self {
self.comment_char = comment_char;
/// Set the comment prefix for this instance. Lines starting with this prefix will be ignored.
pub fn with_comment_prefix(mut self, comment_prefix: Option<&str>) -> Self {
self.comment_prefix = comment_prefix.map(|s| {
if s.len() == 1 && s.chars().next().unwrap().is_ascii() {
CommentPrefix::Single(s.as_bytes()[0])
} else {
CommentPrefix::Multi(s.to_string())
}
});
self
}

/// Sets the comment prefix from `CsvParserOptions` for internal initialization.
pub fn _with_comment_prefix(mut self, comment_prefix: Option<CommentPrefix>) -> Self {
self.comment_prefix = comment_prefix;
self
}

Expand Down Expand Up @@ -370,7 +409,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
self.sample_size,
self.chunk_size,
self.low_memory,
self.comment_char,
std::mem::take(&mut self.comment_prefix),
self.quote_char,
self.eol_char,
std::mem::take(&mut self.null_values),
Expand Down Expand Up @@ -487,7 +526,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
None,
&mut self.skip_rows_before_header,
self.skip_rows_after_header,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.null_values.as_ref(),
Expand Down Expand Up @@ -516,7 +555,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
None,
&mut self.skip_rows_before_header,
self.skip_rows_after_header,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.null_values.as_ref(),
Expand Down Expand Up @@ -556,7 +595,7 @@ where
sample_size: 1024,
chunk_size: 1 << 18,
low_memory: false,
comment_char: None,
comment_prefix: None,
eol_char: b'\n',
null_values: None,
missing_is_null: true,
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/csv/read_impl/batched_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl<'a> CoreReader<'a> {
projection,
starting_point_offset,
row_count: self.row_count,
comment_char: self.comment_char,
comment_prefix: self.comment_prefix,
quote_char: self.quote_char,
eol_char: self.eol_char,
null_values: self.null_values,
Expand Down Expand Up @@ -182,7 +182,7 @@ pub struct BatchedCsvReaderMmap<'a> {
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_count: Option<RowCount>,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValuesCompiled>,
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<'a> BatchedCsvReaderMmap<'a> {
bytes_offset_thread,
self.quote_char,
self.eol_char,
self.comment_char,
self.comment_prefix.as_ref(),
self.chunk_size,
&self.str_capacities,
self.encoding,
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-io/src/csv/read_impl/batched_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl<'a> CoreReader<'a> {
projection,
starting_point_offset,
row_count: self.row_count,
comment_char: self.comment_char,
comment_prefix: self.comment_prefix,
quote_char: self.quote_char,
eol_char: self.eol_char,
null_values: self.null_values,
Expand Down Expand Up @@ -265,7 +265,7 @@ pub struct BatchedCsvReaderRead<'a> {
projection: Vec<usize>,
starting_point_offset: Option<usize>,
row_count: Option<RowCount>,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValuesCompiled>,
Expand Down Expand Up @@ -337,7 +337,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
0,
self.quote_char,
self.eol_char,
self.comment_char,
self.comment_prefix.as_ref(),
self.chunk_size,
&self.str_capacities,
self.encoding,
Expand Down
29 changes: 14 additions & 15 deletions crates/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use rayon::prelude::*;

use crate::csv::buffer::*;
use crate::csv::parser::*;
use crate::csv::read::NullValuesCompiled;
use crate::csv::read::{CommentPrefix, NullValuesCompiled};
use crate::csv::utils::*;
use crate::csv::{CsvEncoding, NullValues};
use crate::mmap::ReaderBytes;
Expand Down Expand Up @@ -109,7 +109,7 @@ pub(crate) struct CoreReader<'a> {
sample_size: usize,
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValuesCompiled>,
Expand Down Expand Up @@ -198,7 +198,7 @@ impl<'a> CoreReader<'a> {
sample_size: usize,
chunk_size: usize,
low_memory: bool,
comment_char: Option<u8>,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
null_values: Option<NullValues>,
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<'a> CoreReader<'a> {
schema_overwrite.as_deref(),
&mut skip_rows,
skip_rows_after_header,
comment_char,
comment_prefix.as_ref(),
quote_char,
eol_char,
null_values.as_ref(),
Expand Down Expand Up @@ -299,7 +299,7 @@ impl<'a> CoreReader<'a> {
sample_size,
chunk_size,
low_memory,
comment_char,
comment_prefix,
quote_char,
eol_char,
null_values,
Expand Down Expand Up @@ -342,14 +342,13 @@ impl<'a> CoreReader<'a> {

if self.skip_rows_after_header > 0 {
for _ in 0..self.skip_rows_after_header {
let pos = match bytes.first() {
Some(first) if Some(*first) == self.comment_char => {
next_line_position_naive(bytes, eol_char)
},
let pos = if is_comment_line(bytes, self.comment_prefix.as_ref()) {
next_line_position_naive(bytes, eol_char)
} else {
// we don't pass expected fields
// as we want to skip all rows
// no matter the no. of fields
_ => next_line_position(bytes, None, self.separator, self.quote_char, eol_char),
next_line_position(bytes, None, self.separator, self.quote_char, eol_char)
}
.ok_or_else(|| polars_err!(NoData: "not enough lines to skip"))?;

Expand Down Expand Up @@ -598,7 +597,7 @@ impl<'a> CoreReader<'a> {
local_bytes,
offset,
self.separator,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.missing_is_null,
Expand Down Expand Up @@ -670,7 +669,7 @@ impl<'a> CoreReader<'a> {
bytes_offset_thread,
self.quote_char,
self.eol_char,
self.comment_char,
self.comment_prefix.as_ref(),
capacity,
&str_capacities,
self.encoding,
Expand Down Expand Up @@ -716,7 +715,7 @@ impl<'a> CoreReader<'a> {
remaining_bytes,
0,
self.separator,
self.comment_char,
self.comment_prefix.as_ref(),
self.quote_char,
self.eol_char,
self.missing_is_null,
Expand Down Expand Up @@ -800,7 +799,7 @@ fn read_chunk(
bytes_offset_thread: usize,
quote_char: Option<u8>,
eol_char: u8,
comment_char: Option<u8>,
comment_prefix: Option<&CommentPrefix>,
capacity: usize,
str_capacities: &[RunningSize],
encoding: CsvEncoding,
Expand Down Expand Up @@ -835,7 +834,7 @@ fn read_chunk(
local_bytes,
offset,
separator,
comment_char,
comment_prefix,
quote_char,
eol_char,
missing_is_null,
Expand Down
Loading