# [SPARK-16630][YARN] Blacklist a node if executors won't launch on it
## What changes were proposed in this pull request?

This change extends YARN resource allocation handling with blacklisting functionality.
This handles cases when a node is broken or misconfigured such that a container won't launch on it. Before this change, blacklisting only applied to task execution; this change introduces YarnAllocatorBlacklistTracker, which tracks allocation failures per host (when enabled via "spark.yarn.blacklist.executor.launch.blacklisting.enabled").
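
Conceptually, the tracker counts YARN allocation failures per host and blacklists a host once the count passes a limit. A minimal sketch of that idea (the class shape, names and threshold handling are illustrative only, not the exact implementation added by this PR):

```
import scala.collection.mutable

// Simplified sketch: count allocation failures per host and blacklist a host
// once it exceeds a configurable limit. The real YarnAllocatorBlacklistTracker
// additionally expires old failures and pushes the blacklist to the YARN AM client.
class AllocationFailureSketch(maxFailuresPerHost: Int) {
  private val failuresPerHost = mutable.Map.empty[String, Int].withDefaultValue(0)
  private val blacklistedHosts = mutable.Set.empty[String]

  def handleResourceAllocationFailure(host: String): Unit = {
    failuresPerHost(host) += 1
    if (failuresPerHost(host) > maxFailuresPerHost) {
      // In the real allocator this is where the application master's blacklist
      // is updated, so YARN stops offering containers on this node.
      blacklistedHosts += host
    }
  }

  def isBlacklisted(host: String): Boolean = blacklistedHosts.contains(host)
}
```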

## How was this patch tested?

### With unit tests

Including a new suite: YarnAllocatorBlacklistTrackerSuite.

### Manually

It was tested on a cluster by deleting the Spark jars on one of the nodes.

#### Behaviour before these changes

Starting Spark as:
```
spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6"
```

Log is:
```
18/04/12 06:49:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://apiros-1.gce.test.com:8020/user/systest/.sparkStaging/application_1523459048274_0016
18/04/12 06:49:39 INFO util.ShutdownHookManager: Shutdown hook called
```

#### Behaviour after these changes

Starting Spark as:
```
spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6" --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true"
```

And the log is:
```
18/04/13 05:37:43 INFO yarn.YarnAllocator: Will request 1 executor container(s), each with 1 core(s) and 4505 MB memory (including 409 MB of overhead)
18/04/13 05:37:43 INFO yarn.YarnAllocator: Submitted 1 unlocalized container requests.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Launching container container_1523459048274_0025_01_000008 on host apiros-4.gce.test.com for executor with ID 6
18/04/13 05:37:43 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Completed container container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com (state: COMPLETE, exit status: 1)
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)
18/04/13 05:37:43 WARN yarn.YarnAllocator: Container marked as failed: container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1523459048274_0025_01_000007
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
        at org.apache.hadoop.util.Shell.run(Shell.java:507)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
```

Where the most important part is:

```
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)
```

And execution continued (no shutdown was called).

### Testing the blacklisting of the whole cluster

Starting Spark with YARN blacklisting enabled, then removing the Spark core jar from all the cluster nodes one by one, and executing a simple Spark job which fails. Checking the YARN log, it contains the expected exit status:

```
18/06/15 01:07:10 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Due to executor failures all available nodes are blacklisted)
18/06/15 01:07:13 INFO util.ShutdownHookManager: Shutdown hook called
```
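
For context, the shutdown decision in this test comes from comparing the number of blacklisted nodes with the cluster size reported in the YARN allocate response. A rough sketch of that check (names and shape are illustrative; the real logic lives in YarnAllocatorBlacklistTracker and ApplicationMaster):

```
// Rough sketch: the allocator learns the cluster size from the allocate response
// and treats the application as unrecoverable once every node is blacklisted
// (scheduler blacklist plus allocation-failure blacklist combined).
object AllNodesBlacklistedSketch {
  def isAllNodeBlacklisted(numClusterNodes: Int, blacklistedNodes: Set[String]): Boolean =
    numClusterNodes > 0 && blacklistedNodes.size >= numClusterNodes
}
// When this returns true, the application master fails fast with
// "Due to executor failures all available nodes are blacklisted"
// instead of retrying allocation indefinitely.
```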

Author: “attilapiros” <piros.attila.zsolt@gmail.com>

Closes apache#21068 from attilapiros/SPARK-16630.
attilapiros authored and squito committed Jun 21, 2018
1 parent c0cad59 commit b56e9c6
Showing 12 changed files with 479 additions and 56 deletions.
@@ -371,7 +371,7 @@ private[scheduler] class BlacklistTracker (

}

private[scheduler] object BlacklistTracker extends Logging {
private[spark] object BlacklistTracker extends Logging {

private val DEFAULT_TIMEOUT = "1h"

@@ -170,8 +170,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist != null &&
scheduler.nodeBlacklist.contains(hostname)) {
} else if (scheduler.nodeBlacklist.contains(hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
@@ -19,7 +19,6 @@ package org.apache.spark

import java.util.concurrent.{ExecutorService, TimeUnit}

import scala.collection.Map
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -73,6 +72,7 @@ class HeartbeatReceiverSuite
sc = spy(new SparkContext(conf))
scheduler = mock(classOf[TaskSchedulerImpl])
when(sc.taskScheduler).thenReturn(scheduler)
when(scheduler.nodeBlacklist).thenReturn(Predef.Set[String]())
when(scheduler.sc).thenReturn(sc)
heartbeatReceiverClock = new ManualClock
heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
@@ -241,7 +241,7 @@ class HeartbeatReceiverSuite
} === Some(true))
}

private def getTrackedExecutors: Map[String, Long] = {
private def getTrackedExecutors: collection.Map[String, Long] = {
// We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend,
// so exclude it from the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(

protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String]))
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty))
}

protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
10 changes: 10 additions & 0 deletions docs/running-on-yarn.md
@@ -411,6 +411,16 @@ To use a custom metrics.properties for the application master and executors, upd
name matches both the include and the exclude pattern, this file will be excluded eventually.
</td>
</tr>
<tr>
<td><code>spark.yarn.blacklist.executor.launch.blacklisting.enabled</code></td>
<td>false</td>
<td>
Flag to enable blacklisting of nodes having YARN resource allocation problems.
The error limit for blacklisting can be configured by
<code>spark.blacklist.application.maxFailedExecutorsPerNode</code>.
</td>
</tr>

</table>

# Important notes
@@ -789,6 +789,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.nodeBlacklist).thenReturn(Set[String]())
when(taskScheduler.sc).thenReturn(sc)

externalShuffleClient = mock[MesosExternalShuffleClient]
@@ -515,6 +515,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
s"Max number of executor failures ($maxNumExecutorFailures) reached")
} else if (allocator.isAllNodeBlacklisted) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
"Due to executor failures all available nodes are blacklisted")
} else {
logDebug("Sending progress")
allocator.allocateResources()
@@ -24,7 +24,7 @@ import java.util.regex.Pattern

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.control.NonFatal

import org.apache.hadoop.yarn.api.records._
@@ -66,7 +66,8 @@ private[yarn] class YarnAllocator(
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver)
resolver: SparkRackResolver,
clock: Clock = new SystemClock)
extends Logging {

import YarnAllocator._
@@ -102,18 +103,14 @@ private[yarn] class YarnAllocator(
private var executorIdCounter: Int =
driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)

// Queue to store the timestamp of failed executors
private val failedExecutorsTimeStamps = new Queue[Long]()
private[spark] val failureTracker = new FailureTracker(sparkConf, clock)

private var clock: Clock = new SystemClock

private val executorFailuresValidityInterval =
sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
private val allocatorBlacklistTracker =
new YarnAllocatorBlacklistTracker(sparkConf, amClient, failureTracker)

@volatile private var targetNumExecutors =
SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)

private var currentNodeBlacklist = Set.empty[String]

// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
// list of requesters that should be responded to once we find out why the given executor
@@ -149,7 +146,6 @@ private[yarn] class YarnAllocator(

private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)


// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty

@@ -160,26 +156,11 @@ private[yarn] class YarnAllocator(
private[yarn] val containerPlacementStrategy =
new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource, resolver)

/**
* Use a different clock for YarnAllocator. This is mainly used for testing.
*/
def setClock(newClock: Clock): Unit = {
clock = newClock
}

def getNumExecutorsRunning: Int = runningExecutors.size()

def getNumExecutorsFailed: Int = synchronized {
val endTime = clock.getTimeMillis()
def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors

while (executorFailuresValidityInterval > 0
&& failedExecutorsTimeStamps.nonEmpty
&& failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
failedExecutorsTimeStamps.dequeue()
}

failedExecutorsTimeStamps.size
}
def isAllNodeBlacklisted: Boolean = allocatorBlacklistTracker.isAllNodeBlacklisted

/**
* A sequence of pending container requests that have not yet been fulfilled.
@@ -204,9 +185,8 @@ private[yarn] class YarnAllocator(
* @param localityAwareTasks number of locality aware tasks to be used as container placement hint
* @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
* container placement hint.
* @param nodeBlacklist a set of blacklisted nodes, which is passed in to avoid allocating new
* containers on them. It will be used to update the application master's
* blacklist.
* @param nodeBlacklist blacklisted nodes, which is passed in to avoid allocating new containers
* on them. It will be used to update the application master's blacklist.
* @return Whether the new requested total is different than the old value.
*/
def requestTotalExecutorsWithPreferredLocalities(
@@ -220,19 +200,7 @@ private[yarn] class YarnAllocator(
if (requestedTotal != targetNumExecutors) {
logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
targetNumExecutors = requestedTotal

// Update blacklist infomation to YARN ResouceManager for this application,
// in order to avoid allocating new Containers on the problematic nodes.
val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
if (blacklistAdditions.nonEmpty) {
logInfo(s"adding nodes to YARN application master's blacklist: $blacklistAdditions")
}
if (blacklistRemovals.nonEmpty) {
logInfo(s"removing nodes from YARN application master's blacklist: $blacklistRemovals")
}
amClient.updateBlacklist(blacklistAdditions.toList.asJava, blacklistRemovals.toList.asJava)
currentNodeBlacklist = nodeBlacklist
allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
true
} else {
false
@@ -268,6 +236,7 @@ private[yarn] class YarnAllocator(
val allocateResponse = amClient.allocate(progressIndicator)

val allocatedContainers = allocateResponse.getAllocatedContainers()
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
@@ -602,8 +571,9 @@ private[yarn] class YarnAllocator(
completedContainer.getDiagnostics,
PMEM_EXCEEDED_PATTERN))
case _ =>
// Enqueue the timestamp of failed executor
failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
// all the failures which not covered above, like:
// disk failure, kill by app master or resource manager, ...
allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
(true, "Container marked as failed: " + containerId + onHostStr +
". Exit status: " + completedContainer.getExitStatus +
". Diagnostics: " + completedContainer.getDiagnostics)