Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark-3822] Ability to add/delete executors from Spark Context. Works in both yarn-client and yarn-cluster mode #2798

Closed
wants to merge 2 commits into from

Conversation

PraveenSeluka
Copy link

sc.addExecutors(count : Int)
sc.deleteExecutors(List[String])
image
Diagram : add/deleteExecutors in Yarn-Client mode

In Yarn-Cluster mode, driver program and Yarn allocator threads are in the same JVM. But, in client mode, they live in different JVM’s. To make this work in both client and cluster mode, we need a way for the driver and Yarn Allocator to communicate and hence uses Actors.

At high level, this works as follows,

  • AutoscaleClientActor is created by SparkContext and AutoscaleServerActor is created by Yarn Application Master code flow.
  • Once AutoscaleServerActor comes up, it registers itself to the AutoscaleClientActor.
  • SparkContext forwards the add/deleteExecutors call through AutoScaleClient -> AutoscaleServerActor.
  • AutoscaleServer has a reference of YarnAllocator. On receiving addExecutors, it just increments the maxExecutor count in YarnAllocator. And, reporter thread will allocate the respective containers. Note that Yarn Allocator now has both numExecutors (initial count of executors) and maxExecutors and reporter thread will request based on maxExecutors
  • YarnAllocator also maintains a map between (executor_id,container_id). Hence to delete executors, we could just call sc.deleteExecutors(List("1","2")) where "1" and "2" are executor id's. We can get the executorId and its storage status from this API - sc.getExecutorStorageStatus.foreach(x=> println(x.blockManagerId.executorId))

