fix: Support type coercion for ScalarUDFs #865
Conversation
df.select(array(array(col("_4")), array(col("_4"), lit(null)))))
checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_13"))))
// TODO: Some part of this converts the null to an empty string
// checkSparkAnswerAndOperator(df.select(array(col("_8"), col("_13"), lit(null))))
This one is really odd and I haven't been able to figure it out. Somewhere along the way the `lit(null)` ends up becoming an empty string instead of null whenever it has to get cast to a dictionary array. I logged the full plan for this one:
Projection: ProjectionExec { expr: [(ScalarFunctionExpr { fun: "<FUNC>", name: "make_array", args: [CastExpr { expr: Column { name: "col_0", index: 0 }, cast_type: Dictionary(Int32, Utf8), cast_options: CastOptions { safe: false, format_options: FormatOptions { safe: true, null: "", date_format: None, datetime_format: None, timestamp_format: None, timestamp_tz_format: None, time_format: None, duration_format: ISO8601 } } }, Column { name: "col_1", index: 1 }, CastExpr { expr: Literal { value: Utf8(NULL) }, cast_type: Dictionary(Int32, Utf8), cast_options: CastOptions { safe: false, format_options: FormatOptions { safe: true, null: "", date_format: None, datetime_format: None, timestamp_format: None, timestamp_tz_format: None, time_format: None, duration_format: ISO8601 } } }], return_type: List(Field { name: "item", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) }, "col_0")], schema: Schema { fields: [Field { name: "col_0", data_type: List(Field { name: "item", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, input: ScanExec { exec_context_id: 0, input_source: Some(GlobalRef { inner: GlobalRefGuard { obj: JObject { internal: 0x12010b128, lifetime: PhantomData<&()> }, vm: JavaVM(0x10b7aeb10) } }), input_source_description: "CometScan parquet (unknown)", data_types: [Utf8, Utf8], batch: Mutex { data: Some(Batch([StringArray
[
null,
null,
"222222222222222222222222222222222222222222222222",
null,
null,
"111111111111111111111111111111111111111111111111",
null,
"333333333333333333333333333333333333333333333333",
"000000000000000000000000000000000000000000000000",
null,
...80 elements...,
"222222222222222222222222222222222222222222222222",
null,
null,
"111111111111111111111111111111111111111111111111",
"222222222222222222222222222222222222222222222222",
"333333333333333333333333333333333333333333333333",
null,
"111111111111111111111111111111111111111111111111",
"222222222222222222222222222222222222222222222222",
"333333333333333333333333333333333333333333333333",
], DictionaryArray {keys: PrimitiveArray<Int32>
[
null,
null,
0,
null,
null,
1,
null,
2,
3,
null,
...80 elements...,
0,
null,
null,
1,
0,
2,
null,
1,
0,
2,
] values: StringArray
[
"2",
"1",
"3",
"0",
]}
], 100)), poisoned: false, .. }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "col_0", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "col_1", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 1, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } } }, metrics: ExecutionPlanMetricsSet { inner: Mutex { data: MetricsSet { metrics: [] } } }, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, constants: [], schema: Schema { fields: [Field { name: "col_0", data_type: List(Field { name: "item", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} } }, partitioning: UnknownPartitioning(1), execution_mode: Bounded, output_ordering: None } }
I'm not sure if this is something on the DataFusion side or something with returning the result back to Spark land. Other columns don't have issues even when the row has a null value.
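One plausible mechanism (this is a toy sketch, not DataFusion's actual cast code) is that a formatting-based cast path substitutes its configured null placeholder, and the logged plan above shows `FormatOptions { ..., null: "" }`, i.e. an empty-string placeholder. The `Value`, `FormatOptions`, and `format_value` names below are all hypothetical:

```rust
// Toy illustration: if a cast path renders values through a formatter whose
// null representation is "", a NULL silently becomes an empty string.
#[derive(Debug, PartialEq)]
enum Value {
    Null,
    Str(String),
}

// Mirrors the `null: ""` field seen in the logged FormatOptions.
struct FormatOptions {
    null_repr: &'static str,
}

fn format_value(v: &Value, opts: &FormatOptions) -> String {
    match v {
        // The null is replaced by the placeholder instead of staying null.
        Value::Null => opts.null_repr.to_string(),
        Value::Str(s) => s.clone(),
    }
}

fn main() {
    let opts = FormatOptions { null_repr: "" };
    let s = format_value(&Value::Str("abc".to_string()), &opts);
    let n = format_value(&Value::Null, &opts);
    // The NULL is now indistinguishable from an empty string.
    println!("str = {:?}, null = {:?}", s, n);
}
```

If something like this happens while building the dictionary's values array, the null row comes back to Spark as `""` rather than `NULL`, matching the observed behavior.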
I was able to recreate this with a test on v41.0.0, but it seems to be fixed on main.
Found it: apache/datafusion@2730423
Thanks @Kimahriman. I think this looks good, but I want to do some testing before approving.
spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
LGTM. Thanks @Kimahriman
* Add type coercion to ScalarUDFs and support dictionaries in lists
* Formatting
* Remove unused var
* Cleanup planner
* Update comment for failing test
* Add struct tests
* Change back row count

(cherry picked from commit 7484588)
Which issue does this PR close?
Closes #841
Rationale for this change
`make_array` doesn't support mixed types of dictionaries and non-dictionaries. Type coercion is the fundamental piece that DataFusion uses at analysis time that is missing from Comet's use of ScalarUDFs.

What changes are included in this PR?
In the planning for ScalarUDFs on the Rust side, check the function to see if the input types should be coerced to other types, and if so, wrap the expression in a cast to the appropriate type.
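The planner change described above can be sketched roughly as follows. This is a self-contained toy model, not DataFusion's real API: the `DataType`, `Expr`, and `coerce_args` names are hypothetical stand-ins for the coercion-and-cast-wrapping step the PR adds:

```rust
// Sketch of the coercion step: compare each argument's type against the
// type the UDF's coercion rules want, and wrap mismatches in a cast.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Utf8,
    DictUtf8, // stand-in for Dictionary(Int32, Utf8)
}

#[derive(Debug)]
enum Expr {
    Column(String, DataType),
    Cast(Box<Expr>, DataType),
}

fn data_type(e: &Expr) -> DataType {
    match e {
        Expr::Column(_, dt) => dt.clone(),
        Expr::Cast(_, dt) => dt.clone(),
    }
}

// Wrap any argument whose type differs from the coerced target type.
fn coerce_args(args: Vec<Expr>, coerced: &[DataType]) -> Vec<Expr> {
    args.into_iter()
        .zip(coerced)
        .map(|(arg, want)| {
            if data_type(&arg) == *want {
                arg
            } else {
                Expr::Cast(Box::new(arg), want.clone())
            }
        })
        .collect()
}

fn main() {
    // Mimics the failing case: a plain string column mixed with a
    // dictionary-encoded column passed to the same UDF.
    let args = vec![
        Expr::Column("col_0".to_string(), DataType::Utf8),
        Expr::Column("col_1".to_string(), DataType::DictUtf8),
    ];
    // Suppose the function's coercion rules ask for DictUtf8 for both args.
    let coerced = coerce_args(args, &[DataType::DictUtf8, DataType::DictUtf8]);
    println!("{:?}", coerced);
}
```

After this step, only the mismatched argument (`col_0`) is wrapped in a cast, while the already-matching dictionary column is left alone, which is exactly the shape of the `CastExpr`-wrapped arguments visible in the logged plan above.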
How are these changes tested?
The `CreateArray` unit test is updated with cases that were previously failing.