[SPARK-49977][SQL] Use stack-based iterative computation to avoid creating many Scala List objects for deep expression trees

### What changes were proposed in this pull request?
In some use cases with deep expression trees, the driver's heap shows many `scala.collection.immutable.$colon$colon` objects. These objects are allocated by deep recursion in the `gatherCommutative` method, which calls `flatMap` recursively; each invocation of `flatMap` creates a new temporary Scala collection. This claim is based on the following stack trace (>1K lines) of a driver thread, truncated here for brevity:
```
"HiveServer2-Background-Pool: Thread-9867" #9867 daemon prio=5 os_prio=0 tid=0x00007f35080bf000 nid=0x33e7 runnable [0x00007f3393372000]
   java.lang.Thread.State: RUNNABLE
   	at scala.collection.immutable.List$Appender$1.apply(List.scala:350)
	at scala.collection.immutable.List$Appender$1.apply(List.scala:341)
	at scala.collection.immutable.List.flatMap(List.scala:431)
	at org.apache.spark.sql.catalyst.expressions.CommutativeExpression.gatherCommutative(Expression.scala:1479)
	at org.apache.spark.sql.catalyst.expressions.CommutativeExpression.$anonfun$gatherCommutative$1(Expression.scala:1479)
	at org.apache.spark.sql.catalyst.expressions.CommutativeExpression$$Lambda$5280/143713747.apply(Unknown Source)
	at scala.collection.immutable.List.flatMap(List.scala:366)
....
	at org.apache.spark.sql.catalyst.expressions.CommutativeExpression.gatherCommutative(Expression.scala:1479)
	at org.apache.spark.sql.catalyst.expressions.CommutativeExpression.$anonfun$gatherCommutative$1(Expression.scala:1479)
	at org.apache.spark.sql.catalyst.expressions.CommutativeExpression$$Lambda$5280/143713747.apply(Unknown Source)
	at scala.collection.immutable.List.flatMap(List.scala:366)
....
```
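
The allocation pattern behind this trace can be reproduced on a toy tree. The sketch below is only an illustration: `Node`, `Leaf`, `Add`, `gatherRecursive`, and `leftDeep` are hypothetical names, not Spark classes, and the method only mirrors the shape of the pre-change `gatherCommutative`.

```scala
object RecursiveGatherSketch {
  // Hypothetical toy model standing in for Catalyst expressions.
  sealed trait Node
  final case class Leaf(name: String) extends Node
  final case class Add(left: Node, right: Node) extends Node

  // Recursive gather: every Add node triggers one flatMap call, and each
  // flatMap call builds a new intermediate List on the way back up.
  def gatherRecursive(n: Node): Seq[Node] = n match {
    case Add(l, r) => List(l, r).flatMap(gatherRecursive)
    case other     => other :: Nil
  }

  // Builds a left-deep chain: (((x0 + x1) + x2) + ... + xDepth).
  def leftDeep(depth: Int): Node =
    (1 to depth).foldLeft(Leaf("x0"): Node)((acc, i) => Add(acc, Leaf(s"x$i")))

  // 100 nested Add nodes mean 100 nested flatMap frames and 100 temporary lists.
  val leaves: Seq[Node] = gatherRecursive(leftDeep(100))
}
```

The recursion depth and the number of intermediate lists both grow with the depth of the tree, which matches the repeated `flatMap` and `gatherCommutative` frames in the trace.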

This PR fixes the issue by replacing the recursion with a stack-based iterative computation, which avoids creating a temporary `List` at every level of the expression tree.
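
As a rough, standalone sketch of the stack-based idea (again on a hypothetical toy model, not Spark's `Expression` classes), an explicit `mutable.Stack` replaces the call stack and a single buffer collects the leaves. All names here (`Node`, `Leaf`, `Add`, `gatherIterative`, `leftDeep`) are assumptions made for illustration.

```scala
import scala.collection.mutable

object IterativeGatherSketch {
  // Hypothetical toy model standing in for Catalyst expressions.
  sealed trait Node
  final case class Leaf(name: String) extends Node
  final case class Add(left: Node, right: Node) extends Node

  // Iterative gather: one buffer and one stack for the whole traversal,
  // with no recursion and no per-node intermediate List.
  def gatherIterative(root: Node): Seq[Node] = {
    val result = mutable.Buffer[Node]()
    val stack = mutable.Stack[Node](root)
    while (stack.nonEmpty) {
      stack.pop() match {
        case Add(l, r) => stack.pushAll(Seq(l, r)) // expand later, do not recurse
        case leaf      => result += leaf
      }
    }
    result.toSeq
  }

  // Builds a left-deep chain: (((x0 + x1) + x2) + ... + xDepth).
  def leftDeep(depth: Int): Node =
    (1 to depth).foldLeft(Leaf("x0"): Node)((acc, i) => Add(acc, Leaf(s"x$i")))

  // Tree depth no longer affects the JVM call stack depth.
  val leafCount: Int = gatherIterative(leftDeep(10000)).size
}
```

One design point to keep in mind: popping from a stack visits operands in a different order than a recursive `flatMap` would, so a traversal like this suits callers that do not depend on operand order.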

### Why are the changes needed?

To reduce heap usage on the driver.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests; this change is a refactor.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48481 from utkarsh39/SPARK-49977.

Lead-authored-by: Utkarsh <utkarsh.agarwal@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
utkarsh39 and cloud-fan committed Oct 17, 2024
1 parent e374b94 commit 175d563
Showing 1 changed file with 15 additions and 3 deletions.
@@ -1347,9 +1347,21 @@ trait CommutativeExpression extends Expression {
   /** Collects adjacent commutative operations. */
   private def gatherCommutative(
       e: Expression,
-      f: PartialFunction[CommutativeExpression, Seq[Expression]]): Seq[Expression] = e match {
-    case c: CommutativeExpression if f.isDefinedAt(c) => f(c).flatMap(gatherCommutative(_, f))
-    case other => other.canonicalized :: Nil
+      f: PartialFunction[CommutativeExpression, Seq[Expression]]): Seq[Expression] = {
+    val resultBuffer = scala.collection.mutable.Buffer[Expression]()
+    val stack = scala.collection.mutable.Stack[Expression](e)
+
+    // [SPARK-49977]: Use iterative approach to avoid creating many temporary List objects
+    // for deep expression trees through recursion.
+    while (stack.nonEmpty) {
+      stack.pop() match {
+        case c: CommutativeExpression if f.isDefinedAt(c) =>
+          stack.pushAll(f(c))
+        case other =>
+          resultBuffer += other.canonicalized
+      }
+    }
+    resultBuffer.toSeq
   }

   /**
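
The role of the partial function `f` in the diff above can be illustrated in isolation. In this hedged sketch, `Expr`, `Lit`, `Add`, `Mul`, `gather`, and `addChildren` are hypothetical stand-ins; the `canonicalized` step from the real method is omitted. Only nodes where the partial function is defined are expanded, so anything else is collected as an operand even if it has children of its own.

```scala
import scala.collection.mutable

object PartialFunctionGatherSketch {
  // Hypothetical toy expression model, not Spark's Expression hierarchy.
  sealed trait Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Mul(left: Expr, right: Expr) extends Expr

  // Expands a node only when `f` is defined for it; every other node is
  // collected as-is, which keeps the gathering to "adjacent" operations.
  def gather(root: Expr, f: PartialFunction[Expr, Seq[Expr]]): Seq[Expr] = {
    val result = mutable.Buffer[Expr]()
    val stack = mutable.Stack[Expr](root)
    while (stack.nonEmpty) {
      val next = stack.pop()
      if (f.isDefinedAt(next)) stack.pushAll(f(next)) else result += next
    }
    result.toSeq
  }

  // Defined only for Add, so Mul subtrees are treated as single operands.
  val addChildren: PartialFunction[Expr, Seq[Expr]] = {
    case Add(l, r) => Seq(l, r)
  }

  // (1 + (2 * 3)) + 4
  val expr: Expr = Add(Add(Lit(1), Mul(Lit(2), Lit(3))), Lit(4))
  val operands: Seq[Expr] = gather(expr, addChildren)
}
```

Here `operands` holds three entries: `Lit(1)`, `Lit(4)`, and the unexpanded `Mul(Lit(2), Lit(3))`.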