diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index d52b7e8dae71e..e2c190ea198e0 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -1073,16 +1073,6 @@ private[spark] class AppStatusListener(
         kvstore.delete(e.getClass(), e.id)
       }
 
-      val tasks = kvstore.view(classOf[TaskDataWrapper])
-        .index("stage")
-        .first(key)
-        .last(key)
-        .asScala
-
-      tasks.foreach { t =>
-        kvstore.delete(t.getClass(), t.taskId)
-      }
-
       // Check whether there are remaining attempts for the same stage. If there aren't, then
       // also delete the RDD graph data.
       val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
@@ -1105,6 +1095,15 @@ private[spark] class AppStatusListener(
 
       cleanupCachedQuantiles(key)
     }
+
+    // Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
+    val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
+    val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
+    tasks.foreach { t =>
+      if (keys.contains((t.stageId, t.stageAttemptId))) {
+        kvstore.delete(t.getClass(), t.taskId)
+      }
+    }
   }
 
   private def cleanupTasks(stage: LiveStage): Unit = {
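
For readers skimming the patch: the removed block deleted each stage's tasks through a separate indexed range scan inside the per-stage cleanup loop, while the added block scans `TaskDataWrapper` entries once and deletes any task whose `(stageId, stageAttemptId)` pair falls in the set of stages being evicted. Below is a minimal sketch of just that deletion pattern against a toy in-memory map; `Task`, `store`, and `stagesToDelete` are hypothetical stand-ins for illustration, not Spark's actual kvstore API.

```scala
import scala.collection.mutable

object OnePassDelete {
  // Hypothetical stand-in for Spark's TaskDataWrapper.
  case class Task(taskId: Long, stageId: Int, stageAttemptId: Int)

  def main(args: Array[String]): Unit = {
    // Toy "store": taskId -> task, standing in for a kvstore view.
    val store = mutable.LinkedHashMap(
      1L -> Task(1L, stageId = 0, stageAttemptId = 0),
      2L -> Task(2L, stageId = 0, stageAttemptId = 1),
      3L -> Task(3L, stageId = 1, stageAttemptId = 0),
      4L -> Task(4L, stageId = 2, stageAttemptId = 0))

    // Stages being evicted, as (stageId, attemptId) pairs.
    val stagesToDelete = Seq((0, 0), (2, 0))

    // The patched shape: build the key set once, then make a single pass
    // over all tasks and delete those whose stage key is in the set,
    // instead of scanning the store once per evicted stage.
    val keys = stagesToDelete.toSet
    val doomed = store.values
      .filter(t => keys.contains((t.stageId, t.stageAttemptId)))
      .toList
    doomed.foreach(t => store.remove(t.taskId))

    // Tasks 1 and 4 are deleted; 2 (other attempt) and 3 (other stage) survive.
    println(store.values.mkString("\n"))
  }
}
```

Under these assumptions the work drops from one task scan per evicted stage to a single scan of the whole task table, which is what the commit comment means by "one pass"; the trade-off is touching every task once even when only a few stages are being evicted.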