Skip to content

Commit

Permalink
[SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set partition index…
Browse files Browse the repository at this point in the history
… correctly for WindowGroupLimitExec, WindowExec and WindowInPandasExec

### What changes were proposed in this pull request?
This is a followup of #41899 and #41939, to set the partition index correctly even if it's not used for now. It also contains a few code cleanups.

### Why are the changes needed?
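Future-proofing: the non-evaluator code path now passes the real partition index to `eval` instead of a hard-coded `0`, so any later use of the index will be correct.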

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
existing tests

Closes #42208 from beliefer/SPARK-44340_followup.

Authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
beliefer authored and cloud-fan committed Jul 31, 2023
1 parent 32498b3 commit ea3061b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,22 @@ case class WindowInPandasExec(
)

protected override def doExecute(): RDD[InternalRow] = {
- val spillSize = longMetric("spillSize")
-
val evaluatorFactory =
new WindowInPandasEvaluatorFactory(
windowExpression,
partitionSpec,
orderSpec,
child.output,
- spillSize,
+ longMetric("spillSize"),
pythonMetrics)

// Start processing.
if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
- child.execute().mapPartitions { iter =>
+ child.execute().mapPartitionsWithIndex { (index, rowIterator) =>
val evaluator = evaluatorFactory.createEvaluator()
- evaluator.eval(0, iter)
+ evaluator.eval(index, rowIterator)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,21 @@ case class WindowExec(
)

protected override def doExecute(): RDD[InternalRow] = {
- val spillSize = longMetric("spillSize")
-
val evaluatorFactory =
new WindowEvaluatorFactory(
windowExpression,
partitionSpec,
orderSpec,
child.output,
- spillSize)
+ longMetric("spillSize"))

// Start processing.
if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
- child.execute().mapPartitions { iter =>
+ child.execute().mapPartitionsWithIndex { (index, rowIterator) =>
val evaluator = evaluatorFactory.createEvaluator()
- evaluator.eval(0, iter)
+ evaluator.eval(index, rowIterator)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,21 @@ case class WindowGroupLimitExec(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

protected override def doExecute(): RDD[InternalRow] = {
- val numOutputRows = longMetric("numOutputRows")
-
val evaluatorFactory =
new WindowGroupLimitEvaluatorFactory(
partitionSpec,
orderSpec,
rankLikeFunction,
limit,
child.output,
- numOutputRows)
+ longMetric("numOutputRows"))

if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
- child.execute().mapPartitionsInternal { iter =>
+ child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
val evaluator = evaluatorFactory.createEvaluator()
- evaluator.eval(0, iter)
+ evaluator.eval(index, rowIterator)
}
}
}
Expand Down

0 comments on commit ea3061b

Please sign in to comment.