diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 94604f301dd46..584d8de1bd681 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -56,6 +56,112 @@ the same RDDs. For example, the [Shark](http://shark.cs.berkeley.edu) JDBC serve
queries. In future releases, in-memory storage systems such as
[Tachyon](http://tachyon-project.org) will provide another approach to share RDDs.

## Dynamic Resource Allocation

Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to
your application up and down based on the workload. This means that your application may give
resources back to the cluster if they are no longer used and request them again later when there
is demand. This feature is particularly useful if multiple applications share resources in your
Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be
returned to the cluster's pool of resources and acquired by other applications. In Spark, dynamic
resource allocation is performed at the granularity of the executor and can be enabled through
`spark.dynamicAllocation.enabled`.

This feature is currently disabled by default and available only on [YARN](running-on-yarn.html).
A future release will extend this to [standalone mode](spark-standalone.html) and
[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that although Spark on
Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling
dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency
scheduling while sharing cluster resources efficiently.

Lastly, it is worth noting that Spark's dynamic resource allocation mechanism is cooperative.
This means that if a Spark application enables this feature, other applications on the same
cluster are also expected to do so. Otherwise, the cluster's resources will end up being unfairly
distributed to the applications that do not voluntarily give up unused resources they have
acquired.

### Configuration and Setup

All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
provide lower and upper bounds for the number of executors through
`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
configurations are described on the [configurations page](configuration.html) and in the
subsequent sections in detail.

Additionally, your application must use an external shuffle service (described below). To enable
this, set `spark.shuffle.service.enabled` to `true`. On YARN, this external shuffle service is
implemented in `org.apache.spark.network.yarn.YarnShuffleService`, which runs in each
`NodeManager` in your cluster. To start this service, follow these steps:

1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
pre-packaged distribution.
2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
`$SPARK_HOME/network/yarn/target/scala-<version>` if you are building Spark yourself, and under
`lib` if you are using a distribution.
3. Add this jar to the classpath of all `NodeManager`s in your cluster.
4. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
`org.apache.spark.network.yarn.YarnShuffleService`. Additionally, set all relevant
`spark.shuffle.service.*` [configurations](configuration.html) (a sketch of the resulting
`yarn-site.xml` entries follows this list).
5. Restart all `NodeManager`s in your cluster.
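
Putting the application-side settings together, a minimal configuration sketch might look like the
following `spark-defaults.conf` entries (or the equivalent `--conf` flags to `spark-submit`). The
property names are the ones described in this section; the numeric values are illustrative
placeholders rather than recommendations:

```
spark.dynamicAllocation.enabled        true
spark.dynamicAllocation.minExecutors   1
spark.dynamicAllocation.maxExecutors   20
spark.shuffle.service.enabled          true
```

Similarly, the `yarn-site.xml` change described in step 4 might look roughly like the snippet
below. The `mapreduce_shuffle` entry is only an example of an auxiliary service that may already
be present in your configuration and should be kept if so:

```xml
<property>
  <name>yarn.nodemanager.aux-services</name>
  <!-- keep any existing auxiliary services and add spark_shuffle -->
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```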

### Resource Allocation Policy

At a high level, Spark should relinquish executors when they are no longer used and acquire
executors when they are needed. Since there is no definitive way to predict whether an executor
that is about to be removed will run a task in the near future, or whether a new executor that is
about to be added will actually be idle, we need a set of heuristics to determine when to remove
and request executors.

#### Request Policy

A Spark application with dynamic allocation enabled requests additional executors when it has
pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
of executors is insufficient to simultaneously saturate all tasks that have been submitted but
not yet finished.

Spark requests executors in rounds. The actual request is triggered when there have been pending
tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and then triggered again
every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds thereafter if the queue
of pending tasks persists. Additionally, the number of executors requested in each round increases
exponentially from the previous round. For instance, an application will add 1 executor in the
first round, and then 2, 4, 8 and so on executors in the subsequent rounds.

The motivation for an exponential increase policy is twofold. First, an application should request
executors cautiously in the beginning in case it turns out that only a few additional executors
are sufficient. This echoes the justification for TCP slow start. Second, the application should
be able to ramp up its resource usage in a timely manner in case it turns out that many executors
are actually needed.
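
To make the ramp-up concrete, here is a minimal, self-contained sketch of the behavior described
above. It is purely illustrative and is not Spark's actual scheduler code; the executor cap and
the number of rounds shown are hypothetical:

```scala
object RequestPolicySketch {
  def main(args: Array[String]): Unit = {
    val maxExecutors = 20    // stands in for spark.dynamicAllocation.maxExecutors
    var targetExecutors = 0  // total number of executors requested so far
    var numToAdd = 1         // the first round adds a single executor

    // Each round fires only while tasks remain pending: the first after
    // schedulerBacklogTimeout, later ones after sustainedSchedulerBacklogTimeout.
    for (round <- 1 to 6) {
      targetExecutors = math.min(targetExecutors + numToAdd, maxExecutors)
      println(s"round $round: request a total of $targetExecutors executors")
      numToAdd *= 2          // exponential increase: 1, 2, 4, 8, ...
    }
  }
}
```

Running this prints cumulative targets of 1, 3, 7 and 15 executors, after which the configured
maximum caps further growth.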

#### Remove Policy

The policy for removing executors is much simpler. A Spark application removes an executor when
it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds. Note that,
under most circumstances, this condition is mutually exclusive with the request condition, in that
an executor should not be idle if there are still pending tasks to be scheduled.

### Graceful Decommission of Executors

Before dynamic allocation, a Spark executor exits either on failure or when the associated
application exits. In both scenarios, all state associated with the executor is no longer needed
and can be safely discarded. With dynamic allocation, however, the application is still running
when an executor is explicitly removed. If the application attempts to access state stored in or
written by the executor, it will have to recompute that state. Thus, Spark needs a mechanism to
decommission an executor gracefully by preserving its state before removing it.

This requirement is especially important for shuffles. During a shuffle, the Spark executor first
writes its own map outputs locally to disk, and then acts as the server for those files when other
executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
in which case the shuffle files written by that executor must be recomputed unnecessarily.

The solution for preserving shuffle files is to use an external shuffle service, also introduced
in Spark 1.2. This service is a long-running process that runs on each node of your cluster
independently of your Spark applications and their executors. If the service is enabled, Spark
executors will fetch shuffle files from the service instead of from each other. This means any
shuffle state written by an executor may continue to be served beyond the executor's lifetime.

# Scheduling Within an Application