
[SPARK-1331] Added graceful shutdown to Spark Streaming #247

Closed
tdas wants to merge 7 commits from tdas:graceful-shutdown

Conversation

tdas
Contributor

@tdas tdas commented Mar 27, 2014

Current version of StreamingContext.stop() directly kills all the data receivers (NetworkReceiver) without waiting for the data already received to be persisted and processed. This PR provides the fix. Now, when StreamingContext.stop() is called, the following sequence of steps will happen.

  1. The driver will send a stop signal to all the active receivers.
  2. Each receiver, when it gets a stop signal from the driver, first stops receiving more data, then waits for the thread that persists data blocks to the BlockManager to finish persisting all received data, and finally quits.
  3. After all the receivers have stopped, the driver will wait for the Job Generator and Job Scheduler to finish processing all the received data.

It also fixes the semantics of StreamingContext.start and stop. They will now throw appropriate errors and warnings if stop() is called before start(), stop() is called twice, and so on.
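
For illustration, a minimal sketch of how an application would request the graceful shutdown described above. It assumes the stopGracefully flag that this PR adds to stop(); the exact parameter names may differ slightly in the merged version.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulStopExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GracefulStopExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // A trivial job so that received data actually gets processed.
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc.start()

    // Run for a while, then shut down gracefully: receivers are stopped first,
    // already-received blocks are persisted and processed, and only then is the
    // SparkContext torn down.
    Thread.sleep(30000)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
```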

@witgo
Contributor

witgo commented Mar 27, 2014

Would adding a lifecycle interface be a good idea?

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@AmplabJenkins

Build finished.

@AmplabJenkins

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13491/

@tdas
Contributor Author

tdas commented Mar 27, 2014

Jenkins, test this again.

@tdas
Contributor Author

tdas commented Mar 27, 2014

@pwendell

@@ -158,6 +158,15 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object ContextState extends Enumeration {
Contributor

minor: but how would you feel about calling this StreamingContextState? I found it a bit confusing when looking at it directly. Then you could probably even remove the doc because it would be totally clear.

Contributor Author

Changed. But keeping the doc in. Extra doc does no harm. ;)
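
For readers without the full diff, the renamed enumeration would look roughly like this. This is only a sketch based on the discussion above; the exact state names and type alias in the merged code may differ.

```scala
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
  type StreamingContextState = Value
  val Initialized, Started, Stopped = Value
}
```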

@tdas
Contributor Author

tdas commented Apr 3, 2014

@witgo I completely agree that a better lifecycle interface is necessary. That has been addressed by a different PR #300. This PR only adds the changes necessary to the other components of the system (NetworkInputTracker, JobScheduler, JobGenerator, etc.) to allow graceful shutdown.

@pwendell
Contributor

pwendell commented Apr 3, 2014

Hey TD - did a pass. Had some questions and surface-level comments.

One thing that came up a few times was the use of a busy-wait/Thread.sleep along with interruption. Ideally, rather than calling sleep in a loop, you would wait on a condition variable and notify it from the other thread. With the current approach you anyway have to keep a reference to the thread so it knows what to interrupt. Why not just use a CV?
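
A sketch of the alternative being suggested: instead of a Thread.sleep loop that has to be interrupted, the stopping side signals a monitor that the waiting side blocks on. The names below (StopLatch, awaitStop, signalStop) are illustrative only and do not come from the patch.

```scala
// Sketch: replace a sleep-in-a-loop + interrupt pattern with a condition variable.
class StopLatch {
  private val lock = new Object
  private var stopped = false

  /** Block until stop is signalled or the timeout elapses; returns true if stopped. */
  def awaitStop(timeoutMs: Long): Boolean = lock.synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    var remaining = timeoutMs
    while (!stopped && remaining > 0) {
      lock.wait(remaining)
      remaining = deadline - System.currentTimeMillis()
    }
    stopped
  }

  /** Called from the stopping thread; wakes up any waiter immediately. */
  def signalStop(): Unit = lock.synchronized {
    stopped = true
    lock.notifyAll()
  }
}
```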

@AmplabJenkins

Build triggered.

@AmplabJenkins

Build started.

@tdas
Contributor Author

tdas commented Apr 4, 2014

Hey @pwendell, I did a pass as well. Some of the network-receiver-related changes I have punted on for now, as those have actually changed with #300 and it's best to analyze them in the context of those changes.

tdas added 2 commits April 7, 2014 17:00
…tdown

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13865/

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13866/

@pwendell
Contributor

pwendell commented Apr 8, 2014

Looks good TD, merging this.

@asfgit asfgit closed this in 83ac9a4 Apr 8, 2014
asfgit pushed a commit that referenced this pull request Apr 22, 2014
…eam API [WIP]

The current Network Receiver API makes it slightly complicated to write a new receiver, as one needs to create an instance of BlockGenerator, as shown in SocketReceiver
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51

Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyway) needs to be changed if we are to ensure future stability.

Additionally, functions like streamingContext.socketStream that create input streams return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input DStreams. They should return InputDStream or NetworkInputDStream. This is not yet implemented.

This PR is blocked on the graceful shutdown PR #247

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #300 from tdas/network-receiver-api and squashes the following commits:

ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
(cherry picked from commit 04c37b6)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
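
For readers following the receiver-API discussion in the commit message above, a rough sketch of what writing a receiver looks like once BlockGenerator is hidden behind a store()-style call. The class and method names are based on the public Receiver API that the follow-up work eventually exposed, and may not match this commit exactly.

```scala
import java.net.Socket
import scala.io.Source

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a custom receiver against the refactored API: the receiver only
// implements onStart/onStop and calls store(); block generation and persistence
// to the BlockManager are handled internally instead of requiring a
// user-managed BlockGenerator.
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do here: the reading thread exits once isStopped() is true.
  }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val lines = Source.fromInputStream(socket.getInputStream, "UTF-8").getLines()
      while (!isStopped() && lines.hasNext) {
        store(lines.next())  // hand data to Spark for block generation and storage
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}
```

Such a receiver would be wired in with something like ssc.receiverStream(new SocketLineReceiver("localhost", 9999)), again assuming the later public API.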
asfgit pushed a commit that referenced this pull request Apr 22, 2014
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
Current version of StreamingContext.stop() directly kills all the data receivers (NetworkReceiver) without waiting for the data already received to be persisted and processed. This PR provides the fix. Now, when StreamingContext.stop() is called, the following sequence of steps will happen.
1. The driver will send a stop signal to all the active receivers.
2. Each receiver, when it gets a stop signal from the driver, first stops receiving more data, then waits for the thread that persists data blocks to the BlockManager to finish persisting all received data, and finally quits.
3. After all the receivers have stopped, the driver will wait for the Job Generator and Job Scheduler to finish processing all the received data.

It also fixes the semantics of StreamingContext.start and stop. They will now throw appropriate errors and warnings if stop() is called before start(), stop() is called twice, and so on.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#247 from tdas/graceful-shutdown and squashes the following commits:

61c0016 [Tathagata Das] Updated MIMA binary check excludes.
ae1d39b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into graceful-shutdown
6b59cfc [Tathagata Das] Minor changes based on Andrew's comment on PR.
d0b8d65 [Tathagata Das] Reduced time taken by graceful shutdown unit test.
f55bc67 [Tathagata Das] Fix scalastyle
c69b3a7 [Tathagata Das] Updates based on Patrick's comments.
c43b8ae [Tathagata Das] Added graceful shutdown to Spark Streaming.
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
liancheng pushed a commit to liancheng/spark that referenced this pull request Mar 17, 2017
…ckage

## What changes were proposed in this pull request?
As part of the effort of clearly separating out Edge functionality, we're hereby moving all code related to the Directory Commit protocol to its own package.

A few things about this change are not ideal - had to:
  - import com.databricks in `PartitioningAwareFileIndex`
  - open up `DirectoryAtomicReadProtocol.testingFs` (for a testing hack)
  - write ugly code for getting configs from `SparkEnv.conf`, because of not having access to `ConfigEntry`
  - duplicate a bunch of utility classes:  `Clock`, `ManualClock`, `SystemClock`, `ThreadUtils`

... but most of these (except the last) should hopefully be resolved by [SC-5838](https://databricks.atlassian.net/browse/SC-5838).

## How was this patch tested?
spark-sql tests

Author: Adrian Ionescu <adrian@databricks.com>

Closes apache#247 from adrian-ionescu/SC-5835.
mccheah pushed a commit to mccheah/spark that referenced this pull request Oct 12, 2017
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
cloud-fan pushed a commit that referenced this pull request Apr 20, 2022
### What changes were proposed in this pull request?
This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once.

E.g. the following query:
```
SELECT
  (SELECT avg(a) FROM t),
  (SELECT sum(b) FROM t)
```
is optimized from:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L]
:  :- Aggregate [avg(a#244) AS avg(a)#247]
:  :  +- Project [a#244]
:  :     +- Relation default.t[a#244,b#245] parquet
:  +- Aggregate [sum(a#251) AS sum(a)#250L]
:     +- Project [a#251]
:        +- Relation default.t[a#251,b#252] parquet
+- OneRowRelation
```
to:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
:  :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:  :  +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:  :     +- Project [a#244]
:  :        +- Relation default.t[a#244,b#245] parquet
:  +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:     +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:        +- Project [a#244]
:           +- Relation default.t[a#244,b#245] parquet
+- OneRowRelation
```
and in the physical plan subqueries are reused:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L]
   :  :- Subquery subquery#242, [id=#113]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158]
                     +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
         +- == Initial Plan ==
            Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110]
                  +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                     +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  +- ReusedSubquery Subquery subquery#242, [id=#113]
   +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
...
```

Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.

### Why are the changes needed?
Performance improvement.
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - MergeScalarSubqueries off                    50798          52521        1423          0.0      Infinity       1.0X
[info] q9 - MergeScalarSubqueries on                     19484          19675         226          0.0      Infinity       2.6X

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9b - MergeScalarSubqueries off                   15430          17803         NaN          0.0      Infinity       1.0X
[info] q9b - MergeScalarSubqueries on                     3862           4002         196          0.0      Infinity       4.0X
```
Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE.
The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?
No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config.

### How was this patch tested?
Existing and new UTs.

Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery.

Lead-authored-by: Peter Toth <peter.toth@gmail.com>
Co-authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Apr 20, 2022
(cherry picked from commit e00b81e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>