-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix the schema mismatch between logical and physical for aggregate function, add AggregateUDFImpl::is_null
#11989
Changes from 21 commits
aed01f0
cbfefc6
b3fc2c8
20d0a5f
1132686
611092e
e732adc
ab38a5a
1d299eb
19a1ac7
984ced7
9b75540
6361bc4
794ce12
cb63514
9c12566
a42654c
e45d1bb
83ce363
3519e75
da30827
356faa8
043c332
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -670,6 +670,12 @@ impl DefaultPhysicalPlanner { | |
let input_exec = children.one()?; | ||
let physical_input_schema = input_exec.schema(); | ||
let logical_input_schema = input.as_ref().schema(); | ||
let physical_input_schema_from_logical: Arc<Schema> = | ||
logical_input_schema.as_ref().clone().into(); | ||
|
||
if physical_input_schema != physical_input_schema_from_logical { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🎉 |
||
return internal_err!("Physical input schema should be the same as the one converted from logical input schema."); | ||
} | ||
|
||
let groups = self.create_grouping_physical_expr( | ||
group_expr, | ||
|
@@ -1548,7 +1554,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( | |
e: &Expr, | ||
name: Option<String>, | ||
logical_input_schema: &DFSchema, | ||
_physical_input_schema: &Schema, | ||
physical_input_schema: &Schema, | ||
execution_props: &ExecutionProps, | ||
) -> Result<AggregateExprWithOptionalArgs> { | ||
match e { | ||
|
@@ -1599,11 +1605,10 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter( | |
let ordering_reqs: Vec<PhysicalSortExpr> = | ||
physical_sort_exprs.clone().unwrap_or(vec![]); | ||
|
||
let schema: Schema = logical_input_schema.clone().into(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. workaround cleanup |
||
let agg_expr = | ||
AggregateExprBuilder::new(func.to_owned(), physical_args.to_vec()) | ||
.order_by(ordering_reqs.to_vec()) | ||
.schema(Arc::new(schema)) | ||
.schema(Arc::new(physical_input_schema.to_owned())) | ||
.alias(name) | ||
.with_ignore_nulls(ignore_nulls) | ||
.with_distinct(*distinct) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -335,18 +335,28 @@ impl ExprSchemable for Expr { | |
} | ||
} | ||
Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema), | ||
Expr::ScalarFunction(ScalarFunction { func, args }) => { | ||
Ok(func.is_nullable(args, input_schema)) | ||
} | ||
Expr::AggregateFunction(AggregateFunction { func, .. }) => { | ||
// TODO: UDF should be able to customize nullability | ||
if func.name() == "count" { | ||
Ok(false) | ||
} else { | ||
Ok(true) | ||
} | ||
Ok(func.is_nullable()) | ||
} | ||
Expr::WindowFunction(WindowFunction { fun, .. }) => match fun { | ||
WindowFunctionDefinition::BuiltInWindowFunction(func) => { | ||
if func.name() == "RANK" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. checking the name is probably fine given we are in the process of removing the enum anyways This could also check the variants of |
||
|| func.name() == "NTILE" | ||
|| func.name() == "CUME_DIST" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure if this list is complete. What about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Temporary code, there would be no name checking after #8709 is done. We can see that |
||
{ | ||
Ok(false) | ||
} else { | ||
Ok(true) | ||
} | ||
} | ||
WindowFunctionDefinition::AggregateUDF(func) => Ok(func.is_nullable()), | ||
WindowFunctionDefinition::WindowUDF(udwf) => Ok(udwf.nullable()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is great |
||
}, | ||
Expr::ScalarVariable(_, _) | ||
| Expr::TryCast { .. } | ||
| Expr::ScalarFunction(..) | ||
| Expr::WindowFunction { .. } | ||
| Expr::Unnest(_) | ||
| Expr::Placeholder(_) => Ok(true), | ||
Expr::IsNull(_) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
count() is always non nullable, so this change makes sense to mee