diff --git a/polars/polars-io/src/csv/read.rs b/polars/polars-io/src/csv/read.rs
index 3f4a7a2eb48da..80a6c41d1130f 100644
--- a/polars/polars-io/src/csv/read.rs
+++ b/polars/polars-io/src/csv/read.rs
@@ -1,6 +1,3 @@
-#[cfg(feature = "dtype-categorical")]
-use polars_core::toggle_string_cache;
-
 use super::*;
 use crate::csv::read_impl::BatchedCsvReader;
 
@@ -375,7 +372,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
         // We only support a few dtypes in the parser and later cast to the required dtype
         let mut to_cast = Vec::with_capacity(overwriting_schema.len());
 
-        let mut has_categorical = false;
+        let mut _has_categorical = false;
 
         #[allow(clippy::unnecessary_filter_map)]
         let fields: Vec<_> = overwriting_schema
@@ -396,7 +393,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
                     }
                     #[cfg(feature = "dtype-categorical")]
                     Categorical(_) => {
-                        has_categorical = true;
+                        _has_categorical = true;
                         Some(fld)
                     }
                     _ => Some(fld),
@@ -404,12 +401,12 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
             })
             .collect();
         let schema = Schema::from(fields);
-        (schema, to_cast, has_categorical)
+        (schema, to_cast, _has_categorical)
     }
 
     pub fn batched(&'a mut self) -> PolarsResult<BatchedCsvReader<'a>> {
         if let Some(schema) = self.schema_overwrite {
-            let (schema, to_cast) = self.prepare_schema_overwrite(schema);
+            let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
             self.owned_schema = Some(Box::new(schema));
 
             // safety
@@ -423,10 +420,10 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
             };
 
             let csv_reader = self.core_reader(schema, to_cast)?;
-            csv_reader.batched()
+            csv_reader.batched(has_cat)
         } else {
            let csv_reader = self.core_reader(self.schema, vec![])?;
-            csv_reader.batched()
+            csv_reader.batched(false)
         }
     }
 }
@@ -479,14 +476,14 @@ where
         let low_memory = self.low_memory;
 
         #[cfg(feature = "dtype-categorical")]
