Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support decimal float parsing in CSV #15774

Merged
merged 2 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 67 additions & 4 deletions crates/polars-io/src/csv/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_time::chunkedarray::string::Pattern;
use polars_time::prelude::string::infer::{
infer_pattern_single, DatetimeInfer, StrpTimeParser, TryFromWithUnit,
};
use polars_utils::vec::PushUnchecked;

use crate::csv::parser::{is_whitespace, skip_whitespace};
use crate::csv::utils::escape_field;
Expand Down Expand Up @@ -485,6 +486,7 @@ pub(crate) fn init_buffers(
schema: &Schema,
quote_char: Option<u8>,
encoding: CsvEncoding,
decimal_float: bool,
) -> PolarsResult<Vec<Buffer>> {
projection
.iter()
Expand All @@ -504,8 +506,26 @@ pub(crate) fn init_buffers(
&DataType::UInt16 => Buffer::UInt16(PrimitiveChunkedBuilder::new(name, capacity)),
&DataType::UInt32 => Buffer::UInt32(PrimitiveChunkedBuilder::new(name, capacity)),
&DataType::UInt64 => Buffer::UInt64(PrimitiveChunkedBuilder::new(name, capacity)),
&DataType::Float32 => Buffer::Float32(PrimitiveChunkedBuilder::new(name, capacity)),
&DataType::Float64 => Buffer::Float64(PrimitiveChunkedBuilder::new(name, capacity)),
&DataType::Float32 => {
if decimal_float {
Buffer::DecimalFloat32(
PrimitiveChunkedBuilder::new(name, capacity),
Default::default(),
)
} else {
Buffer::Float32(PrimitiveChunkedBuilder::new(name, capacity))
}
},
&DataType::Float64 => {
if decimal_float {
Buffer::DecimalFloat64(
PrimitiveChunkedBuilder::new(name, capacity),
Default::default(),
)
} else {
Buffer::Float64(PrimitiveChunkedBuilder::new(name, capacity))
}
},
&DataType::String => {
Buffer::Utf8(Utf8Field::new(name, capacity, quote_char, encoding))
},
Expand Down Expand Up @@ -560,6 +580,8 @@ pub(crate) enum Buffer {
Date(DatetimeField<Int32Type>),
#[allow(dead_code)]
Categorical(CategoricalField),
DecimalFloat32(PrimitiveChunkedBuilder<Float32Type>, Vec<u8>),
DecimalFloat64(PrimitiveChunkedBuilder<Float64Type>, Vec<u8>),
}

impl Buffer {
Expand All @@ -580,6 +602,8 @@ impl Buffer {
Buffer::UInt64(v) => v.finish().into_series(),
Buffer::Float32(v) => v.finish().into_series(),
Buffer::Float64(v) => v.finish().into_series(),
Buffer::DecimalFloat32(v, _) => v.finish().into_series(),
Buffer::DecimalFloat64(v, _) => v.finish().into_series(),
#[cfg(feature = "dtype-datetime")]
Buffer::Datetime {
buf,
Expand Down Expand Up @@ -635,6 +659,8 @@ impl Buffer {
Buffer::UInt64(v) => v.append_null(),
Buffer::Float32(v) => v.append_null(),
Buffer::Float64(v) => v.append_null(),
Buffer::DecimalFloat32(v, _) => v.append_null(),
Buffer::DecimalFloat64(v, _) => v.append_null(),
Buffer::Utf8(v) => {
if valid {
v.mutable.push_value("")
Expand Down Expand Up @@ -675,8 +701,8 @@ impl Buffer {
Buffer::UInt16(_) => DataType::UInt16,
Buffer::UInt32(_) => DataType::UInt32,
Buffer::UInt64(_) => DataType::UInt64,
Buffer::Float32(_) => DataType::Float32,
Buffer::Float64(_) => DataType::Float64,
Buffer::Float32(_) | Buffer::DecimalFloat32(_, _) => DataType::Float32,
Buffer::Float64(_) | Buffer::DecimalFloat64(_, _) => DataType::Float64,
Buffer::Utf8(_) => DataType::String,
#[cfg(feature = "dtype-datetime")]
Buffer::Datetime { time_unit, .. } => DataType::Datetime(*time_unit, None),
Expand Down Expand Up @@ -798,6 +824,28 @@ impl Buffer {
missing_is_null,
None,
),
DecimalFloat32(buf, scratch) => {
prepare_decimal_float(bytes, scratch);
<PrimitiveChunkedBuilder<Float32Type> as ParsedBuffer>::parse_bytes(
buf,
scratch,
ignore_errors,
needs_escaping,
missing_is_null,
None,
)
},
DecimalFloat64(buf, scratch) => {
prepare_decimal_float(bytes, scratch);
<PrimitiveChunkedBuilder<Float64Type> as ParsedBuffer>::parse_bytes(
buf,
scratch,
ignore_errors,
needs_escaping,
missing_is_null,
None,
)
},
Utf8(buf) => <Utf8Field as ParsedBuffer>::parse_bytes(
buf,
bytes,
Expand Down Expand Up @@ -841,3 +889,18 @@ impl Buffer {
}
}
}

/// Normalize a decimal-comma float (e.g. `"1,5"`) into a decimal-point
/// float (`"1.5"`) so it can be handed to the regular float parser.
///
/// The converted bytes are written into `scratch`, which is cleared first;
/// `bytes` itself is left untouched.
#[inline]
fn prepare_decimal_float(bytes: &[u8], scratch: &mut Vec<u8>) {
    scratch.clear();
    // `extend` reserves once up front from the iterator's exact size hint,
    // so this is still a single allocation + linear copy — but without any
    // `unsafe` push or manual `reserve` bookkeeping.
    scratch.extend(bytes.iter().map(|&b| if b == b',' { b'.' } else { b }));
}
11 changes: 11 additions & 0 deletions crates/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ where
has_header: bool,
ignore_errors: bool,
eol_char: u8,
decimal_float: bool,
}

impl<'a, R> CsvReader<'a, R>
Expand Down Expand Up @@ -371,6 +372,12 @@ where
self.truncate_ragged_lines = toggle;
self
}

/// Parse floats with decimals.
pub fn with_decimal_float(mut self, toggle: bool) -> Self {
self.decimal_float = toggle;
self
}
}

impl<'a> CsvReader<'a, File> {
Expand Down Expand Up @@ -422,6 +429,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
self.try_parse_dates,
self.raise_if_empty,
self.truncate_ragged_lines,
self.decimal_float,
)
}

Expand Down Expand Up @@ -529,6 +537,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
self.try_parse_dates,
self.raise_if_empty,
&mut self.n_threads,
self.decimal_float,
)?;
let schema = Arc::new(inferred_schema);
Ok(to_batched_owned_mmap(self, schema))
Expand Down Expand Up @@ -559,6 +568,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
self.try_parse_dates,
self.raise_if_empty,
&mut self.n_threads,
self.decimal_float,
)?;
let schema = Arc::new(inferred_schema);
Ok(to_batched_owned_read(self, schema))
Expand Down Expand Up @@ -604,6 +614,7 @@ where
row_index: None,
raise_if_empty: true,
truncate_ragged_lines: false,
decimal_float: false,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/polars-io/src/csv/read_impl/batched_mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl<'a> CoreReader<'a> {
schema: self.schema,
rows_read: 0,
_cat_lock,
decimal_float: self.decimal_float,
})
}
}
Expand Down Expand Up @@ -193,6 +194,7 @@ pub struct BatchedCsvReaderMmap<'a> {
_cat_lock: Option<polars_core::StringCacheHolder>,
#[cfg(not(feature = "dtype-categorical"))]
_cat_lock: Option<u8>,
decimal_float: bool,
}

