[SPARK-26219][CORE][BRANCH-2.4] Executor summary should get updated for failure jobs in the history server UI

Backport of commit #23181 into the Spark 2.4 branch: update the executor summary for both live and history applications when the number of active tasks reaches zero, so that executor summaries for failed jobs show up correctly in the history server UI.

Added a unit test.

Closes #23191 from shahidki31/branch-2.4.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
shahidki31 authored and Marcelo Vanzin committed Dec 3, 2018
1 parent 349e25b commit 90fcd12
Showing 2 changed files with 66 additions and 47 deletions.
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

@@ -599,9 +599,14 @@ private[spark] class AppStatusListener(
         }
       }
 
-      // Force an update on live applications when the number of active tasks reaches 0. This is
-      // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
+      // Force an update on both live and history applications when the number of active tasks
+      // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
+      // reliably up to date.
+      if (exec.activeTasks == 0) {
+        update(exec, now)
+      } else {
+        maybeUpdate(exec, now)
+      }
     }
   }

@@ -954,14 +959,6 @@ private[spark] class AppStatusListener(
     }
   }
 
-  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
-    if (condition) {
-      liveUpdate(entity, now)
-    } else {
-      maybeUpdate(entity, now)
-    }
-  }
-
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.
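Why the inlined if/else fixes the history server: AppStatusListener writes entities through a few small helpers, and the removed conditionalLiveUpdate routed the "active tasks reached zero" case through liveUpdate, which only writes when the application is live. The history server replays an event log with live = false, so the final executor counts were never flushed to the store. The sketch below is a simplified, self-contained model of those helpers — the names mirror the 2.4 source, but the bodies are paraphrased, so treat the details as assumptions rather than the real implementation:

object UpdateSemanticsSketch {
  // Stand-in for a LiveEntity: tracks when it was last written to the store.
  class Entity(var lastWriteTime: Long = -1L) {
    def write(now: Long): Unit = {
      lastWriteTime = now
      println(s"wrote entity at $now")
    }
  }

  // Stand-in for AppStatusListener's update helpers.
  class Listener(live: Boolean, liveUpdatePeriodNs: Long) {
    // Unconditional write: reaches the store for live and replayed apps alike.
    def update(e: Entity, now: Long): Unit = e.write(now)

    // Rate-limited write, and only for live applications.
    def maybeUpdate(e: Entity, now: Long): Unit = {
      if (live && liveUpdatePeriodNs >= 0 && now - e.lastWriteTime > liveUpdatePeriodNs) {
        update(e, now)
      }
    }

    // Write only for live applications: a no-op while the history server
    // replays an event log, which is why the old conditionalLiveUpdate left
    // the executor summary stale for failed jobs in the history UI.
    def liveUpdate(e: Entity, now: Long): Unit = if (live) update(e, now)
  }

  def main(args: Array[String]): Unit = {
    val replay = new Listener(live = false, liveUpdatePeriodNs = -1L)
    val exec = new Entity()
    replay.liveUpdate(exec, 10L) // old path: nothing is written
    replay.update(exec, 20L)     // fixed path: the final state is written
  }
}

Calling update() unconditionally once activeTasks hits zero makes the final write happen on the replay path too, which is exactly what the new code above does.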
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

@@ -1274,48 +1274,70 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
-  test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
-    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+  Seq(true, false).foreach { live =>
+    test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {
 
-    val listener = new AppStatusListener(store, testConf, true)
+      val testConf = if (live) {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+      } else {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
+      }
 
-    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
-    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
-    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+      val listener = new AppStatusListener(store, testConf, live)
 
-    val tasks = createTasks(4, Array("1", "2"))
-    tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
-    }
+      Seq("1", "2").foreach { execId =>
+        listener.onExecutorAdded(SparkListenerExecutorAdded(0L, execId,
+          new ExecutorInfo("host1", 1, Map.empty)))
+      }
+      val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+      listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+      listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
-    time += 1
-    tasks(0).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      Success, tasks(0), null))
-    time += 1
-    tasks(1).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      Success, tasks(1), null))
+      val tasks = createTasks(4, Array("1", "2"))
+      tasks.foreach { task =>
+        listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+      }
 
-    stage.failureReason = Some("Failed")
-    listener.onStageCompleted(SparkListenerStageCompleted(stage))
-    time += 1
-    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+      time += 1
+      tasks(0).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(0), null))
+      time += 1
+      tasks(1).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(1), null))
 
-    time += 1
-    tasks(2).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
-    time += 1
-    tasks(3).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+      stage.failureReason = Some("Failed")
+      listener.onStageCompleted(SparkListenerStageCompleted(stage))
+      time += 1
+      listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
+        new RuntimeException("Bad Executor"))))
 
-    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
-    esummary.foreach { execSummary =>
-      assert(execSummary.failedTasks === 1)
-      assert(execSummary.succeededTasks === 1)
-      assert(execSummary.killedTasks === 0)
+      time += 1
+      tasks(2).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+      time += 1
+      tasks(3).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+      val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+      esummary.foreach { execSummary =>
+        assert(execSummary.failedTasks === 1)
+        assert(execSummary.succeededTasks === 1)
+        assert(execSummary.killedTasks === 0)
+      }
+
+      val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+      assert(allExecutorSummary.size === 2)
+      allExecutorSummary.foreach { allExecSummary =>
+        assert(allExecSummary.failedTasks === 1)
+        assert(allExecSummary.activeTasks === 0)
+        assert(allExecSummary.completedTasks === 1)
+      }
+      store.delete(classOf[ExecutorSummaryWrapper], "1")
+      store.delete(classOf[ExecutorSummaryWrapper], "2")
    }
  }
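A note on the test idiom used above: calling test() from inside a foreach at suite-construction time registers one independent test per parameter value, so the same scenario runs once with a live listener and once with a history (replay) listener. A minimal standalone sketch, assuming plain ScalaTest of the 2.x vintage Spark 2.4 used — FunSuite stands in for Spark's SparkFunSuite, and the suite name and assertion are placeholders:

import org.scalatest.FunSuite

class LiveAndReplayExampleSuite extends FunSuite {
  // One test is registered per element, named by the interpolated string.
  Seq(true, false).foreach { live =>
    test(s"executor summary matches stage tasks (live = $live)") {
      // The real suite builds an AppStatusListener(store, testConf, live)
      // and replays task events; a placeholder assertion keeps this runnable.
      assert(Seq(true, false).contains(live))
    }
  }
}

In the live variant, LIVE_ENTITY_UPDATE_PERIOD is set to Long.MaxValue so periodic maybeUpdate writes are suppressed and only the forced update is observable; presumably the -1L in the history variant mirrors the disabled live-update period of a replayed application.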

