Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

SPARK-1337: Application web UI garbage collects newest stages #320

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -81,8 +81,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
/** If stages is too large, remove and garbage collect old stages */
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = retainedStages / 10
stages.takeRight(toRemove).foreach( s => {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTaskData.remove(s.stageId)
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
Expand All @@ -94,8 +94,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToTasksFailed.remove(s.stageId)
stageIdToPool.remove(s.stageId)
if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
})
stages.trimEnd(toRemove)
}
stages.trimStart(toRemove)
}
}

Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -18,13 +18,42 @@
package org.apache.spark.ui.jobs

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

import org.apache.spark.{LocalSparkContext, SparkContext, Success}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils

class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
test("test LRU eviction of stages") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
val listener = new JobProgressListener(conf)

def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
SparkListenerStageSubmitted(stageInfo)
}

def createStageEndEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
SparkListenerStageCompleted(stageInfo)
}

for (i <- 1 to 50) {
listener.onStageSubmitted(createStageStartEvent(i))
listener.onStageCompleted(createStageEndEvent(i))
}

listener.completedStages.size should be (5)
listener.completedStages.filter(_.stageId == 50).size should be (1)
listener.completedStages.filter(_.stageId == 49).size should be (1)
listener.completedStages.filter(_.stageId == 48).size should be (1)
listener.completedStages.filter(_.stageId == 47).size should be (1)
listener.completedStages.filter(_.stageId == 46).size should be (1)
}

test("test executor id to summary") {
val sc = new SparkContext("local", "test")
val listener = new JobProgressListener(sc.conf)
Expand Down