Skip to content

Commit

Permalink
feat(rust, python): show expression where error originated if raised … (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and zundertj committed Jan 7, 2023
1 parent 556ef2b commit af165fe
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 75 deletions.
11 changes: 11 additions & 0 deletions polars/polars-lazy/src/physical_plan/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
macro_rules! expression_err {
($msg:expr, $source:expr, $error:ident) => {{
let msg = format!(
"{}\n\n> Error originated in expression: '{:?}'",
$msg, $source
);
PolarsError::$error(msg.into())
}};
}

pub(super) use expression_err;
29 changes: 16 additions & 13 deletions polars/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use polars_io::predicates::StatsEvaluator;
use polars_plan::dsl::FunctionExpr;
use rayon::prelude::*;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

Expand Down Expand Up @@ -80,9 +81,10 @@ fn all_unit_length(ca: &ListChunked) -> bool {
(offset[offset.len() - 1] as usize) == list_arr.len() as usize
}

fn check_map_output_len(input_len: usize, output_len: usize) -> PolarsResult<()> {
fn check_map_output_len(input_len: usize, output_len: usize, expr: &Expr) -> PolarsResult<()> {
if input_len != output_len {
Err(PolarsError::ComputeError("A 'map' functions output length must be equal to that of the input length. Consider using 'apply' in favor of 'map'.".into()))
let msg = "A 'map' functions output length must be equal to that of the input length. Consider using 'apply' in favor of 'map'.";
Err(expression_err!(msg, expr, ComputeError))
} else {
Ok(())
}
Expand Down Expand Up @@ -131,13 +133,11 @@ impl PhysicalExpr for ApplyExpr {
let s = ac.series();

if matches!(ac.agg_state(), AggState::AggregatedFlat(_)) {
return Err(PolarsError::ComputeError(
format!(
"Cannot aggregate {:?}. The column is already aggregated.",
self.expr
)
.into(),
));
let msg = format!(
"Cannot aggregate {:?}. The column is already aggregated.",
self.expr
);
return Err(expression_err!(msg, self.expr, ComputeError));
}

// collection of empty list leads to a null dtype
Expand Down Expand Up @@ -194,7 +194,7 @@ impl PhysicalExpr for ApplyExpr {
let input_len = input.len();
let s = self.function.call_udf(&mut [input])?;

check_map_output_len(input_len, s.len())?;
check_map_output_len(input_len, s.len(), &self.expr)?;
ac.with_series(s, false);

if set_update_groups {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl PhysicalExpr for ApplyExpr {
self.collect_groups,
acs[0].agg_state(),
) {
apply_multiple_flat(acs, self.function.as_ref())
apply_multiple_flat(acs, self.function.as_ref(), &self.expr)
} else {
let mut container = vec![Default::default(); acs.len()];
let name = acs[0].series().name().to_string();
Expand Down Expand Up @@ -272,7 +272,9 @@ impl PhysicalExpr for ApplyExpr {
Ok(ac)
}
}
(_, ApplyOptions::ApplyFlat) => apply_multiple_flat(acs, self.function.as_ref()),
(_, ApplyOptions::ApplyFlat) => {
apply_multiple_flat(acs, self.function.as_ref(), &self.expr)
}
}
}
}
Expand Down Expand Up @@ -308,6 +310,7 @@ impl PhysicalExpr for ApplyExpr {
fn apply_multiple_flat<'a>(
mut acs: Vec<AggregationContext<'a>>,
function: &dyn SeriesUdf,
expr: &Expr,
) -> PolarsResult<AggregationContext<'a>> {
let mut s = acs
.iter_mut()
Expand All @@ -324,7 +327,7 @@ fn apply_multiple_flat<'a>(

let input_len = s[0].len();
let s = function.call_udf(&mut s)?;
check_map_output_len(input_len, s.len())?;
check_map_output_len(input_len, s.len(), expr)?;

// take the first aggregation context that as that is the input series
let mut ac = acs.swap_remove(0);
Expand Down
8 changes: 3 additions & 5 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::series::unstable::UnstableSeries;
use polars_core::POOL;
use rayon::prelude::*;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::{ExecutionState, StateFlags};
use crate::prelude::*;

Expand Down Expand Up @@ -124,11 +125,8 @@ impl PhysicalExpr for BinaryExpr {
let mut ac_r = result_b?;

if !ac_l.can_combine(&ac_r) {
return Err(PolarsError::InvalidOperation(
"\
cannot combine this binary expression, the groups do not match"
.into(),
));
let msg = "Cannot combine this binary expression, the groups do not match.";
return Err(expression_err!(msg, self.expr, InvalidOperation));
}

match (
Expand Down
69 changes: 38 additions & 31 deletions polars/polars-lazy/src/physical_plan/expressions/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::POOL;
use rayon::prelude::*;
use AnyValue::Null;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

Expand All @@ -17,50 +18,56 @@ pub struct SliceExpr {
pub(crate) expr: Expr,
}

fn extract_offset(offset: &Series) -> PolarsResult<i64> {
fn extract_offset(offset: &Series, expr: &Expr) -> PolarsResult<i64> {
if offset.len() > 1 {
return Err(PolarsError::ComputeError(format!("Invalid argument to slice; expected an offset literal but got a Series of length {}", offset.len()).into()));
let msg = format!(
"Invalid argument to slice; expected an offset literal but got a Series of length {}.",
offset.len()
);
return Err(expression_err!(msg, expr, ComputeError));
}
offset.get(0).extract::<i64>().ok_or_else(|| {
PolarsError::ComputeError(format!("could not get an offset from {:?}", offset).into())
})
}

fn extract_length(length: &Series) -> PolarsResult<usize> {
fn extract_length(length: &Series, expr: &Expr) -> PolarsResult<usize> {
if length.len() > 1 {
return Err(PolarsError::ComputeError(format!("Invalid argument to slice; expected a length literal but got a Series of length {}", length.len()).into()));
let msg = format!(
"Invalid argument to slice; expected a length literal but got a Series of length {}.",
length.len()
);
return Err(expression_err!(msg, expr, ComputeError));
}
match length.get(0) {
Null => Ok(usize::MAX),
v => v.extract::<usize>().ok_or_else(|| {
PolarsError::ComputeError(format!("could not get a length from {:?}", length).into())
let msg = format!("Could not get a length from {:?}.", length);
expression_err!(msg, expr, ComputeError)
}),
}
}

fn extract_args(offset: &Series, length: &Series) -> PolarsResult<(i64, usize)> {
Ok((extract_offset(offset)?, extract_length(length)?))
fn extract_args(offset: &Series, length: &Series, expr: &Expr) -> PolarsResult<(i64, usize)> {
Ok((extract_offset(offset, expr)?, extract_length(length, expr)?))
}

fn check_argument(arg: &Series, groups: &GroupsProxy, name: &str) -> PolarsResult<()> {
fn check_argument(arg: &Series, groups: &GroupsProxy, name: &str, expr: &Expr) -> PolarsResult<()> {
if let DataType::List(_) = arg.dtype() {
Err(PolarsError::ComputeError(
format!(
"Invalid slice argument: cannot use an array as {} argument",
name
)
.into(),
))
let msg = format!(
"Invalid slice argument: cannot use an array as {} argument.",
name
);
Err(expression_err!(msg, expr, ComputeError))
} else if arg.len() != groups.len() {
Err(PolarsError::ComputeError(format!("Invalid slice argument: the evaluated length expression was of different {} than the number of groups", name).into()))
let msg = format!("Invalid slice argument: the evaluated length expression was of different {} than the number of groups.", name);
Err(expression_err!(msg, expr, ComputeError))
} else if arg.null_count() > 0 {
Err(PolarsError::ComputeError(
format!(
"Invalid slice argument: the {} expression should not have null values",
name
)
.into(),
))
let msg = format!(
"Invalid slice argument: the {} expression should not have null values.",
name
);
Err(expression_err!(msg, expr, ComputeError))
} else {
Ok(())
}
Expand Down Expand Up @@ -94,7 +101,7 @@ impl PhysicalExpr for SliceExpr {
let offset = &results[0];
let length = &results[1];
let series = &results[2];
let (offset, length) = extract_args(offset, length)?;
let (offset, length) = extract_args(offset, length, &self.expr)?;

Ok(series.slice(offset, length))
}
Expand All @@ -120,7 +127,7 @@ impl PhysicalExpr for SliceExpr {
use AggState::*;
let groups = match (&ac_offset.state, &ac_length.state) {
(Literal(offset), Literal(length)) => {
let (offset, length) = extract_args(offset, length)?;
let (offset, length) = extract_args(offset, length, &self.expr)?;

match groups.as_ref() {
GroupsProxy::Idx(groups) => {
Expand All @@ -143,9 +150,9 @@ impl PhysicalExpr for SliceExpr {
}
}
(Literal(offset), _) => {
let offset = extract_offset(offset)?;
let offset = extract_offset(offset, &self.expr)?;
let length = ac_length.aggregated();
check_argument(&length, groups, "length")?;
check_argument(&length, groups, "length", &self.expr)?;

let length = length.cast(&IDX_DTYPE)?;
let length = length.idx().unwrap();
Expand Down Expand Up @@ -177,9 +184,9 @@ impl PhysicalExpr for SliceExpr {
}
}
(_, Literal(length)) => {
let length = extract_length(length)?;
let length = extract_length(length, &self.expr)?;
let offset = ac_offset.aggregated();
check_argument(&offset, groups, "offset")?;
check_argument(&offset, groups, "offset", &self.expr)?;

let offset = offset.cast(&DataType::Int64)?;
let offset = offset.i64().unwrap();
Expand Down Expand Up @@ -213,8 +220,8 @@ impl PhysicalExpr for SliceExpr {
_ => {
let length = ac_length.aggregated();
let offset = ac_offset.aggregated();
check_argument(&length, groups, "length")?;
check_argument(&offset, groups, "offset")?;
check_argument(&length, groups, "length", &self.expr)?;
check_argument(&offset, groups, "offset", &self.expr)?;

let offset = offset.cast(&DataType::Int64)?;
let offset = offset.i64().unwrap();
Expand Down
18 changes: 10 additions & 8 deletions polars/polars-lazy/src/physical_plan/expressions/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::frame::groupby::GroupsProxy;
use polars_core::prelude::*;
use polars_core::utils::NoNull;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

Expand Down Expand Up @@ -47,6 +48,11 @@ impl PhysicalExpr for TakeExpr {
let mut ac = self.phys_expr.evaluate_on_groups(df, groups, state)?;
let mut idx = self.idx.evaluate_on_groups(df, groups, state)?;

let oob_err = || {
let msg = "Out of bounds.";
Err(expression_err!(msg, self.expr, ComputeError))
};

let idx =
match idx.state {
AggState::AggregatedFlat(s) => {
Expand Down Expand Up @@ -74,7 +80,7 @@ impl PhysicalExpr for TakeExpr {
Some(idx) => idx >= g.len() as IdxSize,
},
) {
return Err(PolarsError::ComputeError("out of bounds".into()));
return oob_err();
}

idx.into_iter()
Expand All @@ -91,7 +97,7 @@ impl PhysicalExpr for TakeExpr {
Some(idx) => idx >= g[1],
})
{
return Err(PolarsError::ComputeError("out of bounds".into()));
return oob_err();
}

idx.into_iter()
Expand Down Expand Up @@ -130,18 +136,14 @@ impl PhysicalExpr for TakeExpr {
let idx: NoNull<IdxCa> = match groups.as_ref() {
GroupsProxy::Idx(groups) => {
if groups.all().iter().any(|g| idx >= g.len() as IdxSize) {
return Err(PolarsError::ComputeError(
"out of bounds".into(),
));
return oob_err();
}

groups.first().iter().map(|f| *f + idx).collect_trusted()
}
GroupsProxy::Slice { groups, .. } => {
if groups.iter().any(|g| idx >= g[1]) {
return Err(PolarsError::ComputeError(
"out of bounds".into(),
));
return oob_err();
}

groups.iter().map(|g| g[0] + idx).collect_trusted()
Expand Down
4 changes: 3 additions & 1 deletion polars/polars-lazy/src/physical_plan/expressions/ternary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::frame::groupby::GroupsProxy;
use polars_core::prelude::*;
use polars_core::POOL;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::{ExecutionState, StateFlags};
use crate::prelude::*;

Expand Down Expand Up @@ -182,7 +183,8 @@ impl PhysicalExpr for TernaryExpr {
let mask = mask_s.bool()?;
let check_length = |ca: &ListChunked, mask: &BooleanChunked| {
if ca.len() != mask.len() {
Err(PolarsError::ComputeError(format!("the predicates length: '{}' does not match the length of the groups: {}", mask.len(), ca.len()).into()))
let msg = format!("The predicates length: '{}' does not match the length of the groups: {}.", mask.len(), ca.len());
Err(expression_err!(msg, self.expr, ComputeError))
} else {
Ok(())
}
Expand Down
Loading

0 comments on commit af165fe

Please sign in to comment.