-        let mut cat_lock = None;
+        let mut _cat_lock = None;
 
         let mut df = if let Some(schema) = schema_overwrite {
-            let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
+            let (schema, to_cast, _has_cat) = self.prepare_schema_overwrite(schema);
             #[cfg(feature = "dtype-categorical")]
-            if has_cat {
-                cat_lock = Some(polars_core::IUseStringCache::new())
+            if _has_cat {
+                _cat_lock = Some(polars_core::IUseStringCache::new())
             }
 
             let mut csv_reader = self.core_reader(Some(&schema), to_cast)?;
diff --git a/polars/polars-io/src/csv/read_impl.rs b/polars/polars-io/src/csv/read_impl.rs
index 772a9f9a1b147..26bba417dfaef 100644
--- a/polars/polars-io/src/csv/read_impl.rs
+++ b/polars/polars-io/src/csv/read_impl.rs
@@ -706,7 +706,7 @@ impl<'a> CoreReader<'a> {
         Ok(df)
     }
 
-    pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
+    pub fn batched(mut self, _has_cat: bool) -> PolarsResult<BatchedCsvReader<'a>> {
         let mut n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
         let reader_bytes = self.reader_bytes.take().unwrap();
         let logging = std::env::var("POLARS_VERBOSE").is_ok();
@@ -721,6 +721,17 @@ impl<'a> CoreReader<'a> {
             std::mem::transmute::<Vec<&str>, Vec<&'a str>>(self.get_string_columns(&projection)?)
         };
 
+        // RAII structure that will ensure we maintain a global stringcache
+        #[cfg(feature = "dtype-categorical")]
+        let _cat_lock = if _has_cat {
+            Some(polars_core::IUseStringCache::new())
+        } else {
+            None
+        };
+
+        #[cfg(not(feature = "dtype-categorical"))]
+        let _cat_lock = None;
+
         Ok(BatchedCsvReader {
             reader_bytes,
             chunk_size,
@@ -742,6 +753,7 @@ impl<'a> CoreReader<'a> {
             delimiter: self.delimiter,
             schema: self.schema,
             rows_read: 0,
+            _cat_lock,
         })
     }
 }
@@ -767,6 +779,10 @@ pub struct BatchedCsvReader<'a> {
     delimiter: u8,
     schema: Cow<'a, Schema>,
     rows_read: IdxSize,
+    #[cfg(feature = "dtype-categorical")]
+    _cat_lock: Option<polars_core::IUseStringCache>,
+    #[cfg(not(feature = "dtype-categorical"))]
+    _cat_lock: Option<u8>,
 }
 
 impl<'a> BatchedCsvReader<'a> {
diff --git a/polars/polars-pipe/src/lib.rs b/polars/polars-pipe/src/lib.rs
index 2edd6151b4401..be450db680c63 100644
--- a/polars/polars-pipe/src/lib.rs
+++ b/polars/polars-pipe/src/lib.rs
@@ -1 +1 @@
-mod source;
+mod operators;
diff --git a/polars/polars-pipe/src/operators/chunks.rs b/polars/polars-pipe/src/operators/chunks.rs
new file mode 100644
index 0000000000000..7de3d425bf8b0
--- /dev/null
+++ b/polars/polars-pipe/src/operators/chunks.rs
@@ -0,0 +1,6 @@
+use super::*;
+
+pub struct DataChunk {
+    chunk_index: IdxSize,
+    data: DataFrame,
+}
diff --git a/polars/polars-pipe/src/operators/context.rs b/polars/polars-pipe/src/operators/context.rs
new file mode 100644
index 0000000000000..ea85335ff5ff1
--- /dev/null
+++ b/polars/polars-pipe/src/operators/context.rs
@@ -0,0 +1 @@
+pub struct PExecutionContext {}
diff --git a/polars/polars-pipe/src/operators/mod.rs b/polars/polars-pipe/src/operators/mod.rs
new file mode 100644
index 0000000000000..2e2cfc4b1578f
--- /dev/null
+++ b/polars/polars-pipe/src/operators/mod.rs
@@ -0,0 +1,12 @@
+mod chunks;
+mod context;
+mod operator;
+mod sink;
+mod source;
+
+use chunks::*;
+use context::*;
+use operator::*;
+use polars_core::prelude::*;
+use sink::*;
+use source::*;
diff --git a/polars/polars-pipe/src/operators/operator.rs b/polars/polars-pipe/src/operators/operator.rs
new file mode 100644
index 0000000000000..c1561f9c44426
--- /dev/null
+++ b/polars/polars-pipe/src/operators/operator.rs
@@ -0,0 +1,15 @@
+use super::*;
+
+enum OperatorResult {
+    NeedMoreInput(Option<DataChunk>),
+    HaveMoreOutPut(Option<DataChunk>),
+    Finished(Option<DataChunk>),
+}
+
+pub trait Operator {
+    fn execute(
+        &self,
+        context: &PExecutionContext,
+        chunk: DataChunk,
+    ) -> PolarsResult<OperatorResult>;
+}
diff --git a/polars/polars-pipe/src/operators/sink.rs b/polars/polars-pipe/src/operators/sink.rs
new file mode 100644
index 0000000000000..2366e66a90885
--- /dev/null
+++ b/polars/polars-pipe/src/operators/sink.rs
@@ -0,0 +1,10 @@
+use super::*;
+
+pub enum SinkResult {
+    Finished,
+    NeedMoreInput,
+}
+
+pub trait Sink {
+    fn sink(context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult>;
+}
diff --git a/polars/polars-pipe/src/operators/source.rs b/polars/polars-pipe/src/operators/source.rs
new file mode 100644
index 0000000000000..e9bb232502c96
--- /dev/null
+++ b/polars/polars-pipe/src/operators/source.rs
@@ -0,0 +1,10 @@
+use super::*;
+
+pub enum SourceResult {
+    Finished,
+    GotMoreData(DataChunk),
+}
+
+pub trait Source {
+    fn get_batches(context: &PExecutionContext) -> PolarsResult<SourceResult>;
+}
diff --git a/polars/polars-pipe/src/source.rs b/polars/polars-pipe/src/source.rs
deleted file mode 100644
index 900a9f8f9cc3c..0000000000000
--- a/polars/polars-pipe/src/source.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-use std::path::PathBuf;
-
-use polars_core::prelude::*;
-use polars_lazy::prelude::*;
-use rayon::prelude::*;
-
-enum SourceState {
-    Finished,
-    HasMore(DataFrame),
-}
-
-trait Source {
-    fn next() -> SourceState;
-}
-
-struct CsvSource {
-    pub path: PathBuf,
-    pub schema: SchemaRef,
-    pub options: CsvParserOptions,
-}
-
-impl Source for CsvSource {}