impl<'a> BatchedCsvReaderMmap<'a> {
Expand Down Expand Up @@ -243,6 +245,7 @@ impl<'a> BatchedCsvReaderMmap<'a> {
self.chunk_size,
stop_at_nbytes,
self.starting_point_offset,
self.decimal_float,
)?;

cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-io/src/csv/read_impl/batched_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ impl<'a> CoreReader<'a> {
schema: self.schema,
rows_read: 0,
_cat_lock,
decimal_float: self.decimal_float,
})
}
}
Expand Down Expand Up @@ -276,6 +277,7 @@ pub struct BatchedCsvReaderRead<'a> {
_cat_lock: Option<polars_core::StringCacheHolder>,
#[cfg(not(feature = "dtype-categorical"))]
_cat_lock: Option<u8>,
decimal_float: bool,
}
//
impl<'a> BatchedCsvReaderRead<'a> {
Expand Down Expand Up @@ -340,6 +342,7 @@ impl<'a> BatchedCsvReaderRead<'a> {
self.chunk_size,
stop_at_n_bytes,
self.starting_point_offset,
self.decimal_float,
)?;

cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
Expand Down
18 changes: 17 additions & 1 deletion crates/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub(crate) struct CoreReader<'a> {
sample_size: usize,
chunk_size: usize,
low_memory: bool,
decimal_float: bool,
comment_prefix: Option<CommentPrefix>,
quote_char: Option<u8>,
eol_char: u8,
Expand Down Expand Up @@ -159,7 +160,9 @@ impl<'a> CoreReader<'a> {
try_parse_dates: bool,
raise_if_empty: bool,
truncate_ragged_lines: bool,
decimal_float: bool,
) -> PolarsResult<CoreReader<'a>> {
check_decimal_float(decimal_float, separator.unwrap_or(b','))?;
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
let mut reader_bytes = reader_bytes;

Expand Down Expand Up @@ -206,6 +209,7 @@ impl<'a> CoreReader<'a> {
try_parse_dates,
raise_if_empty,
&mut n_threads,
decimal_float,
)?;
Arc::new(inferred_schema)
},
Expand Down Expand Up @@ -260,6 +264,7 @@ impl<'a> CoreReader<'a> {
to_cast,
row_index,
truncate_ragged_lines,
decimal_float,
})
}

