Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-2521] Broadcast RDD object (instead of sending it along with every task) #1498

Closed
wants to merge 12 commits into from

Conversation

rxin
Copy link
Contributor

@rxin rxin commented Jul 20, 2014

This is a resubmission of #1452. It was reverted because it broke the build.

Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables.

The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large.

The user-facing impact of the change include:

  1. Users won't need to decide what to broadcast anymore, unless they would want to use a large object multiple times in different operations
  2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput.

In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently).

A simple way to test this:

val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count

Numbers on 3 r3.8xlarge instances on EC2

master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s

@SparkQA
Copy link

SparkQA commented Jul 20, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16863/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 21, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16889/consoleFull

@rxin
Copy link
Contributor Author

rxin commented Jul 21, 2014

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jul 21, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16893/consoleFull

@pwendell
Copy link
Contributor

Jenkins, retest this please.

@rxin
Copy link
Contributor Author

rxin commented Jul 22, 2014

Somehow this makes the unit test taking very long to finish. I also suspect there is some racing condition in the cleaning code. This PR makes them manifest more often.

@SparkQA
Copy link

SparkQA commented Jul 22, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16952/consoleFull

@pwendell
Copy link
Contributor

I was doing some random other thing locally and noticed that there was some weird issue with synchronization around the TorrentBroadcast lock (there is a single shared lock used a bunch inside of that) a bunch of tasks were waiting for the lock for a long time. Maybe somehow that is slowing things down the tests as well.

@mosharaf
Copy link
Contributor

@pwendell is there any specific test that has become very slow?

I've just taken another look at those synchronized blocks. They are in different code-paths (send vs receive) and shouldn't block each other. Some could be removed though.

@SparkQA
Copy link

SparkQA commented Jul 23, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17016/consoleFull

@rxin
Copy link
Contributor Author

rxin commented Jul 24, 2014

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Jul 24, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17082/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 24, 2014

QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17082/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 25, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17176/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 25, 2014

QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17176/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 28, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17288/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 28, 2014

QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17288/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 28, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17311/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 28, 2014

QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17311/consoleFull

@@ -17,7 +17,7 @@

package org.apache.spark.scheduler

import java.io.{NotSerializableException, PrintWriter, StringWriter}
import java.io.{NotSerializableException}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix formatting here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

must be wip :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol

@rxin
Copy link
Contributor Author

rxin commented Jul 28, 2014

