Skip to content

Commit

Permalink
wrap up
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 17, 2024
1 parent 7065ec8 commit 432b648
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 152 deletions.
9 changes: 0 additions & 9 deletions crates/polars-plan/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,15 +407,6 @@ impl DslPlan {
self.write_dot(acc_str, prev_node, current_node, id_map)?;
input.dot(acc_str, (branch, id + 1), current_node, id_map)
},
Error { err, .. } => {
let fmt = format!("{:?}", &err.0);
let current_node = DotNode {
branch,
id,
fmt: &fmt,
};
self.write_dot(acc_str, prev_node, current_node, id_map)
},
}
}

Expand Down
22 changes: 0 additions & 22 deletions crates/polars-plan/src/logical_plan/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,14 +451,6 @@ impl DslBuilder {
.into()
}

pub fn add_err(self, err: PolarsError) -> Self {
DslPlan::Error {
input: Arc::new(self.0),
err: err.into(),
}
.into()
}

pub fn with_context(self, contexts: Vec<DslPlan>) -> Self {
DslPlan::ExtContext {
input: Arc::new(self.0),
Expand Down Expand Up @@ -582,20 +574,6 @@ impl DslBuilder {
right_on: Vec<Expr>,
options: Arc<JoinOptions>,
) -> Self {
for e in left_on.iter().chain(right_on.iter()) {
if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
return DslPlan::Error {
input: Arc::new(self.0),
err: polars_err!(
ComputeError:
"'alias' is not allowed in a join key, use 'with_columns' first",
)
.into(),
}
.into();
}
}

DslPlan::Join {
input_left: Arc::new(self.0),
input_right: Arc::new(other),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ fn expand_expressions(
Ok(to_expr_irs(exprs, expr_arena))
}

/// Builds an `IR::DataFrameScan` over default-constructed data.
///
/// Both the frame and its schema are `Default` values (presumably an empty
/// `DataFrame` with an empty schema), and no output schema, projection or
/// selection is attached.
fn empty_df() -> IR {
    let df = Arc::default();
    let schema = Arc::default();
    IR::DataFrameScan {
        df,
        schema,
        output_schema: None,
        projection: None,
        selection: None,
    }
}

/// Converts the logical plan to IR.
/// It adds expressions and logical plans to their respective arenas as it traverses the plan,
/// and finally returns the top node of the logical plan.
Expand All @@ -33,7 +43,7 @@ pub fn to_alp(
} => {
if let Some(row_index) = &file_options.row_index {
let schema = Arc::make_mut(&mut file_info.schema);
schema
*schema = schema
.new_inserting_at_index(0, row_index.name.as_str().into(), IDX_DTYPE)
.unwrap();
}
Expand Down Expand Up @@ -107,8 +117,7 @@ pub fn to_alp(
let (exprs, schema) = prepare_projection(expr, &schema)?;

if exprs.is_empty() {
// Ensure that input will be the `Default` impl which is an empty DF.
let _ = lp_arena.take(input);
lp_arena.replace(input, empty_df());
}

let schema = Arc::new(schema);
Expand Down Expand Up @@ -184,6 +193,15 @@ pub fn to_alp(
right_on,
options,
} => {
for e in left_on.iter().chain(right_on.iter()) {
if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
polars_bail!(
ComputeError:
"'alias' is not allowed in a join key, use 'with_columns' first",
)
}
}

let input_left = to_alp(owned(input_left), expr_arena, lp_arena)?;
let input_right = to_alp(owned(input_right), expr_arena, lp_arena)?;

Expand Down Expand Up @@ -225,11 +243,11 @@ pub fn to_alp(
},
DslPlan::MapFunction { input, function } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let schema = lp_arena.get(input).schema(lp_arena);
let input_schema = lp_arena.get(input).schema(lp_arena);

match function {
DslFunction::FillNan(fill_value) => {
let exprs = schema
let exprs = input_schema
.iter()
.filter_map(|(name, dtype)| match dtype {
DataType::Float32 | DataType::Float64 => {
Expand Down Expand Up @@ -260,22 +278,24 @@ pub fn to_alp(
.collect::<Vec<_>>(),
),
}?;
let predicate = rewrite_projections(vec![predicate], &input_schema, &[])?
.pop()
.unwrap();
let predicate = to_expr_ir(predicate, expr_arena);
IR::Filter { predicate, input }
},
DslFunction::Drop(to_drop) => {
let mut output_schema =
Schema::with_capacity(schema.len().saturating_sub(to_drop.len()));
Schema::with_capacity(input_schema.len().saturating_sub(to_drop.len()));

for (col_name, dtype) in schema.iter() {
for (col_name, dtype) in input_schema.iter() {
if !to_drop.contains(col_name.as_str()) {
output_schema.with_column(col_name.clone(), dtype.clone());
}
}

if output_schema.is_empty() {
// Ensure that input will be the `Default` impl which is an empty DF.
let _ = lp_arena.take(input);
lp_arena.replace(input, empty_df());
}

IR::SimpleProjection {
Expand All @@ -289,17 +309,17 @@ pub fn to_alp(
StatsFunction::Var { ddof } => stats_helper(
|dt| dt.is_numeric() || dt.is_bool(),
|name| col(name).var(ddof),
&schema,
&input_schema,
),
StatsFunction::Std { ddof } => stats_helper(
|dt| dt.is_numeric() || dt.is_bool(),
|name| col(name).std(ddof),
&schema,
&input_schema,
),
StatsFunction::Quantile { quantile, interpol } => stats_helper(
|dt| dt.is_numeric(),
|name| col(name).quantile(quantile.clone(), interpol),
&schema,
&input_schema,
),
StatsFunction::Mean => stats_helper(
|dt| {
Expand All @@ -313,7 +333,7 @@ pub fn to_alp(
)
},
|name| col(name).mean(),
&schema,
&input_schema,
),
StatsFunction::Sum => stats_helper(
|dt| {
Expand All @@ -322,13 +342,13 @@ pub fn to_alp(
|| matches!(dt, DataType::Boolean | DataType::Duration(_))
},
|name| col(name).sum(),
&schema,
&input_schema,
),
StatsFunction::Min => {
stats_helper(|dt| dt.is_ord(), |name| col(name).min(), &schema)
stats_helper(|dt| dt.is_ord(), |name| col(name).min(), &input_schema)
},
StatsFunction::Max => {
stats_helper(|dt| dt.is_ord(), |name| col(name).max(), &schema)
stats_helper(|dt| dt.is_ord(), |name| col(name).max(), &input_schema)
},
StatsFunction::Median => stats_helper(
|dt| {
Expand All @@ -342,11 +362,14 @@ pub fn to_alp(
)
},
|name| col(name).median(),
&schema,
&input_schema,
),
};
let schema =
Arc::new(expressions_to_schema(&exprs, &schema, Context::Default)?);
let schema = Arc::new(expressions_to_schema(
&exprs,
&input_schema,
Context::Default,
)?);
let eirs = to_expr_irs(exprs, expr_arena);
let expr = eirs.into();
IR::Select {
Expand All @@ -360,16 +383,11 @@ pub fn to_alp(
}
},
_ => {
let function = function.into_function_node(&schema)?;
let function = function.into_function_node(&input_schema)?;
IR::MapFunction { input, function }
},
}
},
DslPlan::Error { err, .. } => {
// We just take the error. The LogicalPlan should not be used anymore once this
// is taken.
return Err(err.take());
},
DslPlan::ExtContext { input, contexts } => {
let input = to_alp(owned(input), expr_arena, lp_arena)?;
let contexts = contexts
Expand Down Expand Up @@ -502,7 +520,7 @@ fn resolve_group_by(
let current_schema = lp_arena.get(input).schema(lp_arena);
let current_schema = current_schema.as_ref();
let keys = rewrite_projections(keys, current_schema, &[])?;
let aggs = rewrite_projections(aggs, current_schema, &[])?;
let aggs = rewrite_projections(aggs, current_schema, &keys)?;

// Initialize schema from keys
let mut schema = expressions_to_schema(&keys, current_schema, Context::Default)?;
Expand Down
1 change: 0 additions & 1 deletion crates/polars-plan/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ impl DslPlan {
write!(f, "{:indent$}{function_fmt}", "")?;
input._format(f, sub_indent)
},
Error { err, .. } => write!(f, "{err:?}"),
ExtContext { input, .. } => {
write!(f, "{:indent$}EXTERNAL_CONTEXT", "")?;
input._format(f, sub_indent)
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-plan/src/logical_plan/functions/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ pub enum StatsFunction {
}

impl DslFunction {
pub(crate) fn into_function_node(self, schema: &Schema) -> PolarsResult<FunctionNode> {
pub(crate) fn into_function_node(self, input_schema: &Schema) -> PolarsResult<FunctionNode> {
let function = match self {
DslFunction::Explode { columns } => {
let columns = rewrite_projections(columns, schema, &[])?;
let columns = rewrite_projections(columns, input_schema, &[])?;
// columns to string
let columns = columns
.iter()
Expand Down Expand Up @@ -79,11 +79,11 @@ impl DslFunction {
schema: Default::default(),
},
DslFunction::Rename { existing, new } => {
let swapping = new.iter().any(|name| schema.get(name).is_some());
let swapping = new.iter().any(|name| input_schema.get(name).is_some());

// Check if the name exists.
for name in new.iter() {
let _ = schema.try_get(name)?;
for name in existing.iter() {
let _ = input_schema.try_get(name)?;
}

FunctionNode::Rename {
Expand Down
71 changes: 1 addition & 70 deletions crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use polars_core::prelude::*;
use recursive::recursive;
Expand Down Expand Up @@ -66,68 +66,6 @@ pub enum Context {
Default,
}

/// Unsynchronized payload behind [`ErrorState`]: a stored error plus a counter
/// of how often it has already been handed out.
#[derive(Debug)]
pub(crate) struct ErrorStateUnsync {
    /// Number of times the error has been taken so far; starts at 0 and is
    /// incremented on every `ErrorState::take` call.
    n_times: usize,
    /// The error that was captured.
    err: PolarsError,
}

/// Shared, mutex-guarded handle to an [`ErrorStateUnsync`].
///
/// `Clone` is derived on the inner `Arc`, so clones refer to the same
/// underlying error and counter.
#[derive(Clone)]
pub struct ErrorState(pub(crate) Arc<Mutex<ErrorStateUnsync>>);

impl std::fmt::Debug for ErrorState {
    /// Formats the guarded state as `ErrorState { n_times, err }`.
    ///
    /// The `Arc<Mutex<..>>` wrapper is deliberately left out of the output and
    /// only the two inner fields are shown. That is slightly misleading, but
    /// the internals are not public, so only polars authors can ever see it.
    ///
    /// Panics if the mutex is poisoned.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner = self.0.lock().unwrap();
        let mut dbg = f.debug_struct("ErrorState");
        dbg.field("n_times", &inner.n_times);
        dbg.field("err", &inner.err);
        dbg.finish()
    }
}

impl From<PolarsError> for ErrorState {
    /// Wraps `err` in a fresh shared state whose take-counter starts at zero.
    fn from(err: PolarsError) -> Self {
        let fresh = ErrorStateUnsync { err, n_times: 0 };
        Self(Arc::new(Mutex::new(fresh)))
    }
}

impl ErrorState {
    /// Hands out the stored error and records that it was taken.
    ///
    /// The error is produced through `wrap_msg` (presumably a copy of the
    /// stored error with a transformed message). On the first call the message
    /// is passed through unchanged; on later calls a note is appended stating
    /// how many additional operations were attempted after the failure. The
    /// counter is incremented on every call.
    ///
    /// Panics if the mutex is poisoned.
    fn take(&self) -> PolarsError {
        let mut guard = self.0.lock().unwrap();
        // The lock is held for the whole call, so this snapshot stays accurate.
        let n_times = guard.n_times;

        let ret_err = match n_times {
            0 => guard.err.wrap_msg(&|msg| msg.to_owned()),
            _ => {
                let (plural_s, was_were) = if n_times == 1 {
                    ("", "was")
                } else {
                    ("s", "were")
                };
                guard.err.wrap_msg(&|msg| {
                    format!(
                        "{msg}\n\nLogicalPlan had already failed with the above error; \
                         after failure, {n_times} additional operation{plural_s} \
                         {was_were} attempted on the LazyFrame",
                    )
                })
            },
        };
        guard.n_times += 1;

        ret_err
    }
}

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DslPlan {
Expand Down Expand Up @@ -224,12 +162,6 @@ pub enum DslPlan {
schema: SchemaRef,
options: HConcatOptions,
},
/// Catches errors and throws them later
#[cfg_attr(feature = "serde", serde(skip))]
Error {
input: Arc<DslPlan>,
err: ErrorState,
},
/// This allows expressions to access other tables
ExtContext {
input: Arc<DslPlan>,
Expand Down Expand Up @@ -265,7 +197,6 @@ impl Clone for DslPlan {
Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() },
Self::Union { inputs, options } => Self::Union { inputs: inputs.clone(), options: options.clone() },
Self::HConcat { inputs, schema, options } => Self::HConcat { inputs: inputs.clone(), schema: schema.clone(), options: options.clone() },
Self::Error { input, err } => Self::Error { input: input.clone(), err: err.clone() },
Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() },
Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() },
}
Expand Down
1 change: 0 additions & 1 deletion crates/polars-plan/src/logical_plan/tree_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ impl<'a> TreeFmtNode<'a> {
NL(h, MapFunction { input, function }) => {
ND(wh(h, &format!("{function}")), vec![NL(None, input)])
},
NL(h, Error { input, err }) => ND(wh(h, &format!("{err:?}")), vec![NL(None, input)]),
NL(h, ExtContext { input, .. }) => ND(wh(h, "EXTERNAL_CONTEXT"), vec![NL(None, input)]),
NL(h, Sink { input, payload }) => ND(
wh(
Expand Down
9 changes: 2 additions & 7 deletions py-polars/src/lazyframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -980,14 +980,9 @@ impl PyLazyFrame {
out.into()
}

fn quantile(
&self,
quantile: PyExpr,
interpolation: Wrap<QuantileInterpolOptions>,
) -> Self {
fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileInterpolOptions>) -> Self {
let ldf = self.ldf.clone();
let out = ldf
.quantile(quantile.inner, interpolation.0);
let out = ldf.quantile(quantile.inner, interpolation.0);
out.into()
}

Expand Down
Loading

0 comments on commit 432b648

Please sign in to comment.