feat: Add GetStructField expression #731
Conversation
Did my best at implementing a new expression. I had to adjust some of the type checking, since nearly all of it is driven off a single list or two of supported types, but I'm only trying to add some struct support. Got all the tests to pass at least.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

| | main | #731 | +/- |
| --- | ---: | ---: | ---: |
| - Coverage | 53.78% | 33.66% | -20.13% |
| - Complexity | 815 | 860 | +45 |
| Files | 107 | 111 | +4 |
| Lines | 10279 | 42679 | +32400 |
| Branches | 1934 | 9379 | +7445 |
| + Hits | 5529 | 14366 | +8837 |
| - Misses | 3773 | 25350 | +21577 |
| - Partials | 977 | 2963 | +1986 |

☔ View full report in Codecov by Sentry.
@@ -1128,7 +1133,7 @@ object CometSparkSessionExtensions extends Logging {
    // Only consider converting leaf nodes to columnar currently, so that all the following
    // operators can have a chance to be converted to columnar.
    // TODO: consider converting other intermediate operators to columnar.
    op.isInstanceOf[LeafExecNode] && !op.supportsColumnar && isSchemaSupported(op.schema) &&
Could you update the comments here to cover this change in functionality?
Added a little bit more, but it's still a little weird since it's technically still called RowToColumnar.
Thanks for the contribution @Kimahriman. I think this looks good.
    }
}

impl PhysicalExpr for GetStructField {
FWIW, if it's possible to implement these as ScalarUDFImpl instead of PhysicalExpr, that makes reusing them elsewhere easier (at least for me, but maybe for others too, since I think going from ScalarUDF -> PhysicalExpr is easier than the other way around) :)
Don't know enough about DataFusion to really know what the difference is. Just on the Spark side, UDFs are usually slightly less performant, so if you don't have to use a UDF you're usually better off. It looks like DataFusion already has a `get_field` ScalarUDF, but that works by name rather than by index, and there seems to be a lot more ceremony around checking all the input types, whereas the PhysicalExpr is more tailored to what we already know about the input data.
I agree that we should start implementing functions as ScalarUDFImpl instead of PhysicalExpr. I think it would be fine to convert this one as a follow-on PR.
What's the tl;dr on the benefits of that? Easier to use outside of the Comet/Spark use case?
Yes, I think it makes it easier for DataFusion users to switch between different function implementations and can be used from the logical plan as well as from the physical plan
In this particular case `GetStructField` might not even make sense as a UDF because it's created by the Spark analyzer from an `ExtractValue` expression, which resolves the name to an ordinal in the struct. It'd be odd to use `GetStructField` directly, since you would normally want a nested column by name, and not by index. I think the existing `get_field` UDF in DataFusion covers that use case, but using this PhysicalExpr on the Spark side seems like it makes more sense instead of having to convert the ordinal back into a field name to then use `get_field`.
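To make that name-to-ordinal resolution concrete, here is a small standalone Spark snippet (standard Spark API, not code from this PR) showing the analyzer turning a by-name field reference into a `GetStructField` with an ordinal:

```scala
import org.apache.spark.sql.SparkSession

// Standalone illustration: Spark resolves a nested field reference by name into
// a GetStructField expression that carries the field's ordinal within the struct.
object GetStructFieldDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
  import spark.implicits._

  // "s" becomes a struct column with fields _1: string and _2: int.
  val df = Seq((1, ("a", 2))).toDF("id", "s")

  // Selecting s._2 by name; analysis rewrites it to GetStructField(s, ordinal = 1).
  println(df.select($"s._2").queryExecution.analyzed)

  spark.stop()
}
```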
LGTM. Thanks @Kimahriman
@@ -59,12 +59,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     logWarning(s"Comet native execution is disabled due to: $reason")
   }

-  def supportedDataType(dt: DataType): Boolean = dt match {
+  def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
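For context, the flag-gated check might look roughly like this (an illustrative sketch under assumed names, not the exact patch):

```scala
import org.apache.spark.sql.types._

// Sketch only: the real supportedDataType in QueryPlanSerde may differ.
object TypeSupportSketch {
  def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
    case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
        DoubleType | BinaryType | StringType | DateType | TimestampType |
        _: DecimalType =>
      true
    // Structs are only considered supported when the caller opts in, and only
    // if every nested field type is itself supported.
    case s: StructType if allowComplex =>
      s.fields.forall(f => supportedDataType(f.dataType, allowComplex))
    case _ => false
  }
}
```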
Just thinking: do we really need `allowComplex`, which looks more like a `supportComplexTypes`? Or we can use the Comet param like it's done for operators/expressions. Moreover, `isSchemaSupported` doesn't have this flag, which causes some inconsistency IMHO.
Yes, there are a lot of oddities in how supported data types work right now. It should really just be operator/expression dependent and get bubbled up to whether the whole plan is supported or not.
I'm not sure what you mean by

> Or we can use the Comet param like it's done for operators/expressions.
Like

private[comet] def isCometAllOperatorEnabled(conf: SQLConf): Boolean = {
  COMET_EXEC_ALL_OPERATOR_ENABLED.get(conf)
}
It'd be a little odd to have a config for struct type support IMO, since either the code supports it or it doesn't. Maybe you would want to override to disable struct support, but you would still need all the same code to support it as well
lgtm (some minor comments)
@@ -1115,6 +1119,7 @@ object CometSparkSessionExtensions extends Logging {
         BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
       true
     case t: DataType if t.typeName == "timestamp_ntz" => true
+    case s: StructType => isSchemaSupported(s)
Is this change needed? In general, structs are not supported (yet) and since this method is not operator specific, we probably shouldn't have this here.
See #731 (comment), it's still needed by shouldApplyRowToColumnar
Hmm. I'm concerned that someone might use this method and get unexpected behavior. A comment to explain why this is here would be justified, I think?
Yeah it probably makes sense just to move it into CometRowToColumnarExec
@@ -59,12 +59,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     logWarning(s"Comet native execution is disabled due to: $reason")
   }

-  def supportedDataType(dt: DataType): Boolean = dt match {
+  def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match {
nit: Can we consider renaming this to `allowStruct` to make it explicit that this is only for structs (and not maps and arrays).
renamed
Thanks for addressing the feedback @Kimahriman. Could you fix the merge conflict?
Done
LGTM. I think this is ready to merge @parthchandra / @kazuyukitanimura ?
lgtm, thanks @Kimahriman. This PR definitely brings up the benefit.
Thanks @Kimahriman. Would you mind merging with the latest main to resolve the conflict?
Done again!
* Add GetStructField support
* Add custom types to CometBatchScanExec
* Remove test explain
* Rust fmt
* Fix struct type support checks
* Support converting StructArray to native
* fix style
* Attempt to fix scalar subquery issue
* Fix other unit test
* Cleanup
* Default query plan supporting complex type to false
* Migrate struct expressions to spark-expr
* Update shouldApplyRowToColumnar comment
* Add nulls to test
* Rename to allowStruct
* Add DataTypeSupport trait
* Fix parquet datatype test

(cherry picked from commit 5b5142b)
Which issue does this PR close?
Closes #730
Rationale for this change
To support struct types in expressions, you need to be able to pull out values from structs, which Spark does through the GetStructField expression.
What changes are included in this PR?
Adds a new PhysicalExpr `GetStructField` that gets a field from within a struct.

Additionally, to support this and test it:

* Adds a new trait `DataTypeSupport` that lets operators opt in to supporting certain data types. This will help incrementally add complex data type support. (A minimal sketch of the idea is included at the end of this description.)
* Updates `CometRowToColumnar` to support columnar sources. This is so I can use Spark's vectorized Parquet read to read complex data and then immediately forward it to Comet via `COMET_ROW_TO_COLUMNAR_SUPPORTED_OPERATOR_LIST`. I do this in the UT I made.

How are these changes tested?
New UT showing the Comet operators take effect, plus existing Spark tests for functionality.
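As a rough illustration of the `DataTypeSupport` idea above, a minimal sketch might look like the following. The names, the supported-type list, and the example operator are assumptions for illustration only, not the actual Comet trait:

```scala
import org.apache.spark.sql.types._

// Minimal sketch of an opt-in type-support trait (assumed shape, not Comet's actual code).
trait DataTypeSupport {

  /** Operators override this to opt in to additional (e.g. complex) types. */
  def isAdditionallySupported(dt: DataType): Boolean = false

  private def isGloballySupported(dt: DataType): Boolean = dt match {
    case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
        DoubleType | BinaryType | StringType | DateType | TimestampType |
        _: DecimalType =>
      true
    case _ => false
  }

  /** A type is supported if it is globally supported or this operator opted in. */
  def isTypeSupported(dt: DataType): Boolean =
    isGloballySupported(dt) || isAdditionallySupported(dt)

  /** A schema is supported only if every field's type is supported. */
  def isSchemaSupported(schema: StructType): Boolean =
    schema.fields.forall(f => isTypeSupported(f.dataType))
}

// Hypothetical operator that opts in to struct columns by recursing into nested fields.
object RowToColumnarSupport extends DataTypeSupport {
  override def isAdditionallySupported(dt: DataType): Boolean = dt match {
    case s: StructType => isSchemaSupported(s)
    case _ => false
  }
}
```

An operator like `CometRowToColumnar` could then mix such a trait in and opt in to structs without loosening the shared, operator-agnostic type checks discussed earlier in the review.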