Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust, python): show expression where error originated if raised … #5263

Merged
merged 1 commit into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions polars/polars-lazy/src/physical_plan/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
macro_rules! expression_err {
($msg:expr, $source:expr, $error:ident) => {{
let msg = format!(
"{}\n\n> Error originated in expression: '{:?}'",
$msg, $source
);
PolarsError::$error(msg.into())
}};
}

pub(super) use expression_err;
29 changes: 16 additions & 13 deletions polars/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use polars_io::predicates::StatsEvaluator;
use polars_plan::dsl::FunctionExpr;
use rayon::prelude::*;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

Expand Down Expand Up @@ -80,9 +81,10 @@ fn all_unit_length(ca: &ListChunked) -> bool {
(offset[offset.len() - 1] as usize) == list_arr.len() as usize
}

fn check_map_output_len(input_len: usize, output_len: usize) -> PolarsResult<()> {
fn check_map_output_len(input_len: usize, output_len: usize, expr: &Expr) -> PolarsResult<()> {
if input_len != output_len {
Err(PolarsError::ComputeError("A 'map' functions output length must be equal to that of the input length. Consider using 'apply' in favor of 'map'.".into()))
let msg = "A 'map' functions output length must be equal to that of the input length. Consider using 'apply' in favor of 'map'.";
Err(expression_err!(msg, expr, ComputeError))
} else {
Ok(())
}
Expand Down Expand Up @@ -131,13 +133,11 @@ impl PhysicalExpr for ApplyExpr {
let s = ac.series();

if matches!(ac.agg_state(), AggState::AggregatedFlat(_)) {
return Err(PolarsError::ComputeError(
format!(
"Cannot aggregate {:?}. The column is already aggregated.",
self.expr
)
.into(),
));
let msg = format!(
"Cannot aggregate {:?}. The column is already aggregated.",
self.expr
);
return Err(expression_err!(msg, self.expr, ComputeError));
}

// collection of empty list leads to a null dtype
Expand Down Expand Up @@ -194,7 +194,7 @@ impl PhysicalExpr for ApplyExpr {
let input_len = input.len();
let s = self.function.call_udf(&mut [input])?;

check_map_output_len(input_len, s.len())?;
check_map_output_len(input_len, s.len(), &self.expr)?;
ac.with_series(s, false);

if set_update_groups {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl PhysicalExpr for ApplyExpr {
self.collect_groups,
acs[0].agg_state(),
) {
apply_multiple_flat(acs, self.function.as_ref())
apply_multiple_flat(acs, self.function.as_ref(), &self.expr)
} else {
let mut container = vec![Default::default(); acs.len()];
let name = acs[0].series().name().to_string();
Expand Down Expand Up @@ -272,7 +272,9 @@ impl PhysicalExpr for ApplyExpr {
Ok(ac)
}
}
(_, ApplyOptions::ApplyFlat) => apply_multiple_flat(acs, self.function.as_ref()),
(_, ApplyOptions::ApplyFlat) => {
apply_multiple_flat(acs, self.function.as_ref(), &self.expr)
}
}
}
}
Expand Down Expand Up @@ -308,6 +310,7 @@ impl PhysicalExpr for ApplyExpr {
fn apply_multiple_flat<'a>(
mut acs: Vec<AggregationContext<'a>>,
function: &dyn SeriesUdf,
expr: &Expr,
) -> PolarsResult<AggregationContext<'a>> {
let mut s = acs
.iter_mut()
Expand All @@ -324,7 +327,7 @@ fn apply_multiple_flat<'a>(

let input_len = s[0].len();
let s = function.call_udf(&mut s)?;
check_map_output_len(input_len, s.len())?;
check_map_output_len(input_len, s.len(), expr)?;

// take the first aggregation context that as that is the input series
let mut ac = acs.swap_remove(0);
Expand Down
8 changes: 3 additions & 5 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::series::unstable::UnstableSeries;
use polars_core::POOL;
use rayon::prelude::*;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::{ExecutionState, StateFlags};
use crate::prelude::*;

Expand Down Expand Up @@ -124,11 +125,8 @@ impl PhysicalExpr for BinaryExpr {
let mut ac_r = result_b?;

if !ac_l.can_combine(&ac_r) {
return Err(PolarsError::InvalidOperation(
"\
cannot combine this binary expression, the groups do not match"
.into(),
));
let msg = "Cannot combine this binary expression, the groups do not match.";
return Err(expression_err!(msg, self.expr, InvalidOperation));
}

match (
Expand Down
69 changes: 38 additions & 31 deletions polars/polars-lazy/src/physical_plan/expressions/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::POOL;
use rayon::prelude::*;
use AnyValue::Null;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

Expand All @@ -17,50 +18,56 @@ pub struct SliceExpr {
pub(crate) expr: Expr,
}

fn extract_offset(offset: &Series) -> PolarsResult<i64> {
fn extract_offset(offset: &Series, expr: &Expr) -> PolarsResult<i64> {
if offset.len() > 1 {
return Err(PolarsError::ComputeError(format!("Invalid argument to slice; expected an offset literal but got a Series of length {}", offset.len()).into()));
let msg = format!(
"Invalid argument to slice; expected an offset literal but got a Series of length {}.",
offset.len()
);
return Err(expression_err!(msg, expr, ComputeError));
}
offset.get(0).extract::<i64>().ok_or_else(|| {
PolarsError::ComputeError(format!("could not get an offset from {:?}", offset).into())
})
}

fn extract_length(length: &Series) -> PolarsResult<usize> {
fn extract_length(length: &Series, expr: &Expr) -> PolarsResult<usize> {
if length.len() > 1 {
return Err(PolarsError::ComputeError(format!("Invalid argument to slice; expected a length literal but got a Series of length {}", length.len()).into()));
let msg = format!(
"Invalid argument to slice; expected a length literal but got a Series of length {}.",
length.len()
);
return Err(expression_err!(msg, expr, ComputeError));
}
match length.get(0) {
Null => Ok(usize::MAX),
v => v.extract::<usize>().ok_or_else(|| {
PolarsError::ComputeError(format!("could not get a length from {:?}", length).into())
let msg = format!("Could not get a length from {:?}.", length);
expression_err!(msg, expr, ComputeError)
}),
}
}

fn extract_args(offset: &Series, length: &Series) -> PolarsResult<(i64, usize)> {
Ok((extract_offset(offset)?, extract_length(length)?))
fn extract_args(offset: &Series, length: &Series, expr: &Expr) -> PolarsResult<(i64, usize)> {
Ok((extract_offset(offset, expr)?, extract_length(length, expr)?))
}

fn check_argument(arg: &Series, groups: &GroupsProxy, name: &str) -> PolarsResult<()> {
fn check_argument(arg: &Series, groups: &GroupsProxy, name: &str, expr: &Expr) -> PolarsResult<()> {
if let DataType::List(_) = arg.dtype() {
Err(PolarsError::ComputeError(
format!(
"Invalid slice argument: cannot use an array as {} argument",
name
)
.into(),
))
let msg = format!(
"Invalid slice argument: cannot use an array as {} argument.",
name
);
Err(expression_err!(msg, expr, ComputeError))
} else if arg.len() != groups.len() {
Err(PolarsError::ComputeError(format!("Invalid slice argument: the evaluated length expression was of different {} than the number of groups", name).into()))
let msg = format!("Invalid slice argument: the evaluated length expression was of different {} than the number of groups.", name);
Err(expression_err!(msg, expr, ComputeError))
} else if arg.null_count() > 0 {
Err(PolarsError::ComputeError(
format!(
"Invalid slice argument: the {} expression should not have null values",
name
)
.into(),
))
let msg = format!(
"Invalid slice argument: the {} expression should not have null values.",
name
);
Err(expression_err!(msg, expr, ComputeError))
} else {
Ok(())
}
Expand Down Expand Up @@ -94,7 +101,7 @@ impl PhysicalExpr for SliceExpr {
let offset = &results[0];
let length = &results[1];
let series = &results[2];
let (offset, length) = extract_args(offset, length)?;
let (offset, length) = extract_args(offset, length, &self.expr)?;

Ok(series.slice(offset, length))
}
Expand All @@ -120,7 +127,7 @@ impl PhysicalExpr for SliceExpr {
use AggState::*;
let groups = match (&ac_offset.state, &ac_length.state) {
(Literal(offset), Literal(length)) => {
let (offset, length) = extract_args(offset, length)?;
let (offset, length) = extract_args(offset, length, &self.expr)?;

match groups.as_ref() {
GroupsProxy::Idx(groups) => {
Expand All @@ -143,9 +150,9 @@ impl PhysicalExpr for SliceExpr {
}
}
(Literal(offset), _) => {
let offset = extract_offset(offset)?;
let offset = extract_offset(offset, &self.expr)?;
let length = ac_length.aggregated();
check_argument(&length, groups, "length")?;
check_argument(&length, groups, "length", &self.expr)?;

let length = length.cast(&IDX_DTYPE)?;
let length = length.idx().unwrap();
Expand Down Expand Up @@ -177,9 +184,9 @@ impl PhysicalExpr for SliceExpr {
}
}
(_, Literal(length)) => {
let length = extract_length(length)?;
let length = extract_length(length, &self.expr)?;
let offset = ac_offset.aggregated();
check_argument(&offset, groups, "offset")?;
check_argument(&offset, groups, "offset", &self.expr)?;

let offset = offset.cast(&DataType::Int64)?;
let offset = offset.i64().unwrap();
Expand Down Expand Up @@ -213,8 +220,8 @@ impl PhysicalExpr for SliceExpr {
_ => {
let length = ac_length.aggregated();
let offset = ac_offset.aggregated();
check_argument(&length, groups, "length")?;
check_argument(&offset, groups, "offset")?;
check_argument(&length, groups, "length", &self.expr)?;
check_argument(&offset, groups, "offset", &self.expr)?;

let offset = offset.cast(&DataType::Int64)?;
let offset = offset.i64().unwrap();
Expand Down
18 changes: 10 additions & 8 deletions polars/polars-lazy/src/physical_plan/expressions/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::frame::groupby::GroupsProxy;
use polars_core::prelude::*;
use polars_core::utils::NoNull;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

Expand Down Expand Up @@ -47,6 +48,11 @@ impl PhysicalExpr for TakeExpr {
let mut ac = self.phys_expr.evaluate_on_groups(df, groups, state)?;
let mut idx = self.idx.evaluate_on_groups(df, groups, state)?;

let oob_err = || {
let msg = "Out of bounds.";
Err(expression_err!(msg, self.expr, ComputeError))
};

let idx =
match idx.state {
AggState::AggregatedFlat(s) => {
Expand Down Expand Up @@ -74,7 +80,7 @@ impl PhysicalExpr for TakeExpr {
Some(idx) => idx >= g.len() as IdxSize,
},
) {
return Err(PolarsError::ComputeError("out of bounds".into()));
return oob_err();
}

idx.into_iter()
Expand All @@ -91,7 +97,7 @@ impl PhysicalExpr for TakeExpr {
Some(idx) => idx >= g[1],
})
{
return Err(PolarsError::ComputeError("out of bounds".into()));
return oob_err();
}

idx.into_iter()
Expand Down Expand Up @@ -130,18 +136,14 @@ impl PhysicalExpr for TakeExpr {
let idx: NoNull<IdxCa> = match groups.as_ref() {
GroupsProxy::Idx(groups) => {
if groups.all().iter().any(|g| idx >= g.len() as IdxSize) {
return Err(PolarsError::ComputeError(
"out of bounds".into(),
));
return oob_err();
}

groups.first().iter().map(|f| *f + idx).collect_trusted()
}
GroupsProxy::Slice { groups, .. } => {
if groups.iter().any(|g| idx >= g[1]) {
return Err(PolarsError::ComputeError(
"out of bounds".into(),
));
return oob_err();
}

groups.iter().map(|g| g[0] + idx).collect_trusted()
Expand Down
4 changes: 3 additions & 1 deletion polars/polars-lazy/src/physical_plan/expressions/ternary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_core::frame::groupby::GroupsProxy;
use polars_core::prelude::*;
use polars_core::POOL;

use crate::physical_plan::expression_err;
use crate::physical_plan::state::{ExecutionState, StateFlags};
use crate::prelude::*;

Expand Down Expand Up @@ -182,7 +183,8 @@ impl PhysicalExpr for TernaryExpr {
let mask = mask_s.bool()?;
let check_length = |ca: &ListChunked, mask: &BooleanChunked| {
if ca.len() != mask.len() {
Err(PolarsError::ComputeError(format!("the predicates length: '{}' does not match the length of the groups: {}", mask.len(), ca.len()).into()))
let msg = format!("The predicates length: '{}' does not match the length of the groups: {}.", mask.len(), ca.len());
Err(expression_err!(msg, self.expr, ComputeError))
} else {
Ok(())
}
Expand Down
Loading