Difference when compared to what is proposed

  • This creates separate AutoscaleClient whereas the current design proposed(https://issues.apache.org/jira/secure/attachment/12673181/dynamic-scaling-executors-10-6-14.pdf) was to add these new messages in CoarseGrainedSchedulerBackend itself. One minor advantage of doing this way is, the dynamic scaling itself lives as a separate module in Spark and none of the other Spark Core code is affected much. Also, we could add more messages going forward. If you think, its not required, let me know - I can move it to CoarseGrainedSchedulerBackend itself.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@andrewor14
Copy link
Contributor

I mentioned this in the JIRA, but I'll post it here again. First of all thanks @PraveenSeluka for working on this. However, the JIRA is currently assigned to me and I do intend to implement this as well according to the design doc I posted here SPARK-3174. The interface exposed here is a little different from what I intend, and I would prefer to stick to my original design.

@andrewor14
Copy link
Contributor

ok to test

@SparkQA
Copy link

SparkQA commented Oct 14, 2014

QA tests have started for PR 2798 at commit c4a883f.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Oct 14, 2014

QA tests have finished for PR 2798 at commit c4a883f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class AutoscaleClient(actorSystem: ActorSystem) extends Logging
    • case class AddExecutors(count: Int) extends AutoscaleMessage
    • case class DeleteExecutors(execId: List[String]) extends AutoscaleMessage

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21732/
Test FAILed.

@PraveenSeluka
Copy link
Author

@andrewor14 Thanks for your comment. I do see that the JIRA is assigned to you. But only after looking at the design doc(only posted last week), it became evident that I have it implemented already and thought to contribute it (although slightly different, and I have explained this in design doc also). I am ready to make this change to move it to CoarseGrainedSchedulerBackend too (as per your design) if you wish thats the way to go forward.

If you look at the design doc I posted(https://issues.apache.org/jira/secure/attachment/12674773/SparkElasticScalingDesignB.pdf), Autoscaler itself is a separate class - thats also one of the reason why all the code changes related to Autoscaling can be part of org/apache/spark/autoscale/* and it will be cleaner I believe. It also doesnt make any big changes in Spark Core.

@andrewor14
Copy link
Contributor

I see. Good to know that this is a problem important enough for others to have already implemented parts of it. Another part of this bigger design is in progress at #2746, and that already introduces a new class, so I'd rather not introduce another one here. I think the scheduler backend is an appropriate place to do this because much of the communication with the cluster manager is already done there.

@andrewor14
Copy link
Contributor

Hey @PraveenSeluka I took a closer look at this. I think functionality-wise this basically does exactly what we need. I have tested this on a cluster and it seems to work (though there was some thrashing when I removed executors, but I think there's a quick fix for that). However, I think we should move the functionality into the scheduler, because the "autoscaling server" should behave differently for each cluster manager, whereas right now it's specific to Yarn. Also, for Yarn client mode, there is already an existing actor on the AM side (MonitorActor) that we can probably reuse.

I don't know what your timeline looks like, but I would like to get a first-cut working prototype for SPARK-3174 as soon as possible before the 1.2 release (~end of month). Is it OK if I build my prototype on top of the changes in this PR? We'll be sure to include you on the list of contributors for the release.

@PraveenSeluka
Copy link
Author

Thanks @andrewor14 for taking time to test this out.

  • Regarding AutoscaleServer is Yarn Specific => Good point. Here is a proposal to fix this issue so that it works in Mesos mode also.
trait Autoscalable {
  def addExecutors(count: Int)
  def deleteExecutors(execIds: List[String])
}

After this, these will be the following changes

YarnAllocator extends Autoscalable with Logging

private[spark] class AutoscaleServer(allocator: Autoscalable, sparkConf: SparkConf, actorSystem: ActorSystem, isClientRemote : Boolean = true)

Note that, its not yarn specific after this. If we want to use AutoscaleSever for Mesos mode, then Mesos equivalent of YarnAllocator must extend Autoscalable trait. And, Mesos mode can also use AutoscaleSever now

  • On reusing MonitorActor, lets say we use MonitorActor for yarn-client mode. Now, I am not clear on what you are proposing for yarn-cluster mode - will it have a different Actor for yarn-cluster mode ? (if so, wont it be better to keep it consistent)

Let me know if the above sounds good, I have a patch tested and will update immediately.

@PraveenSeluka
Copy link
Author

On using CoarseGrainedSchedulerBackend, here are some thoughts

  • It already receives StopExecutor, RemoveExecutor messages. If we add AddExecutor, DeleteExecutor, it will be very confusing. This was one of the reason I started with AutoscaleClient - to keep it separate and clear.

Please take a look, if you feel it wont be confusing - I will go ahead and add it to CoarseGrainedSchedulerBackend itself.

@andrewor14
Copy link
Contributor

Actually my proposal is more like the following. CoarseGrainedSchedulerBackend will define what you propose as the Autoscaling trait, and each of its subclasses will override it. I prefer to keep the actual autoscaling messages outside of the CoarseGrainedSchedulerBackend$DriverActor because some of them might be specific to the cluster manager. For instance, for Yarn we'll need to save the sender ActorRef as you have done here, whereas for standalone mode we don't need to do this because we already have the akka URL of our Master.

For Yarn, we can just generalize the MonitorActor to become the equivalent of your AutoscalingServer. Currently we only start this on the AM for client mode, but I don't see a reason why we can't also start it for cluster mode. Then, on the scheduler side, my plan is to put the equivalent of your AutoscalingClient in a new parent trait YarnSchedulerBackend that both of the existing Yarn scheduler backends implement. There are already messages in CoarseGrainedSchedulerBackend that are pretty Yarn specific (e.g. addWebUIFilter) and this is an opportunity to pull them out into this new actor.

For standalone mode, the existing communication between the scheduler and the Master goes through this thing called the AppClient, and SparkDeploySchedulerBackend already has a reference to it. I prefer to reuse this channel and add the new messages there rather than introducing a new client actor as we are doing in the Yarn case.

Basically my thinking is to add a general communication channel between the scheduler and the cluster manager, so in the future we can just add messages specific to the cluster manager there instead of having to create another actor. This channel already exists for a subset of the cluster managers (e.g. standalone), so there we can just reuse it. Note that in my proposal I am not introducing any new classes (only Actor subclasses). This feature is ultimately intended to work with #2746, which already introduces a new class for the broader feature of SPARK-3174, so I prefer to just implement this in the existing interfaces.

Does that make sense?

@PraveenSeluka
Copy link
Author

@andrewor14 I have got a full picture of what you are planning to do there. The details are exactly the same as this PR. I have tried to keep the auto-scaling pieces totally separate whereas your design adds to the existing infrastructure. Don't necessarily see that one design is better than other, hence if you plan to build a prototype on top of this PR itself, I am fine with that.

@andrewor14
Copy link
Contributor

Great, thanks. I'll let you know once I have a PR for this.

@andrewor14
Copy link
Contributor

Hey @PraveenSeluka I opened a PR at #2840. Let me know if you have any questions or comments. Thanks for your work!

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

asfgit pushed a commit that referenced this pull request Oct 29, 2014
This is part of a broader effort to enable dynamic scaling of executors ([SPARK-3174](https://issues.apache.org/jira/browse/SPARK-3174)). This is intended to work alongside SPARK-3795 (#2746), SPARK-3796 and SPARK-3797, but is functionally independently of these other issues.

The logic is built on top of PraveenSeluka's changes at #2798. This is different from the changes there in a few major ways: (1) the mechanism is implemented within the existing scheduler backend framework rather than in new `Actor` classes. This also introduces a parent abstract class `YarnSchedulerBackend` to encapsulate common logic to communicate with the Yarn `ApplicationMaster`. (2) The interface of requesting executors exposed to the `SparkContext` is the same, but the communication between the scheduler backend and the AM uses total number executors desired instead of an incremental number. This is discussed in #2746 and explained in the comments in the code.

I have tested this significantly on a stable Yarn cluster.

------------
A remaining task for this issue is to tone down the error messages emitted when an executor is removed.
Currently, `SparkContext` and its components react as if the executor has failed, resulting in many scary error messages and eventual timeouts. While it's not strictly necessary to fix this as of the first-cut implementation of this mechanism, it would be good to add logic to distinguish this case. I prefer to address this in a separate PR. I have filed a separate JIRA for this task at SPARK-4134.

Author: Andrew Or <andrew@databricks.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes #2840 from andrewor14/yarn-scaling-mechanism and squashes the following commits:

485863e [Andrew Or] Minor log message changes
4920be8 [Andrew Or] Clarify that public API is only for Yarn mode for now
1c57804 [Andrew Or] Reword a few comments + other review comments
6321140 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
02836c0 [Andrew Or] Limit scope of synchronization
4e2ed7f [Andrew Or] Fix bug: keep track of removed executors properly
73ade46 [Andrew Or] Wording changes (minor)
2a7a6da [Andrew Or] Add `sc.killExecutor` as a shorthand (minor)
665f229 [Andrew Or] Mima excludes
79aa2df [Andrew Or] Simplify the request interface by asking for a total
04f625b [Andrew Or] Fix race condition that causes over-allocation of executors
f4783f8 [Andrew Or] Change the semantics of requesting executors
005a124 [Andrew Or] Fix tests
4628b16 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
db4a679 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
572f5c5 [Andrew Or] Unused import (minor)
f30261c [Andrew Or] Kill multiple executors rather than one at a time
de260d9 [Andrew Or] Simplify by skipping useless null check
9c52542 [Andrew Or] Simplify by skipping the TaskSchedulerImpl
97dd1a8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-scaling-mechanism
d987b3e [Andrew Or] Move addWebUIFilters to Yarn scheduler backend
7b76d0a [Andrew Or] Expose mechanism in SparkContext as developer API
47466cd [Andrew Or] Refactor common Yarn scheduler backend logic
c4dfaac [Andrew Or] Avoid thrashing when removing executors
53e8145 [Andrew Or] Start yarn actor early to listen for AM registration message
bbee669 [Andrew Or] Add mechanism in yarn client mode
@andrewor14
Copy link
Contributor

Hey @PraveenSeluka now that #2840 mind closing this? Thanks.

@tgravescs
Copy link
Contributor

@PraveenSeluka can we close this?

@asfgit asfgit closed this in 5923dd9 Nov 7, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants