Skip to content

Commit

Permalink
setup skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 28, 2022
1 parent b217c05 commit 73c2007
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 14 deletions.
23 changes: 10 additions & 13 deletions polars/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#[cfg(feature = "dtype-categorical")]
use polars_core::toggle_string_cache;

use super::*;
use crate::csv::read_impl::BatchedCsvReader;

Expand Down Expand Up @@ -375,7 +372,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
// We only support a few dtypes in the parser and later cast to the required dtype
let mut to_cast = Vec::with_capacity(overwriting_schema.len());

let mut has_categorical = false;
let mut _has_categorical = false;

#[allow(clippy::unnecessary_filter_map)]
let fields: Vec<_> = overwriting_schema
Expand All @@ -396,20 +393,20 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
}
#[cfg(feature = "dtype-categorical")]
Categorical(_) => {
has_categorical = true;
_has_categorical = true;
Some(fld)
}
_ => Some(fld),
}
})
.collect();
let schema = Schema::from(fields);
(schema, to_cast, has_categorical)
(schema, to_cast, _has_categorical)
}

pub fn batched(&'a mut self) -> PolarsResult<BatchedCsvReader<'a>> {
if let Some(schema) = self.schema_overwrite {
let (schema, to_cast) = self.prepare_schema_overwrite(schema);
let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
self.owned_schema = Some(Box::new(schema));

// safety
Expand All @@ -423,10 +420,10 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
};

let csv_reader = self.core_reader(schema, to_cast)?;
csv_reader.batched()
csv_reader.batched(has_cat)
} else {
let csv_reader = self.core_reader(self.schema, vec![])?;
csv_reader.batched()
csv_reader.batched(false)
}
}
}
Expand Down Expand Up @@ -479,14 +476,14 @@ where
let low_memory = self.low_memory;

#[cfg(feature = "dtype-categorical")]
let mut cat_lock = None;
let mut _cat_lock = None;

let mut df = if let Some(schema) = schema_overwrite {
let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
let (schema, to_cast, _has_cat) = self.prepare_schema_overwrite(schema);

#[cfg(feature = "dtype-categorical")]
if has_cat {
cat_lock = Some(polars_core::IUseStringCache::new())
if _has_cat {
_cat_lock = Some(polars_core::IUseStringCache::new())
}

let mut csv_reader = self.core_reader(Some(&schema), to_cast)?;
Expand Down
18 changes: 17 additions & 1 deletion polars/polars-io/src/csv/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ impl<'a> CoreReader<'a> {
Ok(df)
}

pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
pub fn batched(mut self, _has_cat: bool) -> PolarsResult<BatchedCsvReader<'a>> {
let mut n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
let reader_bytes = self.reader_bytes.take().unwrap();
let logging = std::env::var("POLARS_VERBOSE").is_ok();
Expand All @@ -721,6 +721,17 @@ impl<'a> CoreReader<'a> {
std::mem::transmute::<Vec<&str>, Vec<&'a str>>(self.get_string_columns(&projection)?)
};

// RAII structure that will ensure we maintain a global stringcache
#[cfg(feature = "dtype-categorical")]
let _cat_lock = if _has_cat {
Some(polars_core::IUseStringCache::new())
} else {
None
};

#[cfg(not(feature = "dtype-categorical"))]
let _cat_lock = None;

Ok(BatchedCsvReader {
reader_bytes,
chunk_size,
Expand All @@ -742,6 +753,7 @@ impl<'a> CoreReader<'a> {
delimiter: self.delimiter,
schema: self.schema,
rows_read: 0,
_cat_lock,
})
}
}
Expand All @@ -767,6 +779,10 @@ pub struct BatchedCsvReader<'a> {
delimiter: u8,
schema: Cow<'a, Schema>,
rows_read: IdxSize,
#[cfg(feature = "dtype-categorical")]
_cat_lock: Option<polars_core::IUseStringCache>,
#[cfg(not(feature = "dtype-categorical"))]
_cat_lock: Option<u8>,
}

