Skip to content

Commit

Permalink
setup skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 28, 2022
1 parent b217c05 commit 06a54fd
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 38 deletions.
23 changes: 10 additions & 13 deletions polars/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#[cfg(feature = "dtype-categorical")]
use polars_core::toggle_string_cache;

use super::*;
use crate::csv::read_impl::BatchedCsvReader;

Expand Down Expand Up @@ -375,7 +372,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
// We only support a few dtypes in the parser and later cast to the required dtype
let mut to_cast = Vec::with_capacity(overwriting_schema.len());

let mut has_categorical = false;
let mut _has_categorical = false;

#[allow(clippy::unnecessary_filter_map)]
let fields: Vec<_> = overwriting_schema
Expand All @@ -396,20 +393,20 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
}
#[cfg(feature = "dtype-categorical")]
Categorical(_) => {
has_categorical = true;
_has_categorical = true;
Some(fld)
}
_ => Some(fld),
}
})
.collect();
let schema = Schema::from(fields);
(schema, to_cast, has_categorical)
(schema, to_cast, _has_categorical)
}

pub fn batched(&'a mut self) -> PolarsResult<BatchedCsvReader<'a>> {
if let Some(schema) = self.schema_overwrite {
let (schema, to_cast) = self.prepare_schema_overwrite(schema);
let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
self.owned_schema = Some(Box::new(schema));

// safety
Expand All @@ -423,10 +420,10 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
};

let csv_reader = self.core_reader(schema, to_cast)?;
csv_reader.batched()
csv_reader.batched(has_cat)
} else {
let csv_reader = self.core_reader(self.schema, vec![])?;
csv_reader.batched()
csv_reader.batched(false)
}
}
}
Expand Down Expand Up @@ -479,14 +476,14 @@ where
let low_memory = self.low_memory;

#[cfg(feature = "dtype-categorical")]
let mut cat_lock = None;
let mut _cat_lock = None;

let mut df = if let Some(schema) = schema_overwrite {
let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
let (schema, to_cast, _has_cat) = self.prepare_schema_overwrite(schema);

#[cfg(feature = "dtype-categorical")]
if has_cat {
cat_lock = Some(polars_core::IUseStringCache::new())
if _has_cat {
_cat_lock = Some(polars_core::IUseStringCache::new())
}

let mut csv_reader = self.core_reader(Some(&schema), to_cast)?;
Expand Down
18 changes: 17 additions & 1 deletion polars/polars-io/src/csv/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ impl<'a> CoreReader<'a> {
Ok(df)
}

pub fn batched(mut self) -> PolarsResult<BatchedCsvReader<'a>> {
pub fn batched(mut self, _has_cat: bool) -> PolarsResult<BatchedCsvReader<'a>> {
let mut n_threads = self.n_threads.unwrap_or_else(|| POOL.current_num_threads());
let reader_bytes = self.reader_bytes.take().unwrap();
let logging = std::env::var("POLARS_VERBOSE").is_ok();
Expand All @@ -721,6 +721,17 @@ impl<'a> CoreReader<'a> {
std::mem::transmute::<Vec<&str>, Vec<&'a str>>(self.get_string_columns(&projection)?)
};

// RAII structure that will ensure we maintain a global stringcache
#[cfg(feature = "dtype-categorical")]
let _cat_lock = if _has_cat {
Some(polars_core::IUseStringCache::new())
} else {
None
};

#[cfg(not(feature = "dtype-categorical"))]
let _cat_lock = None;

Ok(BatchedCsvReader {
reader_bytes,
chunk_size,
Expand All @@ -742,6 +753,7 @@ impl<'a> CoreReader<'a> {
delimiter: self.delimiter,
schema: self.schema,
rows_read: 0,
_cat_lock,
})
}
}
Expand All @@ -767,6 +779,10 @@ pub struct BatchedCsvReader<'a> {
delimiter: u8,
schema: Cow<'a, Schema>,
rows_read: IdxSize,
#[cfg(feature = "dtype-categorical")]
_cat_lock: Option<polars_core::IUseStringCache>,
#[cfg(not(feature = "dtype-categorical"))]
_cat_lock: Option<u8>,
}

