
[SPARK-26680][SQL] Eagerly create inputVars while conditions are appropriate

## What changes were proposed in this pull request?

When a user passes a Stream to groupBy, `CodegenSupport.consume` ends up lazily generating `inputVars` from a Stream, because the field `output` is itself a Stream. At the time `output.zipWithIndex.map` is called, conditions are correct. By the time the map operation actually executes, however, conditions are no longer appropriate: the closure used by the map operation ends up holding a reference to the partially created `inputVars`, and evaluating it results in a StackOverflowError.

This PR ensures that `inputVars` is eagerly created while conditions are appropriate. The same issue existed in the code path that creates `inputVars` from `outputVars` (SPARK-25767); this change simply extends that earlier fix so it covers both code paths.
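The underlying Scala behavior can be illustrated outside Spark: `Stream.map` evaluates only the head eagerly, so the mapping closure observes surrounding mutable state as it is when each tail element is finally forced, not as it was when `map` was called. Below is a minimal hypothetical sketch (the object and method names are invented for illustration; only standard Scala 2.12 `Stream` semantics are assumed) showing why forcing the Stream while conditions are still valid, as this patch does, changes the result:

```scala
// Hypothetical demo, not Spark code: Stream.map evaluates only the head
// eagerly; tail elements are computed when first forced, so the closure
// sees mutable state as it is at evaluation time, not at map-call time.
object StreamLazinessDemo {
  def tag(force: Boolean): List[String] = {
    var ctx = "valid"                                // stands in for codegen state
    val mapped = Stream(1, 2, 3).map(i => s"$i-$ctx")
    val result = if (force) mapped.force else mapped // the patch's fix: force eagerly
    ctx = "stale"                                    // conditions change afterwards
    result.toList
  }

  def main(args: Array[String]): Unit = {
    println(tag(force = false)) // lazy tail is evaluated under the stale context
    println(tag(force = true))  // forced stream captured the valid context
  }
}
```

With `force = false`, only the head is tagged `valid` and the tail sees the stale context; with `force = true`, every element is evaluated before the context changes, which mirrors why the patch forces `inputVarsCandidate` before mutating the codegen context.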

## How was this patch tested?

- Existing SQL unit tests
- A new unit test
- Python tests

Closes apache#23617 from bersprockets/SPARK-26680_opt1.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
(cherry picked from commit d4a30fa)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
bersprockets authored and kai-chi committed Jul 23, 2019
1 parent 4df1c32 commit 79c4b90
Showing 2 changed files with 16 additions and 5 deletions.
@@ -142,14 +142,11 @@ trait CodegenSupport extends SparkPlan {
    * Note that `outputVars` and `row` can't both be null.
    */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
-    val inputVars =
+    val inputVarsCandidate =
       if (outputVars != null) {
         assert(outputVars.length == output.length)
         // outputVars will be used to generate the code for UnsafeRow, so we should copy them
-        outputVars.map(_.copy()) match {
-          case stream: Stream[ExprCode] => stream.force
-          case other => other
-        }
+        outputVars.map(_.copy())
       } else {
         assert(row != null, "outputVars and row cannot both be null.")
         ctx.currentVars = null
@@ -159,6 +156,11 @@ trait CodegenSupport extends SparkPlan {
       }
     }
 
+    val inputVars = inputVarsCandidate match {
+      case stream: Stream[ExprCode] => stream.force
+      case other => other
+    }
+
     val rowVar = prepareRowVar(ctx, row, outputVars)
 
     // Set up the `currentVars` in the codegen context, as we generate the code of `inputVars`
@@ -330,4 +330,13 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(abc, Row(1, "a"))
   }
+
+  test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
+    val groupByCols = Stream(col("key"))
+    val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
+      .groupBy(groupByCols: _*)
+      .max("value")
+
+    checkAnswer(df, Seq(Row(1, 3), Row(2, 3)))
+  }
 }