impl<'a> BatchedCsvReader<'a> {
Expand Down
1 change: 1 addition & 0 deletions polars/polars-pipe/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
mod operators;
mod source;
6 changes: 6 additions & 0 deletions polars/polars-pipe/src/operators/chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use super::*;

/// A single unit of data flowing through the streaming pipeline:
/// a `DataFrame` tagged with its position in the stream.
pub struct DataChunk {
    // Ordinal of this chunk within the stream; presumably used to restore
    // original ordering after parallel processing — TODO confirm once
    // the pipeline executor lands (this commit is only the skeleton).
    chunk_index: IdxSize,
    // The materialized data carried by this chunk.
    data: DataFrame,
}
1 change: 1 addition & 0 deletions polars/polars-pipe/src/operators/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/// Shared execution state handed to every `Source`, `Operator` and `Sink`
/// while the pipeline runs. Currently an empty placeholder in this
/// skeleton commit; fields will be added as the executor is implemented.
pub struct PExecutionContext {}
12 changes: 12 additions & 0 deletions polars/polars-pipe/src/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mod chunks;
mod context;
mod operator;
mod sink;
mod source;

use chunks::*;
use context::*;
use operator::*;
use polars_core::prelude::*;
use sink::*;
use source::*;
15 changes: 15 additions & 0 deletions polars/polars-pipe/src/operators/operator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use super::*;

/// Outcome of a single [`Operator::execute`] call, driving the pipeline's
/// push/pull loop. Each variant may carry an output chunk produced while
/// reaching that state.
// `pub` is required here: the enum appears in the return type of the
// public `Operator` trait, and a private type in a public trait signature
// is a hard compile error (E0446).
pub enum OperatorResult {
    /// The operator consumed its input and needs another chunk before it
    /// can produce (more) output.
    NeedMoreInput(Option<DataChunk>),
    /// The operator still has pending output for the current input; call
    /// it again before feeding a new chunk.
    // NOTE(review): "OutPut" is a typo for "Output"; kept as-is so any
    // out-of-view implementors of this brand-new API are not broken —
    // rename in a follow-up while the variant has no users.
    HaveMoreOutPut(Option<DataChunk>),
    /// The operator is finished and will emit no further output.
    Finished(Option<DataChunk>),
}

/// A transformation stage of the streaming pipeline.
///
/// An operator receives one [`DataChunk`] at a time and reports through
/// [`OperatorResult`] whether it needs more input, still has output
/// pending, or is finished.
pub trait Operator {
    /// Process a single chunk under the given execution context.
    ///
    /// # Errors
    /// Returns a `PolarsError` if processing the chunk fails.
    fn execute(
        &self,
        context: &PExecutionContext,
        chunk: DataChunk,
    ) -> PolarsResult<OperatorResult>;
}
10 changes: 10 additions & 0 deletions polars/polars-pipe/src/operators/sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use super::*;

/// Outcome of pushing one chunk into a [`Sink`].
pub enum SinkResult {
    /// The sink accepts no further input; the pipeline can stop feeding it.
    Finished,
    /// The sink is ready to receive the next chunk.
    NeedMoreInput,
}

/// Terminal stage of the streaming pipeline that consumes chunks
/// (e.g. collecting into a `DataFrame` or writing to a file).
// NOTE(review): `sink` takes no `self` receiver, so implementors cannot
// hold per-sink state through this method — confirm this is intentional
// for the skeleton; a `&mut self` receiver seems likely to be needed.
pub trait Sink {
    /// Consume one chunk; the returned [`SinkResult`] tells the caller
    /// whether to keep feeding this sink.
    fn sink(context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult>;
}
10 changes: 10 additions & 0 deletions polars/polars-pipe/src/operators/source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use super::*;

/// Outcome of asking a [`Source`] for its next batch.
pub enum SourceResult {
    /// The source is exhausted; no chunk is returned.
    Finished,
    /// The source produced another chunk of data.
    GotMoreData(DataChunk),
}

/// Producing end of the streaming pipeline (e.g. a batched CSV reader)
/// that yields chunks until exhausted.
// NOTE(review): like `Sink::sink`, this method has no `self` receiver,
// so a source cannot advance internal state between calls — verify this
// signature is a deliberate placeholder in the skeleton.
pub trait Source {
    /// Fetch the next batch, or signal completion via
    /// [`SourceResult::Finished`].
    fn get_batches(context: &PExecutionContext) -> PolarsResult<SourceResult>;
}

0 comments on commit 73c2007

Please sign in to comment.