Expand Down Expand Up @@ -505,6 +510,7 @@ impl<'a> CoreReader<'a> {
schema,
self.quote_char,
self.encoding,
self.decimal_float,
)?;

let local_bytes = &bytes[read..stop_at_nbytes];
Expand Down Expand Up @@ -589,6 +595,7 @@ impl<'a> CoreReader<'a> {
usize::MAX,
stop_at_nbytes,
starting_point_offset,
self.decimal_float,
)?;

cast_columns(&mut df, &self.to_cast, false, self.ignore_errors)?;
Expand All @@ -612,6 +619,7 @@ impl<'a> CoreReader<'a> {
self.schema.as_ref(),
self.quote_char,
self.encoding,
self.decimal_float,
)?;

parse_lines(
Expand Down Expand Up @@ -694,14 +702,22 @@ fn read_chunk(
chunk_size: usize,
stop_at_nbytes: usize,
starting_point_offset: Option<usize>,
decimal_float: bool,
) -> PolarsResult<DataFrame> {
let mut read = bytes_offset_thread;
// There's an off-by-one error somewhere in the reading code, where it reads
// one more item than the requested capacity. Given the batch sizes are
// approximate (sometimes they're smaller), this isn't broken, but it does
// mean a bunch of extra allocation and copying. So we allocate a
// larger-by-one buffer so the size is more likely to be accurate.
let mut buffers = init_buffers(projection, capacity + 1, schema, quote_char, encoding)?;
let mut buffers = init_buffers(
projection,
capacity + 1,
schema,
quote_char,
encoding,
decimal_float,
)?;

let mut last_read = usize::MAX;
loop {
Expand Down
Loading
Loading