From 5300aacb3a26085e7098704fe9f05a8944790dda Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Wed, 3 Sep 2014 14:00:52 +0800 Subject: [PATCH] yarn check exit code --- .../spark/deploy/yarn/ApplicationMaster.scala | 69 ++++++++++++------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index b51daeb437516..1bc52d198080e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -52,6 +52,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, .asInstanceOf[YarnConfiguration] private val isDriver = args.userClass != null + private var exitCode = 0 + // Default to numExecutors * 2, with minimum of 3 private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3))) @@ -95,7 +97,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc != null) { logInfo("Invoking sc stop from shutdown hook") sc.stop() - finish(FinalApplicationStatus.SUCCEEDED) + } + + // Shuts down the AM. + if (!finished) { + finish(finalStatus) } // Cleanup the staging dir after the app is finished, or if it's the last attempt at @@ -123,13 +129,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, } else { runExecutorLauncher(securityMgr) } - - if (finalStatus != FinalApplicationStatus.UNDEFINED) { - finish(finalStatus) - 0 - } else { - 1 - } + exitCode } final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized { @@ -386,31 +386,50 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") System.setProperty("spark.executor.instances", args.numExecutors.toString) + var stopped = false val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) userClassThread = new Thread { override def run() { - var status = FinalApplicationStatus.FAILED + finalStatus = FinalApplicationStatus.FAILED try { - // Copy - val mainArgs = new Array[String](args.userArgs.size) - args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) - mainMethod.invoke(null, mainArgs) - // Some apps have "System.exit(0)" at the end. The user thread will stop here unless - // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED. - status = FinalApplicationStatus.SUCCEEDED - } catch { - case e: InvocationTargetException => - e.getCause match { - case _: InterruptedException => - // Reporter thread can interrupt to stop user class + System.setSecurityManager(new java.lang.SecurityManager() { + override def checkExit(paramInt: Int) { + if (!stopped) { + exitCode = paramInt + if (exitCode == 0) { + finalStatus = FinalApplicationStatus.SUCCEEDED + } + stopped = true + } + } + + override def checkPermission(perm: java.security.Permission): Unit = { - case e => throw e } - } finally { - logDebug("Finishing main") - finalStatus = status + }) + } + catch { + case e: SecurityException => + logError("setSecurityManager:", e) + } + + Utils.tryOrExit { + try { + val mainArgs = new Array[String](args.userArgs.size) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) + mainMethod.invoke(null, mainArgs) + finalStatus = FinalApplicationStatus.SUCCEEDED + } catch { + case e: InvocationTargetException => + e.getCause match { + case _: InterruptedException => + // Reporter thread can interrupt to stop user class + + case e => throw e + } + } } } }