diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 8e845573a903d..bd3f58b6182c0 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -641,9 +641,14 @@ private[spark] class AppStatusListener(
         }
       }
 
-      // Force an update on live applications when the number of active tasks reaches 0. This is
-      // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
+      // Force an update on both live and history applications when the number of active tasks
+      // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
+      // reliably up to date.
+      if (exec.activeTasks == 0) {
+        update(exec, now)
+      } else {
+        maybeUpdate(exec, now)
+      }
     }
   }
 
@@ -1024,14 +1029,6 @@ private[spark] class AppStatusListener(
     }
   }
 
-  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
-    if (condition) {
-      liveUpdate(entity, now)
-    } else {
-      maybeUpdate(entity, now)
-    }
-  }
-
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 7860a0df4bb2d..d0bb533ae72bf 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1273,48 +1273,68 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
-  test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
-    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+  Seq(true, false).foreach { live: Boolean =>
+    test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {
 
-    val listener = new AppStatusListener(store, testConf, true)
+      val testConf = if (live) {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+      } else {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
+      }
 
-    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
-    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
-    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+      val listener = new AppStatusListener(store, testConf, live)
 
-    val tasks = createTasks(4, Array("1", "2"))
-    tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
-    }
+      listener.onExecutorAdded(createExecutorAddedEvent(1))
+      listener.onExecutorAdded(createExecutorAddedEvent(2))
+      val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+      listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+      listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
-    time += 1
-    tasks(0).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      Success, tasks(0), null))
-    time += 1
-    tasks(1).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      Success, tasks(1), null))
+      val tasks = createTasks(4, Array("1", "2"))
+      tasks.foreach { task =>
+        listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+      }
 
-    stage.failureReason = Some("Failed")
-    listener.onStageCompleted(SparkListenerStageCompleted(stage))
-    time += 1
-    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+      time += 1
+      tasks(0).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(0), null))
+      time += 1
+      tasks(1).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(1), null))
 
-    time += 1
-    tasks(2).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
-    time += 1
-    tasks(3).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
-      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
-
-    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
-    esummary.foreach { execSummary =>
-      assert(execSummary.failedTasks === 1)
-      assert(execSummary.succeededTasks === 1)
-      assert(execSummary.killedTasks === 0)
+      stage.failureReason = Some("Failed")
+      listener.onStageCompleted(SparkListenerStageCompleted(stage))
+      time += 1
+      listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
+        new RuntimeException("Bad Executor"))))
+
+      time += 1
+      tasks(2).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+      time += 1
+      tasks(3).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+      val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+      esummary.foreach { execSummary =>
+        assert(execSummary.failedTasks === 1)
+        assert(execSummary.succeededTasks === 1)
+        assert(execSummary.killedTasks === 0)
+      }
+
+      val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+      assert(allExecutorSummary.size === 2)
+      allExecutorSummary.foreach { allExecSummary =>
+        assert(allExecSummary.failedTasks === 1)
+        assert(allExecSummary.activeTasks === 0)
+        assert(allExecSummary.completedTasks === 1)
+      }
+      store.delete(classOf[ExecutorSummaryWrapper], "1")
+      store.delete(classOf[ExecutorSummaryWrapper], "2")
     }
   }
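
Reviewer note (not part of the patch): the bug fixed here is that conditionalLiveUpdate
routed the exec.activeTasks == 0 case through liveUpdate, which is a no-op when the
listener is replaying an event log (live = false), so executor summaries served by the
history server could be left with stale task counts. Calling update directly writes in
both modes. A simplified sketch of the listener's write helpers that this distinction
rests on (paraphrased from AppStatusListener, not verbatim):

    // Unconditional write of the entity's current state to the KVStore.
    private def update(entity: LiveEntity, now: Long): Unit = {
      entity.write(kvstore, now)
    }

    // Rate-limited write: only for live applications, and only once the configured
    // update period has elapsed since this entity was last written.
    private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
      if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
        update(entity, now)
      }
    }

    // Write only for live applications; skipped entirely during event-log replay,
    // which is why the removed conditionalLiveUpdate path missed history data.
    private def liveUpdate(entity: LiveEntity, now: Long): Unit = {
      if (live) {
        update(entity, now)
      }
    }

Note that both test configurations effectively disable the maybeUpdate path (a
Long.MaxValue period when live, live = false when replaying), so the assertions only
pass if the activeTasks == 0 branch writes unconditionally.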