Skip to content

Commit

Permalink
[SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors
Browse files Browse the repository at this point in the history

updated code comments

minor comment wording change
  • Loading branch information
kanzhang committed May 9, 2014
1 parent bd67551 commit cb0cc86
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

type ExecutorState = Value

def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,11 @@ private[spark] class Master(
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)

val normalExit = exitStatus.exists(_ == 0)
// Only retry a certain number of times so we don't go into an infinite loop.
if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
schedule()
} else {
} else if (!normalExit) {
logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo, ApplicationState.FAILED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,10 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, Charsets.UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)

// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
// long-lived processes only. However, in the future, we might restart the executor a few
// times on the same machine.
// Wait for it to exit; the executor may exit with code 0 (when the driver instructs it to
// shut down) or with a nonzero exit code.
val exitCode = process.waitFor()
state = ExecutorState.FAILED
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
Expand Down

0 comments on commit cb0cc86

Please sign in to comment.