impl<'a> BatchedCsvReader<'a> {
Expand Down
7 changes: 6 additions & 1 deletion polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ arange = []
mode = ["polars-core/mode"]
cum_agg = ["polars-core/cum_agg"]
interpolate = ["polars-core/interpolate"]
rolling_window = ["polars-core/rolling_window", "polars-time/rolling_window", "polars-ops/rolling_window"]
rolling_window = [
"polars-core/rolling_window",
"polars-time/rolling_window",
"polars-ops/rolling_window",
"polars-time/rolling_window",
]
rank = ["polars-core/rank"]
diff = ["polars-core/diff", "polars-ops/diff"]
pct_change = ["polars-core/pct_change"]
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "rolling_window")]
pub(crate) use polars_time::chunkedarray::{RollingOptions, RollingOptionsImpl};
#[cfg(feature = "temporal")]
pub(crate) use polars_time::in_nanoseconds_window;
#[cfg(feature = "dynamic_groupby")]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-pipe/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
mod source;
mod operators;
6 changes: 6 additions & 0 deletions polars/polars-pipe/src/operators/chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use super::*;

pub struct DataChunk {
chunk_index: IdxSize,
data: DataFrame,
}
1 change: 1 addition & 0 deletions polars/polars-pipe/src/operators/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub struct PExecutionContext {}
12 changes: 12 additions & 0 deletions polars/polars-pipe/src/operators/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mod chunks;
mod context;
mod operator;
mod sink;
mod source;

use chunks::*;
use context::*;
use operator::*;
use polars_core::prelude::*;
use sink::*;
use source::*;
15 changes: 15 additions & 0 deletions polars/polars-pipe/src/operators/operator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use super::*;

enum OperatorResult {
NeedMoreInput(Option<DataChunk>),
HaveMoreOutPut(Option<DataChunk>),
Finished(Option<DataChunk>),
}

pub trait Operator {
fn execute(
&self,
context: &PExecutionContext,
chunk: DataChunk,
) -> PolarsResult<OperatorResult>;
}
10 changes: 10 additions & 0 deletions polars/polars-pipe/src/operators/sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use super::*;

pub enum SinkResult {
Finished,
NeedMoreInput,
}

pub trait Sink {
fn sink(context: &PExecutionContext, chunk: DataChunk) -> PolarsResult<SinkResult>;
}
10 changes: 10 additions & 0 deletions polars/polars-pipe/src/operators/source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use super::*;

pub enum SourceResult {
Finished,
GotMoreData(DataChunk),
}

pub trait Source {
fn get_batches(context: &PExecutionContext) -> PolarsResult<SourceResult>;
}
22 changes: 0 additions & 22 deletions polars/polars-pipe/src/source.rs

This file was deleted.

7 changes: 7 additions & 0 deletions polars/polars-time/src/chunkedarray/rolling_window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use polars_arrow::kernels::rolling;
use polars_arrow::prelude::QuantileInterpolOptions;
use polars_core::prelude::*;

#[cfg(feature = "rolling_window")]
use crate::prelude::*;
use crate::series::WrapFloat;

#[derive(Clone)]
#[cfg(feature = "rolling_window")]
pub struct RollingOptions {
/// The length of the window.
pub window_size: Duration,
Expand All @@ -38,6 +40,7 @@ pub struct RollingOptions {
pub closed_window: Option<ClosedWindow>,
}

#[cfg(feature = "rolling_window")]
impl Default for RollingOptions {
fn default() -> Self {
RollingOptions {
Expand All @@ -52,6 +55,7 @@ impl Default for RollingOptions {
}

#[derive(Clone)]
#[cfg(feature = "rolling_window")]
pub struct RollingOptionsImpl<'a> {
/// The length of the window.
pub window_size: Duration,
Expand All @@ -67,6 +71,7 @@ pub struct RollingOptionsImpl<'a> {
pub closed_window: Option<ClosedWindow>,
}

#[cfg(feature = "rolling_window")]
impl From<RollingOptions> for RollingOptionsImpl<'static> {
fn from(options: RollingOptions) -> Self {
let window_size = options.window_size;
Expand Down Expand Up @@ -105,6 +110,7 @@ impl From<RollingOptions> for RollingOptionsFixedWindow {
}
}

#[cfg(feature = "rolling_window")]
impl Default for RollingOptionsImpl<'static> {
fn default() -> Self {
RollingOptionsImpl {
Expand Down Expand Up @@ -185,6 +191,7 @@ pub trait RollingAgg {
/// A window of length `window_size` will traverse the array. The values that fill this window
/// will (optionally) be multiplied with the weights given by the `weights` vector. The resulting
/// values will be aggregated to their var.
#[cfg(feature = "rolling_window")]
fn rolling_var(&self, options: RollingOptionsImpl) -> PolarsResult<Series>;

/// Apply a rolling std (moving std) over the values in this array.
Expand Down

0 comments on commit 06a54fd

Please sign in to comment.