[SPARK-44341][SQL][PYTHON] Define the computing logic through PartitionEvaluator API and use it in WindowExec and WindowInPandasExec #41939
ping @cloud-fan @HyukjinKwon @zhengruifeng @viirya cc @vinodkc
val windowExpression: Seq[NamedExpression],
val partitionSpec: Seq[Expression],
val orderSpec: Seq[SortOrder],
val conf: SQLConf,
We usually don't need to pass the SQLConf, as the worker side can get it via `SQLConf.get`.
We can probably skip testing it. Overall it's just a refactor, and it's probably too much to run many tests twice. We can enable it by default later so that all tests cover this code path.
def windowExpression: Seq[NamedExpression]
def partitionSpec: Seq[Expression]
def orderSpec: Seq[SortOrder]
def conf: SQLConf
Seems `conf` is always `SQLConf.get` in both implementations?
Yes. I have used `SQLConf.get` in both implementations.
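To illustrate the point settled in this thread, here is a minimal sketch of the access pattern. It is not Spark's code: `Conf` is a hypothetical thread-local stand-in for `SQLConf`, and `EvaluatorFactoryBase`, `EvaluatorFactory` and `windowSize` are made-up names. It only shows `conf` as an accessor that is resolved where it is called, rather than a constructor parameter that travels with the factory.

```scala
object ConfAccessSketch {
  // Hypothetical stand-in for SQLConf: a per-thread "active" configuration.
  final class Conf(val settings: Map[String, String])

  object Conf {
    private val active = new ThreadLocal[Conf] {
      override def initialValue(): Conf = new Conf(Map.empty)
    }
    // Plays the role of SQLConf.get in this sketch.
    def get: Conf = active.get()
    def set(conf: Conf): Unit = active.set(conf)
  }

  // `conf` is looked up on demand, so implementations do not have to pass it in.
  trait EvaluatorFactoryBase {
    def windowSize: Int
    def conf: Conf = Conf.get
  }

  // Only the real parameter remains in the constructor.
  class EvaluatorFactory(val windowSize: Int) extends EvaluatorFactoryBase
}
```

With this shape, a caller sets the active configuration once and every factory instance sees it through `conf`, without the value being part of the factory's constructor signature.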
Refactored code looks okay to me.
Since the tests are skipped for now, maybe you can enable the config to test the change in CI, and revert it before merging.
In fact, CI passed on the previous commit.
Oh okay. 👍
The CI failure is unrelated.
}

// Unwrap the expressions and factories from the map.
private val expressionsWithFrameIndex =
So all these `private val`s will be serialized to the executor side, right? I think it's better to define them in the evaluator class, not the factory, to reduce the data to be serialized.
cc @Hisoka-X can you also check your PRs (either merged or not)? We should avoid putting variables in the factory class, and define them in the evaluator class instead.
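The serialization concern raised above can be sketched without Spark. The two traits below are simplified local stand-ins that mirror the shape of the `PartitionEvaluator` API; `EagerFactory`, `LeanFactory`, `table` and `serializedSize` are hypothetical names used only for illustration, and plain Java serialization is assumed as a rough proxy for what is shipped to executors.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object FactoryStateSketch {
  // Simplified stand-ins for the evaluator and factory interfaces.
  trait PartitionEvaluator[T, U] {
    def eval(partitionIndex: Int, inputs: Iterator[T]*): Iterator[U]
  }
  trait PartitionEvaluatorFactory[T, U] extends Serializable {
    def createEvaluator(): PartitionEvaluator[T, U]
  }

  // Derived state is a member of the factory, so it is serialized with it.
  class EagerFactory(n: Int) extends PartitionEvaluatorFactory[Int, Long] {
    private val table: Vector[Long] = Vector.tabulate(n)(i => i.toLong * i)
    override def createEvaluator(): PartitionEvaluator[Int, Long] =
      new PartitionEvaluator[Int, Long] {
        override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[Long] =
          inputs.head.map(i => table(i))
      }
  }

  // Only the parameter `n` is a member of the factory; the derived state is
  // built inside the evaluator, after the factory has been deserialized.
  class LeanFactory(n: Int) extends PartitionEvaluatorFactory[Int, Long] {
    override def createEvaluator(): PartitionEvaluator[Int, Long] = new LeanEvaluator

    private class LeanEvaluator extends PartitionEvaluator[Int, Long] {
      private val table: Vector[Long] = Vector.tabulate(n)(i => i.toLong * i)
      override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[Long] =
        inputs.head.map(i => table(i))
    }
  }

  // Number of bytes produced by Java serialization for the given object.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }
}
```

Both factories produce evaluators that return the same results, but `serializedSize(new EagerFactory(1000))` includes the whole precomputed table, while `serializedSize(new LeanFactory(1000))` covers little more than the single `Int` parameter.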
I haven't come across a factory that needs to define member variables (except for parameters)😂
…onEvaluator API and use it in WindowExec and WindowInPandasExec
Thanks, merging to master!
@cloud-fan @viirya @Hisoka-X Thank you. |
import org.apache.spark.sql.types.{CalendarIntervalType, DateType, DayTimeIntervalType, DecimalType, IntegerType, TimestampNTZType, TimestampType, YearMonthIntervalType}
import org.apache.spark.util.collection.Utils

trait WindowEvaluatorFactoryBase {
In hindsight, shall we move it to a new file? Its two subclasses are not both in this file, so it's a bit weird to keep it with one of the subclasses.
OK. Let's move it.
…torFactoryBase to a single file

### What changes were proposed in this pull request?
#41939 defined the computing logic through the PartitionEvaluator API and used it in `WindowExec` and `WindowInPandasExec`. Following the comment #41939 (comment), this PR moves the base trait `WindowEvaluatorFactoryBase` to its own file.

### Why are the changes needed?
Improve the code.

### Does this PR introduce _any_ user-facing change?
'No'. It only updates the internal implementation.

### How was this patch tested?
N/A

Closes #42106 from beliefer/SPARK-44341_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
} else {
  child.execute().mapPartitions { iter =>
    val evaluator = evaluatorFactory.createEvaluator()
    evaluator.eval(0, iter)
Thank you for the reminder. I will take a look!
… correctly for WindowGroupLimitExec,WindowExec and WindowInPandasExec

### What changes were proposed in this pull request?
This is a followup of #41899 and #41939, to set the partition index correctly even if it's not used for now. It also contains some code cleanup.

### Why are the changes needed?
Future-proofing.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing tests.

Closes #42208 from beliefer/SPARK-44340_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
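As a rough illustration of why the hard-coded `0` in `evaluator.eval(0, iter)` is worth fixing, the sketch below models partitions as plain Scala sequences. Everything in it is hypothetical: `TaggingEvaluator` is a made-up evaluator that exposes the index it receives, and `runWithoutIndex` and `runWithIndex` are local stand-ins for an index-unaware and an index-aware partition mapping. It assumes the follow-up switched to an index-aware call; the source only states that the index is now set correctly.

```scala
object PartitionIndexSketch {
  // Simplified stand-in for the evaluator interface.
  trait PartitionEvaluator[T, U] {
    def eval(partitionIndex: Int, inputs: Iterator[T]*): Iterator[U]
  }

  // Tags every row with the partition index the evaluator was given.
  class TaggingEvaluator extends PartitionEvaluator[String, (Int, String)] {
    override def eval(partitionIndex: Int, inputs: Iterator[String]*): Iterator[(Int, String)] =
      inputs.head.map(row => (partitionIndex, row))
  }

  // Index-unaware mapping: the caller has no index, so it passes a constant 0.
  def runWithoutIndex(partitions: Seq[Seq[String]]): Seq[(Int, String)] =
    partitions.flatMap(rows => new TaggingEvaluator().eval(0, rows.iterator))

  // Index-aware mapping: each evaluator receives the index of its own partition.
  def runWithIndex(partitions: Seq[Seq[String]]): Seq[(Int, String)] =
    partitions.zipWithIndex.flatMap { case (rows, index) =>
      new TaggingEvaluator().eval(index, rows.iterator)
    }
}
```

An evaluator that ignores `partitionIndex` behaves the same under both variants, which is why the change is described as future-proofing: only an evaluator that starts reading the index would notice the difference.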
### What changes were proposed in this pull request?
`WindowExec` and `WindowInPandasExec` are updated to use the `PartitionEvaluator` API to do execution.

### Why are the changes needed?
To define the computing logic and require the caller side to explicitly list what needs to be serialized and sent to executors.

### Does this PR introduce _any_ user-facing change?
'No'. It only updates the internal implementation.

### How was this patch tested?
Added new test cases.