[SPARK-3293] yarn's web show "SUCCEEDED" when the driver throw a exception in yarn-client #2311

Closed
wants to merge 2 commits into from

Changes from all commits
@@ -52,6 +52,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     .asInstanceOf[YarnConfiguration]
   private val isDriver = args.userClass != null
 
+  private var exitCode = 0
+
   // Default to numExecutors * 2, with minimum of 3
   private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
@@ -95,7 +97,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       if (sc != null) {
         logInfo("Invoking sc stop from shutdown hook")
         sc.stop()
-        finish(FinalApplicationStatus.SUCCEEDED)
       }
+
+      // Shuts down the AM.
+      if (!finished) {
Contributor

I think this could cause problems with the YARN retry logic. We shouldn't be finishing (unregistering with the RM) unless we've explicitly succeeded or failed; we don't want a retry to happen in those weird cases, and since this is in a shutdown hook I'm not sure we explicitly know that.

I guess it comes down to us defining when the Spark app is cleanly exiting. This could be failed, killed, or succeeded. Unfortunately I'm not sure Spark really has this final reporting.

Contributor

@mateiz @pwendell is there anything in Spark that we can/should use to decide the application is in a final state (either success or failure) such that we wouldn't want to retry it? MR has explicit states for finishing, and it also checks for committed output. Spark, I think, is not quite as straightforward. Do we have any guidance on what an application should do on error and on success?

I.e., can we say that if sc.stop() is called then it finished cleanly, or perhaps if System.exit(0) is called? Should we be peeking at the metrics or something to see whether any jobs failed within the application? Thoughts?

Contributor

Just to understand, is the YARN retry logic there for resubmitting the AM? Most Spark applications don't support that, since their driver contains a lot of state. (Though I guess applications submitted in yarn-client mode might).

In general, if the user's driver is running within the AM, I would say it's successful if the driver's main() returns 0 or calls System.exit(0). If the driver is running remotely, it should tell the AM when to shut down cleanly when it closes its SparkContext or exits.

Contributor

Yes, the retry logic is for resubmitting the AM in case the machine it was on went down, was slow, lost its network connection, etc.

OK, so we'll go with: if it calls System.exit(0) or returns 0 then it succeeded; for now everything else is a failure and will retry if configured to retry. If we don't want it to retry in certain cases, we would need more information from the driver.
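The "exit code 0 means success" convention agreed on here is what the patch implements by installing a SecurityManager that intercepts System.exit. A minimal standalone sketch of the interception technique (ExitCodeCaptureDemo is a hypothetical object for illustration, not the AM code; unlike the AM, the demo vetoes the exit by throwing so it can keep running, and note the SecurityManager API is deprecated on JDK 17+ and unavailable by default on newer JDKs):

```scala
object ExitCodeCaptureDemo {
  @volatile var captured = Int.MinValue

  def main(args: Array[String]): Unit = {
    // Record the status passed to System.exit(). The demo then vetoes the
    // exit by throwing, so it can keep running and inspect the value; the
    // patched AM never throws here, so a real System.exit(0) still
    // terminates the user class normally.
    System.setSecurityManager(new SecurityManager {
      override def checkExit(status: Int): Unit = {
        captured = status
        throw new SecurityException("exit vetoed for demo")
      }
      // Allow everything else; only exit attempts are of interest.
      override def checkPermission(perm: java.security.Permission): Unit = ()
    })

    try System.exit(3) catch { case _: SecurityException => () }
    System.setSecurityManager(null)
    println(s"captured exit code: $captured")
  }
}
```

Removing the manager afterwards matters in a long-lived process like this demo; the AM can leave its manager installed because the JVM is shutting down anyway.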

Contributor

Well my feeling is that if the driver program crashed, it's going to crash again the next time you try. Is that not the case?

Contributor

Well, that depends a lot on the crash. If the host where the AM was running just dies, running it on a different host might succeed.

Contributor

Basically there are a few cases:

  • If the machine with the AM dies, then sure, YARN could relaunch it (hopefully that is user-configurable in case their code has weird side effects)
  • If the driver returns 0, it's finished
  • If the driver crashes for another reason, it could be a bug in the driver; I'm not sure we should just resubmit in that case

Contributor

@vanzin But this code is in the AM, isn't it? That's what I'm saying above.

Contributor

Yeah, unless we want to add more logic to figure out what shouldn't be retried, the easy thing is to just retry on any failure. Obviously if the machine dies this code won't be running; it's more that something weird happens causing it to crash or exit badly.

There are actually some potential issues with rerunning the AM, though. One is what we refer to as split brain (one AM loses its connection to the RM but is still running, so the RM starts a second AM) and both write to the same output dir, causing issues with the output data. I filed a JIRA to try to handle this in the Spark AM.

The second occurs if the first run had committed its output and we rerun it when we shouldn't. The reason we don't want that to happen is to prevent data corruption. Many times in MR one job will start once another's output is committed, so if that output gets changed out from under it by a rerun of the AM, it could lose data. I'm not sure that same kind of check is as easy with Spark.

MR handles both of those cases. Obviously if your MR job is writing to some other service, using a custom file output, or has some other side effects, it's up to the user to guarantee that it can be rerun.

I'm assuming it's the user's responsibility with Spark, since Spark can rerun tasks/stages on failure. Any input on that, Matei?

Contributor

Our saveAs*File operations in Spark also refuse to run if the output directory already exists, so there is that support in there. But for Spark I'd make this AM rerun configurable. The reason is that some Spark apps might be accessing weird data sources, serving a UI to outside users, or just doing fairly complicated logic inside that is hard to retry. These are things that MR jobs don't do as commonly.
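The existing-output guard referred to here can be sketched as follows. This is a hypothetical local-filesystem illustration (OutputDirGuard is not a Spark class; Spark's actual check in the saveAs*File path goes through the Hadoop FileSystem API):

```scala
import java.nio.file.{Files, Paths}

object OutputDirGuard {
  // Mirror of the saveAs*File precondition: refuse to start writing if the
  // output directory already exists, since a previous attempt may have
  // committed its output there.
  def ensureOutputDirAbsent(dir: String): Unit = {
    if (Files.exists(Paths.get(dir))) {
      throw new IllegalStateException(s"Output directory $dir already exists")
    }
  }
}
```

A rerun AM that performs this check before writing would fail fast instead of silently overwriting a committed result, which is the corruption scenario described above.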

+        finish(finalStatus)
+      }

// Cleanup the staging dir after the app is finished, or if it's the last attempt at
@@ -123,13 +129,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       } else {
         runExecutorLauncher(securityMgr)
       }
 
-      if (finalStatus != FinalApplicationStatus.UNDEFINED) {
-        finish(finalStatus)
-        0
-      } else {
-        1
-      }
+      exitCode
Contributor

Feels like with your changes there's a possibility that finish() might never be called. It might be better to add a call to that method here before returning.

Contributor

Never mind, I see you added the call to the shutdown hook. Still, finalStatus needs to be properly set in runExecutorLauncher.

}

final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
@@ -211,7 +211,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

       // In client mode the actor will stop the reporter thread.
       reporterThread.join()
-      finalStatus = FinalApplicationStatus.SUCCEEDED
     }

private def launchReporterThread(): Thread = {
@@ -386,31 +385,50 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     System.setProperty("spark.executor.instances", args.numExecutors.toString)
+    var stopped = false
     val mainMethod = Class.forName(args.userClass, false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
 
     userClassThread = new Thread {
       override def run() {
-        var status = FinalApplicationStatus.FAILED
+        finalStatus = FinalApplicationStatus.FAILED
         try {
-          // Copy
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
-          // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
-          // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
-          status = FinalApplicationStatus.SUCCEEDED
-        } catch {
-          case e: InvocationTargetException =>
-            e.getCause match {
-              case _: InterruptedException =>
-                // Reporter thread can interrupt to stop user class
-
-              case e => throw e
-            }
-        } finally {
-          logDebug("Finishing main")
-          finalStatus = status
+          System.setSecurityManager(new java.lang.SecurityManager() {
+            override def checkExit(paramInt: Int) {
+              if (!stopped) {
+                exitCode = paramInt
+                if (exitCode == 0) {
+                  finalStatus = FinalApplicationStatus.SUCCEEDED
+                }
+                stopped = true
+              }
+            }
+
+            override def checkPermission(perm: java.security.Permission): Unit = {
+            }
+          })
+        }
+        catch {
+          case e: SecurityException =>
+            logError("Error in setSecurityManager:", e)
+        }
+
+        Utils.tryOrExit {
+          try {
+            val mainArgs = new Array[String](args.userArgs.size)
+            args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
+            mainMethod.invoke(null, mainArgs)
+            finalStatus = FinalApplicationStatus.SUCCEEDED
+          } catch {
+            case e: InvocationTargetException =>
+              e.getCause match {
+                case _: InterruptedException =>
+                  // Reporter thread can interrupt to stop user class
+
+                case e => throw e
+              }
+          }
+        }
       }
     }