From 90fcd12af936792a99738789ba1eeb9a1e7e3ce1 Mon Sep 17 00:00:00 2001 From: Shahid Date: Mon, 3 Dec 2018 15:11:43 -0800 Subject: [PATCH] [SPARK-26219][CORE][BRANCH-2.4] Executor summary should get updated for failure jobs in the history server UI Backport of https://github.com/apache/spark/pull/23181 to the Spark 2.4 branch. Added a unit test. Closes #23191 from shahidki31/branch-2.4. Authored-by: Shahid Signed-off-by: Marcelo Vanzin --- .../spark/status/AppStatusListener.scala | 19 ++-- .../spark/status/AppStatusListenerSuite.scala | 94 ++++++++++++------- 2 files changed, 66 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index e6f0d08a0bceb..5b564efa96849 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -599,9 +599,14 @@ private[spark] class AppStatusListener( } } - // Force an update on live applications when the number of active tasks reaches 0. This is - // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date. - conditionalLiveUpdate(exec, now, exec.activeTasks == 0) + // Force an update on both live and history applications when the number of active tasks + // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be + // reliably up to date. 
+ if (exec.activeTasks == 0) { + update(exec, now) + } else { + maybeUpdate(exec, now) + } } } @@ -954,14 +959,6 @@ private[spark] class AppStatusListener( } } - private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = { - if (condition) { - liveUpdate(entity, now) - } else { - maybeUpdate(entity, now) - } - } - private def cleanupExecutors(count: Long): Unit = { // Because the limit is on the number of *dead* executors, we need to calculate whether // there are actually enough dead executors to be deleted. diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index b6ddbe01fda9f..f34be48a4d00e 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1274,48 +1274,70 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { assert(allJobs.head.numFailedStages == 1) } - test("SPARK-25451: total tasks in the executor summary should match total stage tasks") { - val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + Seq(true, false).foreach { live => + test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") { - val listener = new AppStatusListener(store, testConf, true) + val testConf = if (live) { + conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue) + } else { + conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L) + } - val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details") - listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null)) - listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) + val listener = new AppStatusListener(store, testConf, live) - val tasks = createTasks(4, Array("1", "2")) - tasks.foreach { task => - listener.onTaskStart(SparkListenerTaskStart(stage.stageId, 
stage.attemptNumber, task)) - } + Seq("1", "2").foreach { execId => + listener.onExecutorAdded(SparkListenerExecutorAdded(0L, execId, + new ExecutorInfo("host1", 1, Map.empty))) + } + val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details") + listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null)) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) - time += 1 - tasks(0).markFinished(TaskState.FINISHED, time) - listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - Success, tasks(0), null)) - time += 1 - tasks(1).markFinished(TaskState.FINISHED, time) - listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - Success, tasks(1), null)) + val tasks = createTasks(4, Array("1", "2")) + tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task)) + } - stage.failureReason = Some("Failed") - listener.onStageCompleted(SparkListenerStageCompleted(stage)) - time += 1 - listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor")))) + time += 1 + tasks(0).markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType", + Success, tasks(0), null)) + time += 1 + tasks(1).markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType", + Success, tasks(1), null)) - time += 1 - tasks(2).markFinished(TaskState.FAILED, time) - listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null)) - time += 1 - tasks(3).markFinished(TaskState.FAILED, time) - listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType", - ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null)) - - val esummary = 
store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) - esummary.foreach { execSummary => - assert(execSummary.failedTasks === 1) - assert(execSummary.succeededTasks === 1) - assert(execSummary.killedTasks === 0) + stage.failureReason = Some("Failed") + listener.onStageCompleted(SparkListenerStageCompleted(stage)) + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed( + new RuntimeException("Bad Executor")))) + + time += 1 + tasks(2).markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType", + ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null)) + time += 1 + tasks(3).markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType", + ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null)) + + val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info) + esummary.foreach { execSummary => + assert(execSummary.failedTasks === 1) + assert(execSummary.succeededTasks === 1) + assert(execSummary.killedTasks === 0) + } + + val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info) + assert(allExecutorSummary.size === 2) + allExecutorSummary.foreach { allExecSummary => + assert(allExecSummary.failedTasks === 1) + assert(allExecSummary.activeTasks === 0) + assert(allExecSummary.completedTasks === 1) + } + store.delete(classOf[ExecutorSummaryWrapper], "1") + store.delete(classOf[ExecutorSummaryWrapper], "2") } }