Add documentation for dynamic allocation (without configs)
Andrew Or committed Dec 18, 2014
1 parent 3cd5161 commit 8c64004
Showing 1 changed file with 106 additions and 0 deletions.
106 changes: 106 additions & 0 deletions docs/job-scheduling.md
@@ -56,6 +56,112 @@ the same RDDs. For example, the [Shark](http://shark.cs.berkeley.edu) JDBC server works this way for SQL
queries. In future releases, in-memory storage systems such as [Tachyon](http://tachyon-project.org) will
provide another approach to share RDDs.

## Dynamic Resource Allocation

Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to
your application up and down based on the workload. This means that your application may give
resources back to the cluster if they are no longer used and request them again later when there
is demand. This feature is particularly useful if multiple applications share resources in your
Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be
returned to the cluster's pool of resources and acquired by other applications. In Spark, dynamic
resource allocation is performed on the granularity of the executor and can be enabled through
`spark.dynamicAllocation.enabled`.

This feature is currently disabled by default and available only on [YARN](running-on-yarn.html).
A future release will extend this to [standalone mode](spark-standalone.html) and
[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that although Spark on
Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling
dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency
scheduling while sharing cluster resources efficiently.

Lastly, it is worth noting that Spark's dynamic resource allocation mechanism is cooperative.
This means if a Spark application enables this feature, other applications on the same cluster
are also expected to do so. Otherwise, the cluster's resources will end up being unfairly
distributed to the applications that do not voluntarily give up unused resources they have
acquired.

### Configuration and Setup

All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
provide lower and upper bounds for the number of executors through
`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
configurations are described on the [configurations page](configuration.html) and in the subsequent
sections in detail.
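
As an illustration, here is a minimal sketch of setting these properties programmatically on a
`SparkConf`; the executor bounds below are arbitrary example values, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable dynamic allocation with illustrative lower and upper executor bounds.
val conf = new SparkConf()
  .setAppName("DynamicAllocationExample")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true")  // external shuffle service, described below

val sc = new SparkContext(conf)
```

The same properties can equally be set in `spark-defaults.conf` or through `spark-submit --conf`.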

Additionally, your application must use an external shuffle service (described below). To enable
this, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service is
implemented in `org.apache.spark.network.yarn.YarnShuffleService`, which runs in each `NodeManager`
in your cluster. To start this service, follow these steps:

1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
pre-packaged distribution.
2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
`$SPARK_HOME/network/yarn/target/scala-<version>` if you are building Spark yourself, and under
`lib` if you are using a distribution.
3. Add this jar to the classpath of all `NodeManager`s in your cluster.
4. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
`org.apache.spark.network.yarn.YarnShuffleService` (see the example fragment after this list).
Additionally, set all relevant `spark.shuffle.service.*` [configurations](configuration.html).
5. Restart all `NodeManager`s in your cluster.
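
The `yarn-site.xml` change in step 4 might look like the fragment below. This is an illustrative
sketch that assumes no other auxiliary services are configured; merge the
`yarn.nodemanager.aux-services` value with any services your cluster already defines:

```xml
<!-- Illustrative yarn-site.xml fragment; merge with existing aux-services if present. -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```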

### Resource Allocation Policy

At a high level, Spark should relinquish executors when they are no longer used and acquire
executors when they are needed. Since there is no definitive way to predict whether an executor
that is about to be removed will run a task in the near future, or whether a new executor that is
about to be added will actually be idle, we need a set of heuristics to determine when to remove
and request executors.

#### Request Policy

A Spark application with dynamic allocation enabled requests additional executors when it has
pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
of executors is insufficient to simultaneously saturate all tasks that have been submitted but
not yet finished.

Spark requests executors in rounds. The actual request is triggered when there have been pending
tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and then triggered again
every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds thereafter if the queue
of pending tasks persists. Additionally, the number of executors requested in each round increases
exponentially from the previous round. For instance, an application will add 1 executor in the
first round, and then 2, 4, 8 and so on executors in the subsequent rounds.
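
The following sketch mimics the request schedule described above. It is not Spark's actual
implementation, and the timeout values are hypothetical, chosen only to make the ramp-up visible:

```scala
// Sketch of the round-based request schedule (not Spark's implementation).
object RequestPolicySketch {
  val schedulerBacklogTimeout = 5           // hypothetical value, in seconds
  val sustainedSchedulerBacklogTimeout = 5  // hypothetical value, in seconds

  // Executors requested in round n: 1, 2, 4, 8, ...
  def executorsToRequest(round: Int): Int = 1 << round

  def main(args: Array[String]): Unit = {
    // Suppose the queue of pending tasks stays backlogged for 25 seconds.
    var elapsedSeconds = schedulerBacklogTimeout
    var round = 0
    while (elapsedSeconds <= 25) {
      println(s"t=${elapsedSeconds}s: request ${executorsToRequest(round)} additional executor(s)")
      round += 1
      elapsedSeconds += sustainedSchedulerBacklogTimeout
    }
  }
}
```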

The motivation for an exponential increase policy is twofold. First, an application should request
executors cautiously in the beginning in case it turns out that only a few additional executors are
sufficient. This echoes the justification for TCP slow start. Second, the application should be
able to ramp up its resource usage in a timely manner in case it turns out that many executors are
actually needed.

#### Remove Policy

The policy for removing executors is much simpler. A Spark application removes an executor when
it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds. Note that,
under most circumstances, this condition is mutually exclusive with the request condition, in that
an executor should not be idle if there are still pending tasks to be scheduled.
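
A minimal sketch of this check, assuming a hypothetical helper that tracks how long an executor
has been idle (again, not Spark's actual implementation):

```scala
// Sketch of the remove condition above (not Spark's implementation).
// `idleSeconds` is assumed to come from tracking when the executor last ran a task.
val executorIdleTimeout = 60L  // illustrative value for spark.dynamicAllocation.executorIdleTimeout

def shouldRemoveExecutor(idleSeconds: Long): Boolean =
  idleSeconds > executorIdleTimeout
```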

### Graceful Decommission of Executors

Before dynamic allocation, a Spark executor exits either on failure or when the associated
application has also exited. In both scenarios, all state associated with the executor is no
longer needed and can be safely discarded. With dynamic allocation, however, the application
is still running when an executor is explicitly removed. If the application attempts to access
state stored in or written by the executor, it will have to recompute that state. Thus,
Spark needs a mechanism to decommission an executor gracefully by preserving its state before
removing it.

This requirement is especially important for shuffles. During a shuffle, the Spark executor first
writes its own map outputs locally to disk, and then acts as the server for those files when other
executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
in which case the shuffle files written by that executor must be recomputed unnecessarily.

The solution for preserving shuffle files is to use an external shuffle service, also introduced
in Spark 1.2. This service refers to a long-running process that runs on each node of your cluster
independently of your Spark applications and their executors. If the service is enabled, Spark
executors will fetch shuffle files from the service instead of from each other. This means any
shuffle state written by an executor may continue to be served beyond the executor's lifetime.

# Scheduling Within an Application
