Skip to content

Commit

Permalink
[SPARK-25837][CORE] Fix potential slowdown in AppStatusListener when …
Browse files Browse the repository at this point in the history
…cleaning up stages

## What changes were proposed in this pull request?

* Update `AppStatusListener` `cleanupStages` method to remove tasks for those stages in a single pass instead of 1 for each stage.
* This fixes an issue where the cleanupStages method would get backed up, causing a backup in the executor in ElementTrackingStore, resulting in stages and jobs not getting cleaned up properly.

Tasks seem most susceptible to this as there are a lot of them, however a similar issue could arise in other locations the `KVStore` `view` method is used. A broader fix might involve updates to `KVStoreView` and `InMemoryView` as it appears this interface and implementation can lead to multiple and inefficient traversals of the stored data.

## How was this patch tested?

Using existing tests in AppStatusListenerSuite

This is my original work and I license the work to the project under the project’s open source license.

Closes apache#22883 from patrickbrownsync/cleanup-stages-fix.

Authored-by: Patrick Brown <patrick.brown@blyncsy.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
patrickbrownsync authored and Marcelo Vanzin committed Nov 1, 2018
1 parent fc82222 commit e9d3ca0
Showing 1 changed file with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1073,16 +1073,6 @@ private[spark] class AppStatusListener(
kvstore.delete(e.getClass(), e.id)
}

val tasks = kvstore.view(classOf[TaskDataWrapper])
.index("stage")
.first(key)
.last(key)
.asScala

tasks.foreach { t =>
kvstore.delete(t.getClass(), t.taskId)
}

// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
Expand All @@ -1105,6 +1095,15 @@ private[spark] class AppStatusListener(

cleanupCachedQuantiles(key)
}

// Delete tasks for all stages in one pass, as deleting them for each stage individually is slow
val tasks = kvstore.view(classOf[TaskDataWrapper]).asScala
val keys = stages.map { s => (s.info.stageId, s.info.attemptId) }.toSet
tasks.foreach { t =>
if (keys.contains((t.stageId, t.stageAttemptId))) {
kvstore.delete(t.getClass(), t.taskId)
}
}
}

private def cleanupTasks(stage: LiveStage): Unit = {
Expand Down

0 comments on commit e9d3ca0

Please sign in to comment.