SPARK-1099: Spark's local mode should probably respect spark.cores.max by default #110

Closed
wants to merge 3 commits

Conversation

qqsun8819
Contributor

This is for JIRA: https://spark-project.atlassian.net/browse/SPARK-1099
This is what I do in this patch (also commented in the JIRA) @aarondav

This is really a behavioral change, so I do this with great caution and welcome any review advice:

1. I changed how the "MASTER=local" pattern creates LocalBackend. In the past we passed 1 core to it; now it uses a default core count. The reason is that when someone uses spark-shell to start local mode, the REPL uses this "MASTER=local" pattern by default, so cores specified on the spark-shell command line also end up here. Passing a hard-coded 1 core is therefore no longer suitable with this change.
2. In LocalBackend, the "totalCores" value is now determined by a different rule (in the past it simply took the user-passed core count, e.g. 1 for the "MASTER=local" pattern, 2 for the "MASTER=local[2]" pattern). The rules are as follows (see the sketch after this list):
   a. The second argument of LocalBackend's constructor, which indicates the core count, has a default value of Int.MaxValue. If the user does not pass it, it stays at Int.MaxValue.
   b. In getMaxCores, we first compare that value to Int.MaxValue. If it is not equal, the user has passed their desired value, so we just use it.
   c. If b is not satisfied, we read the core count from spark.cores.max and the real logical core count from Runtime. If the value of spark.cores.max is bigger than the logical core count, we use the logical core count; otherwise we use spark.cores.max.
3. In SparkContextSchedulerCreationSuite's test("local") case, the assertion is changed from 1 to the logical core count, because the "MASTER=local" pattern now uses default values.
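A minimal Scala sketch of the getMaxCores rule in 2a-2c above (the exact method signature and config access are illustrative assumptions, not the patch itself):

```scala
import org.apache.spark.SparkConf

// Sketch only: resolve the core count for local mode.
// If the caller passed an explicit value (e.g. from "local[2]"), use it; otherwise
// cap spark.cores.max at the machine's logical core count.
def getMaxCores(passedCores: Int, conf: SparkConf): Int = {
  if (passedCores != Int.MaxValue) {
    passedCores
  } else {
    val logicalCores = Runtime.getRuntime.availableProcessors()
    math.min(logicalCores, conf.getInt("spark.cores.max", logicalCores))
  }
}
```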

qqsun8819 added 2 commits March 9, 2014 14:19
…pecified cores when no cores are passed to it
… construct LocalBackend, for use of spark-shell and cores specified on the command line. 2 Some test cases changed from local to local[1]. 3 SparkContextSchedulerCreationSuite tests the spark.cores.max config in the local pattern
@AmplabJenkins

Can one of the admins verify this patch?

@ash211
Contributor

ash211 commented Mar 10, 2014

I find that new users often wonder why Spark is only using 1 core, and it's because they expected local to use all their cores rather than defaulting to just one. Changing the default to use all the cores available on the machine when MASTER=local makes sense to me.

@@ -1204,7 +1204,7 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, 1)
+        val backend = new LocalBackend(scheduler)
Contributor


I think we could simplify the logic by simply putting the default behavior here. Something like

// Use all cores available, up to user-specified limit
val realCores = Runtime.getRuntime.availableProcessors()
val numCores = math.min(realCores, conf.getInt("spark.cores.max", realCores))
val backend = new LocalBackend(scheduler, numCores)

This would allow us to avoid changing much in LocalBackend.

Contributor Author


Thanks @aarondav, I'll update the patch according to your review.

@qqsun8819
Contributor Author

Modify patch according to @aarondav's review

…res and pass it to original LocalBackend constructor
@aarondav
Contributor

This looks good to me, but I will leave this PR for a little longer in case anyone wants to raise questions about changing the behavior here.

@qqsun8819
Contributor Author

Thanks @aarondav, it doesn't matter. Any advice on this patch is welcome.

@aarondav
Contributor

Oops, I screwed up here and never let Jenkins test. I had to revert it because of a build break. I will reopen it in #182.

@qqsun8819 qqsun8819 deleted the local-cores branch April 30, 2016 03:31
mccheah pushed a commit to mccheah/spark that referenced this pull request Mar 16, 2017
cenyuhai added a commit to cenyuhai/spark that referenced this pull request Oct 8, 2017
[SPARK-21774] When comparing numeric types with strings, uniformly cast both sides to double for the comparison

Currently, comparisons between strings and numeric values cast the string to the same data type as the numeric value before comparing.
Test case:
`select "1.1" = 1;`
For a predicate like `"1.1" = 1`, if 1.1 is cast to int it becomes 1, so it then compares equal to 1...
resolve apache#110

See merge request !67
Igosuki pushed a commit to Adikteev/spark that referenced this pull request Jul 31, 2018
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
- Improve the `sed` statement to avoid using a specific line number.
- Remove the healthcheck workaround since it has been fixed upstream.
- Extend the time to wait for the LB deployment.
- The collect-k8s-logs role is no longer needed.
fishcus pushed a commit to fishcus/spark that referenced this pull request Jun 12, 2020
upgrade version to 2.2.1-kylin-r10
wangyum added a commit that referenced this pull request Apr 7, 2022
### What changes were proposed in this pull request?

Use sideBySide to format the log plan in `AdaptiveSparkPlanExec`.
Before:
```
12:08:36.876 ERROR org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec: Plan changed from SortMergeJoin [key#13], [a#23], Inner
:- Sort [key#13 ASC NULLS FIRST], false, 0
:  +- ShuffleQueryStage 0
:     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
:        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:              +- Scan[obj#12]
+- Sort [a#23 ASC NULLS FIRST], false, 0
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]
 to BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#145]
:  +- ShuffleQueryStage 0
:     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
:        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
:              +- Scan[obj#12]
+- ShuffleQueryStage 1
   +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
      +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
         +- Scan[obj#22]
```

After:
```
15:57:59.481 ERROR org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec: Plan changed:
!SortMergeJoin [key#13], [a#23], Inner                                                                                                                                                                                                                                                                                                                                         BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
!:- Sort [key#13 ASC NULLS FIRST], false, 0                                                                                                                                                                                                                                                                                                                                    :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#145]
 :  +- ShuffleQueryStage 0                                                                                                                                                                                                                                                                                                                                                     :  +- ShuffleQueryStage 0
 :     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]                                                                                                                                                                                                                                                                                                 :     +- Exchange hashpartitioning(key#13, 5), ENSURE_REQUIREMENTS, [id=#110]
 :        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))                                                                                                                                                                                                                                                                                                              :        +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
 :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]   :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
 :              +- Scan[obj#12]                                                                                                                                                                                                                                                                                                                                                :              +- Scan[obj#12]
!+- Sort [a#23 ASC NULLS FIRST], false, 0                                                                                                                                                                                                                                                                                                                                      +- ShuffleQueryStage 1
!   +- ShuffleQueryStage 1                                                                                                                                                                                                                                                                                                                                                        +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]
!      +- Exchange hashpartitioning(a#23, 5), ENSURE_REQUIREMENTS, [id=#129]                                                                                                                                                                                                                                                                                                         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
!         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]                                                                                                                                  +- Scan[obj#22]
!            +- Scan[obj#22]
```
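For reference, a minimal sketch of how a side-by-side plan diff like the one above can be produced with `org.apache.spark.sql.catalyst.util.sideBySide` (the helper name `logPlanChange` and the use of `println` are illustrative assumptions; the actual patch logs through `AdaptiveSparkPlanExec`):

```scala
import org.apache.spark.sql.catalyst.util.sideBySide

// Sketch only: pair the old and new plan strings line by line;
// sideBySide prefixes the lines that differ with "!".
def logPlanChange(oldPlan: String, newPlan: String): Unit = {
  val diff = sideBySide(oldPlan, newPlan).mkString("\n")
  println(s"Plan changed:\n$diff")
}
```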

### Why are the changes needed?

Enhance readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual testing.

Closes #36045 from wangyum/SPARK-38772.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
cloud-fan pushed a commit that referenced this pull request Apr 20, 2022
### What changes were proposed in this pull request?
This PR adds a new optimizer rule `MergeScalarSubqueries` to merge multiple non-correlated `ScalarSubquery`s to compute multiple scalar values once.

E.g. the following query:
```
SELECT
  (SELECT avg(a) FROM t),
  (SELECT sum(b) FROM t)
```
is optimized from:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [] AS scalarsubquery()#253, scalar-subquery#243 [] AS scalarsubquery()#254L]
:  :- Aggregate [avg(a#244) AS avg(a)#247]
:  :  +- Project [a#244]
:  :     +- Relation default.t[a#244,b#245] parquet
:  +- Aggregate [sum(a#251) AS sum(a)#250L]
:     +- Project [a#251]
:        +- Relation default.t[a#251,b#252] parquet
+- OneRowRelation
```
to:
```
== Optimized Logical Plan ==
Project [scalar-subquery#242 [].avg(a) AS scalarsubquery()#253, scalar-subquery#243 [].sum(a) AS scalarsubquery()#254L]
:  :- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:  :  +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:  :     +- Project [a#244]
:  :        +- Relation default.t[a#244,b#245] parquet
:  +- Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
:     +- Aggregate [avg(a#244) AS avg(a)#247, sum(a#244) AS sum(a)#250L]
:        +- Project [a#244]
:           +- Relation default.t[a#244,b#245] parquet
+- OneRowRelation
```
and in the physical plan subqueries are reused:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Project [Subquery subquery#242, [id=#113].avg(a) AS scalarsubquery()#253, ReusedSubquery Subquery subquery#242, [id=#113].sum(a) AS scalarsubquery()#254L]
   :  :- Subquery subquery#242, [id=#113]
   :  :  +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- *(2) HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- ShuffleQueryStage 0
                  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#158]
                     +- *(1) HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
         +- == Initial Plan ==
            Project [named_struct(avg(a), avg(a)#247, sum(a), sum(a)#250L) AS mergedValue#260]
            +- HashAggregate(keys=[], functions=[avg(a#244), sum(a#244)], output=[avg(a)#247, sum(a)#250L])
               +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#110]
                  +- HashAggregate(keys=[], functions=[partial_avg(a#244), partial_sum(a#244)], output=[sum#262, count#263L, sum#264L])
                     +- FileScan parquet default.t[a#244] Batched: true, DataFilters: [], Format: Parquet, Location: ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int>
   :  +- ReusedSubquery Subquery subquery#242, [id=#113]
   +- *(1) Scan OneRowRelation[]
+- == Initial Plan ==
...
```

Please note that the above simple example could be easily optimized into a common select expression without reuse node, but this PR can handle more complex queries as well.

### Why are the changes needed?
Performance improvement.
```
[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9 - MergeScalarSubqueries off                    50798          52521        1423          0.0      Infinity       1.0X
[info] q9 - MergeScalarSubqueries on                     19484          19675         226          0.0      Infinity       2.6X

[info] TPCDS Snappy:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] q9b - MergeScalarSubqueries off                   15430          17803         NaN          0.0      Infinity       1.0X
[info] q9b - MergeScalarSubqueries on                     3862           4002         196          0.0      Infinity       4.0X
```
Please find `q9b` in the description of SPARK-34079. It is a variant of [q9.sql](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q9.sql) using CTE.
The performance improvement in case of `q9` comes from merging 15 subqueries into 5 and in case of `q9b` it comes from merging 5 subqueries into 1.

### Does this PR introduce _any_ user-facing change?
No. But this optimization can be disabled with `spark.sql.optimizer.excludedRules` config.
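For example, a minimal sketch of excluding the rule (the fully qualified rule name below is an assumption about the optimizer package layout, not taken from this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Exclude the (assumed) MergeScalarSubqueries rule from the optimizer.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.MergeScalarSubqueries")
```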

### How was this patch tested?
Existing and new UTs.

Closes #32298 from peter-toth/SPARK-34079-multi-column-scalar-subquery.

Lead-authored-by: Peter Toth <peter.toth@gmail.com>
Co-authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Apr 20, 2022
(cherry picked from commit e00b81e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>