Ok the tests seem to be running ok now speed wise (after #1604). Need to fix correctness though.

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17327/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17328/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17327/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17328/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17333/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 29, 2014

QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17333/consoleFull

@rxin
Copy link
Contributor Author

rxin commented Jul 29, 2014

Ok I pushed a new version that also broadcasts the final task closure as well as the shuffle dependency. This one should be good to go (pending Jenkins happiness).


import scala.collection.mutable.HashMap
import scala.language.existentials
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we decide to put these language features at the very top of the import list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about it. Where is it discussed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah dunno, I just thought we were doing that, but I guess it's not in https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports. No need to do it then.

@mateiz
Copy link
Contributor

mateiz commented Jul 30, 2014

I did a pass through this -- looks pretty good.

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17428/consoleFull

@@ -36,20 +38,24 @@ abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just FYI - this is an API breaking change... probably not a huge deal, but FYI

@SparkQA
Copy link

SparkQA commented Jul 30, 2014

QA results for PR 1498:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17428/consoleFull

@rxin
Copy link
Contributor Author

rxin commented Jul 30, 2014

Ok merging this. Thanks for reviewing.

@@ -155,19 +155,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
override def getPartitions: Array[Partition] = Array(onlySplit)
override val getDependencies = List[Dependency[_]]()
override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
if (shouldFail) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for changing this test in RDDSuite?

@asfgit asfgit closed this in 774142f Jul 30, 2014
asfgit pushed a commit that referenced this pull request Aug 16, 2014
More detail on the issue is described in [SPARK-3015](https://issues.apache.org/jira/browse/SPARK-3015), but the TLDR is if we send too many blocking Akka messages that are dependent on each other in quick successions, then we end up causing a few of these messages to time out and ultimately kill the executors. As of #1498, we broadcast each RDD whether or not it is persisted. This means if we create many RDDs (each of which becomes a broadcast) and the driver performs a GC that cleans up all of these broadcast blocks, then we end up sending many `RemoveBroadcast` messages in parallel and trigger the chain of blocking messages at high frequencies.

We do not know of the Akka-level root cause yet, so this is intended to be a temporary solution until we identify the real issue. I have done some preliminary testing of enabling blocking and observed that the queue length remains quite low (< 1000) even under very intensive workloads.

In the long run, we should do something more sophisticated to allow a limited degree of parallelism through batching clean up tasks or processing them in a sliding window. In the longer run, we should clean up the whole `BlockManager*` message passing interface to avoid unnecessarily awaiting on futures created from Akka asks.

tdas pwendell mengxr

Author: Andrew Or <andrewor14@gmail.com>

Closes #1931 from andrewor14/reference-blocking and squashes the following commits:

d0f7195 [Andrew Or] Merge branch 'master' of github.com:apache/spark into reference-blocking
ce9daf5 [Andrew Or] Remove logic for logging queue length
111192a [Andrew Or] Add missing space in log message (minor)
a183b83 [Andrew Or] Switch order of code blocks (minor)
9fd1fe6 [Andrew Or] Remove outdated log
104b366 [Andrew Or] Use the actual reference queue length
0b7e768 [Andrew Or] Block on cleaning tasks by default + log error on queue full
asfgit pushed a commit that referenced this pull request Aug 16, 2014
More detail on the issue is described in [SPARK-3015](https://issues.apache.org/jira/browse/SPARK-3015), but the TLDR is if we send too many blocking Akka messages that are dependent on each other in quick successions, then we end up causing a few of these messages to time out and ultimately kill the executors. As of #1498, we broadcast each RDD whether or not it is persisted. This means if we create many RDDs (each of which becomes a broadcast) and the driver performs a GC that cleans up all of these broadcast blocks, then we end up sending many `RemoveBroadcast` messages in parallel and trigger the chain of blocking messages at high frequencies.

We do not know of the Akka-level root cause yet, so this is intended to be a temporary solution until we identify the real issue. I have done some preliminary testing of enabling blocking and observed that the queue length remains quite low (< 1000) even under very intensive workloads.

In the long run, we should do something more sophisticated to allow a limited degree of parallelism through batching clean up tasks or processing them in a sliding window. In the longer run, we should clean up the whole `BlockManager*` message passing interface to avoid unnecessarily awaiting on futures created from Akka asks.

tdas pwendell mengxr

Author: Andrew Or <andrewor14@gmail.com>

Closes #1931 from andrewor14/reference-blocking and squashes the following commits:

d0f7195 [Andrew Or] Merge branch 'master' of github.com:apache/spark into reference-blocking
ce9daf5 [Andrew Or] Remove logic for logging queue length
111192a [Andrew Or] Add missing space in log message (minor)
a183b83 [Andrew Or] Switch order of code blocks (minor)
9fd1fe6 [Andrew Or] Remove outdated log
104b366 [Andrew Or] Use the actual reference queue length
0b7e768 [Andrew Or] Block on cleaning tasks by default + log error on queue full
(cherry picked from commit c9da466)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
…very task)

This is a resubmission of apache#1452. It was reverted because it broke the build.

Currently (as of Spark 1.0.1), Spark sends RDD object (which contains closures) using Akka along with the task itself to the executors. This is inefficient because all tasks in the same stage use the same RDD object, but we have to send RDD object multiple times to the executors. This is especially bad when a closure references some variable that is very large. The current design led to users having to explicitly broadcast large variables.

The patch uses broadcast to send RDD objects and the closures to executors, and use Akka to only send a reference to the broadcast RDD/closure along with the partition specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large.

The user-facing impact of the change include:

1. Users won't need to decide what to broadcast anymore, unless they would want to use a large object multiple times in different operations
2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput.

In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently).

A simple way to test this:
```scala
val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
```

Numbers on 3 r3.8xlarge instances on EC2
```
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```

Author: Reynold Xin <rxin@apache.org>

Closes apache#1498 from rxin/broadcast-task and squashes the following commits:

f7364db [Reynold Xin] Code review feedback.
f8535dc [Reynold Xin] Fixed the style violation.
252238d [Reynold Xin] Serialize the final task closure as well as ShuffleDependency in taskBinary.
111007d [Reynold Xin] Fix broadcast tests.
797c247 [Reynold Xin] Properly send SparkListenerStageSubmitted and SparkListenerStageCompleted.
bab1d8b [Reynold Xin] Check for NotSerializableException in submitMissingTasks.
cf38450 [Reynold Xin] Use TorrentBroadcastFactory.
991c002 [Reynold Xin] Use HttpBroadcast.
de779f8 [Reynold Xin] Fix TaskContextSuite.
cc152fc [Reynold Xin] Don't cache the RDD broadcast variable.
d256b45 [Reynold Xin] Fixed unit test failures. One more to go.
cae0af3 [Reynold Xin] [SPARK-2521] Broadcast RDD object (instead of sending it along with every task).
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
More detail on the issue is described in [SPARK-3015](https://issues.apache.org/jira/browse/SPARK-3015), but the TLDR is if we send too many blocking Akka messages that are dependent on each other in quick successions, then we end up causing a few of these messages to time out and ultimately kill the executors. As of apache#1498, we broadcast each RDD whether or not it is persisted. This means if we create many RDDs (each of which becomes a broadcast) and the driver performs a GC that cleans up all of these broadcast blocks, then we end up sending many `RemoveBroadcast` messages in parallel and trigger the chain of blocking messages at high frequencies.

We do not know of the Akka-level root cause yet, so this is intended to be a temporary solution until we identify the real issue. I have done some preliminary testing of enabling blocking and observed that the queue length remains quite low (< 1000) even under very intensive workloads.

In the long run, we should do something more sophisticated to allow a limited degree of parallelism through batching clean up tasks or processing them in a sliding window. In the longer run, we should clean up the whole `BlockManager*` message passing interface to avoid unnecessarily awaiting on futures created from Akka asks.

tdas pwendell mengxr

Author: Andrew Or <andrewor14@gmail.com>

Closes apache#1931 from andrewor14/reference-blocking and squashes the following commits:

d0f7195 [Andrew Or] Merge branch 'master' of github.com:apache/spark into reference-blocking
ce9daf5 [Andrew Or] Remove logic for logging queue length
111192a [Andrew Or] Add missing space in log message (minor)
a183b83 [Andrew Or] Switch order of code blocks (minor)
9fd1fe6 [Andrew Or] Remove outdated log
104b366 [Andrew Or] Use the actual reference queue length
0b7e768 [Andrew Or] Block on cleaning tasks by default + log error on queue full
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 16, 2022
* Disable boson for explain sql test

* Set ADAPTIVE_EXECUTION_FORCE_APPLY to false

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants