Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YARN][SPARK-3293]Fix YARN's web UI showing "SUCCEEDED" when the driver throws an exception in yarn-client #3508

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1929,7 +1929,6 @@ object SparkContext extends Logging {
Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ object SparkSubmit {
case null => throw e
}
}

System.exit(0)
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ private[spark] object CoarseGrainedClusterMessages {
case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String)
extends CoarseGrainedClusterMessage

// Sent by the driver to the ExecutorLauncher (AM) in yarn-client mode to report the
// driver's final exit status: 0 means SUCCEEDED, any other value means FAILED.
case class SendAmExitStatus(status: Int) extends CoarseGrainedClusterMessage

// Sent by the driver to ask the ExecutorLauncher (AM) in yarn-client mode to shut down.
case object StopExecutorLauncher extends CoarseGrainedClusterMessage

// Messages exchanged between the driver and the cluster manager for executor allocation
// In Yarn mode, these are exchanged between the driver and the AM

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.scheduler.cluster

import java.security.{ProtectionDomain, Permission, Policy}

import akka.actor.{Actor, ActorRef, Props}
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.SparkContext
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.ui.JettyUtils
Expand Down Expand Up @@ -69,6 +71,16 @@ private[spark] abstract class YarnSchedulerBackend(
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}

// Tracks whether the StopExecutorLauncher message has already been sent, so that
// calling stop() more than once notifies the AM only a single time.
var isStopExecutorLauncher: Boolean = false

/**
 * Ask the ExecutorLauncher (AM) to shut down. Safe to call repeatedly and from
 * concurrent threads: the unsynchronized check-then-set in the original could send
 * the message twice, so the flag update is now guarded by `synchronized`.
 */
def stopExecutorLauncher(): Unit = this.synchronized {
  if (!isStopExecutorLauncher) {
    isStopExecutorLauncher = true
    yarnSchedulerActor ! StopExecutorLauncher
  }
}

/**
* Add filters to the SparkUI.
*/
Expand All @@ -91,6 +103,57 @@ private[spark] abstract class YarnSchedulerBackend(
}
}

/**
* This system security manager applies to the entire process.
* Its main purpose is to handle the case where the user code calls System.exit.
* This allows us to catch that and properly set the YARN application status and
* cleanup if needed.
*/
private def setupSystemSecurityManager(amActor: ActorRef): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The securityManager in the AM was causing a performance impact and we just removed it. I expect the same issue to happen here. #3484

try {
System.setSecurityManager(new java.lang.SecurityManager() {
override def checkExit(paramInt: Int) {
logInfo("In securityManager checkExit, exit code: " + paramInt)
if (paramInt == 0) {
amActor ! SendAmExitStatus(0)
} else {
amActor ! SendAmExitStatus(1)
}
}

// required for the checkExit to work properly
override def checkPermission(perm: java.security.Permission): Unit = {}
}
)
}
catch {
case e: SecurityException =>
amActor ! SendAmExitStatus(1)
logError("Error in setSecurityManager:", e)
}
}

// Installs [[UserPolicy]] as the process-wide java.security.Policy so that MBean
// registration (metrics.JmxReporter) is not rejected with AccessControlException
// after the SecurityManager has been set up. See [[UserPolicy]] below.
private def setupUserPolicy(): Unit = {
Policy.setPolicy(new UserPolicy())
}

/**
* Set the permission for javax.management.MBeanTrustPermission to register.
* Otherwise, it will throw AccessControlException in metrics.JmxReporter.
*/
/**
 * A java.security.Policy that unconditionally grants
 * javax.management.MBeanTrustPermission (needed so metrics.JmxReporter can register
 * MBeans without an AccessControlException) and delegates every other permission
 * check to the policy that was installed when this instance was created.
 */
private class UserPolicy extends java.security.Policy {
  // Snapshot of the previously installed policy; all non-MBean checks defer to it.
  private final val defaultPolicy: Policy = Policy.getPolicy

  override def implies(domain: ProtectionDomain, permission: Permission): Boolean =
    // Grant MBeanTrustPermission outright; otherwise fall back to the default policy.
    permission.isInstanceOf[javax.management.MBeanTrustPermission] ||
      defaultPolicy.implies(domain, permission)
}

/**
* An actor that communicates with the ApplicationMaster.
*/
Expand All @@ -105,6 +168,8 @@ private[spark] abstract class YarnSchedulerBackend(
override def receive = {
case RegisterClusterManager =>
logInfo(s"ApplicationMaster registered as $sender")
setupSystemSecurityManager(sender)
setupUserPolicy
amActor = Some(sender)

case r: RequestExecutors =>
Expand All @@ -129,6 +194,14 @@ private[spark] abstract class YarnSchedulerBackend(
addWebUIFilter(filterName, filterParams, proxyBase)
sender ! true

case StopExecutorLauncher =>
amActor match {
case Some(actor) =>
actor ! SendAmExitStatus(0)
case None =>
logWarning("Attempted to stop executorLauncher before the AM has registered!")
}

case d: DisassociatedEvent =>
if (amActor.isDefined && sender == amActor.get) {
logWarning(s"ApplicationMaster has disassociated: $d")
Expand All @@ -137,6 +210,6 @@ private[spark] abstract class YarnSchedulerBackend(
}
}

private[spark] object YarnSchedulerBackend {
// NOTE(review): Logging is mixed into the companion object here, but the logInfo/logError
// calls added above live in the class body — confirm the class itself has Logging in scope.
private[spark] object YarnSchedulerBackend extends Logging {
  // Name under which the YarnScheduler actor is registered with the actor system.
  val ACTOR_NAME = "YarnScheduler"
}
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
override def receive = {
case x: DisassociatedEvent =>
logInfo(s"Driver terminated or disconnected! Shutting down. $x")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
unregister(FinalApplicationStatus.FAILED, "Driver has been dead, no need to retry the AM.")

case x: AddWebUIFilter =>
logInfo(s"Add WebUI Filter. $x")
Expand All @@ -487,6 +487,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
case None => logWarning("Container allocator is not ready to kill executors yet.")
}
sender ! true

case SendAmExitStatus(status) =>
logInfo(s"Received exit code $status from driver.")
if (0 == status) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
} else {
unregister(FinalApplicationStatus.FAILED, "Driver has been dead, no need to retry the AM.")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ private[spark] class YarnClientSchedulerBackend(
* Stop the scheduler. This assumes `start()` has already been called.
*/
override def stop() {
stopExecutorLauncher()
assert(client != null, "Attempted to stop this scheduler before starting it!")
stopping = true
super.stop()
Expand Down