Skip to content

Commit

Permalink
[SPARK-26219][CORE] Executor summary should get updated for failure jobs in the history server UI
Browse files Browse the repository at this point in the history

The root cause of the problem is that whenever a taskEnd event arrives after the stageCompleted event, the executor summary is updated only for the live UI. We need to update it for the history UI as well.

To see the previous discussion, refer: PR for apache#23038, https://issues.apache.org/jira/browse/SPARK-26100.

Added a unit test. Also manually verified the fix.

Test step to reproduce:

```
bin/spark-shell --master yarn --conf spark.executor.instances=3
sc.parallelize(1 to 10000, 10).map{ x => throw new RuntimeException("Bad executor")}.collect()
```

Open Executors page from the History UI

Before patch:
![screenshot from 2018-11-29 22-13-34](https://user-images.githubusercontent.com/23054875/49246338-a21ead00-f43a-11e8-8214-f1020420be52.png)

After patch:
![screenshot from 2018-11-30 00-54-49](https://user-images.githubusercontent.com/23054875/49246353-aa76e800-f43a-11e8-98ef-7faecaa7a50e.png)

Closes apache#23181 from shahidki31/executorUpdate.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
shahidki31 authored and jackylee-ch committed Feb 18, 2019
1 parent 49def9e commit 81bddf9
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,14 @@ private[spark] class AppStatusListener(
}
}

// Force an update on live applications when the number of active tasks reaches 0. This is
// checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
// Force an update on both live and history applications when the number of active tasks
// reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
// reliably up to date.
if (exec.activeTasks == 0) {
update(exec, now)
} else {
maybeUpdate(exec, now)
}
}
}

Expand Down Expand Up @@ -1024,14 +1029,6 @@ private[spark] class AppStatusListener(
}
}

private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
if (condition) {
liveUpdate(entity, now)
} else {
maybeUpdate(entity, now)
}
}

private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1273,48 +1273,68 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(allJobs.head.numFailedStages == 1)
}

test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
Seq(true, false).foreach { live =>
test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {

val listener = new AppStatusListener(store, testConf, true)
val testConf = if (live) {
conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
} else {
conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
}

val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
val listener = new AppStatusListener(store, testConf, live)

val tasks = createTasks(4, Array("1", "2"))
tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
}
listener.onExecutorAdded(createExecutorAddedEvent(1))
listener.onExecutorAdded(createExecutorAddedEvent(2))
val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))

time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
Success, tasks(0), null))
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
Success, tasks(1), null))
val tasks = createTasks(4, Array("1", "2"))
tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
}

stage.failureReason = Some("Failed")
listener.onStageCompleted(SparkListenerStageCompleted(stage))
time += 1
listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
Success, tasks(0), null))
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
Success, tasks(1), null))

time += 1
tasks(2).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
time += 1
tasks(3).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))

val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
esummary.foreach { execSummary =>
assert(execSummary.failedTasks === 1)
assert(execSummary.succeededTasks === 1)
assert(execSummary.killedTasks === 0)
stage.failureReason = Some("Failed")
listener.onStageCompleted(SparkListenerStageCompleted(stage))
time += 1
listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
new RuntimeException("Bad Executor"))))

time += 1
tasks(2).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
time += 1
tasks(3).markFinished(TaskState.FAILED, time)
listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))

val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
esummary.foreach { execSummary =>
assert(execSummary.failedTasks === 1)
assert(execSummary.succeededTasks === 1)
assert(execSummary.killedTasks === 0)
}

val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
assert(allExecutorSummary.size === 2)
allExecutorSummary.foreach { allExecSummary =>
assert(allExecSummary.failedTasks === 1)
assert(allExecSummary.activeTasks === 0)
assert(allExecSummary.completedTasks === 1)
}
store.delete(classOf[ExecutorSummaryWrapper], "1")
store.delete(classOf[ExecutorSummaryWrapper], "2")
}
}

Expand Down

0 comments on commit 81bddf9

Please sign in to comment.