From 7f727cf971a63d8d7217c8e1fca8196f80ece4f5 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 3 Apr 2014 22:13:56 -0700 Subject: [PATCH] SPARK-1337: Application web UI garbage collects newest stages Simple fix... Author: Patrick Wendell Closes #320 from pwendell/stage-clean-up and squashes the following commits: 29be62e [Patrick Wendell] SPARK-1337: Application web UI garbage collects newest stages instead of old ones (cherry picked from commit ee6e9e7d863022304ac9ced405b353b63accb6ab) Conflicts: core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala --- .../spark/ui/jobs/JobProgressListener.scala | 8 ++-- .../ui/jobs/JobProgressListenerSuite.scala | 40 +++++++++++++++++-- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 858a10ce750ff..369a7a5374410 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -74,8 +74,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList /** If stages is too large, remove and garbage collect old stages */ def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > RETAINED_STAGES) { - val toRemove = RETAINED_STAGES / 10 - stages.takeRight(toRemove).foreach( s => { + val toRemove = math.max(RETAINED_STAGES / 10, 1) + stages.take(toRemove).foreach { s => stageIdToTaskInfos.remove(s.stageId) stageIdToTime.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) @@ -87,8 +87,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList stageIdToTasksFailed.remove(s.stageId) stageIdToPool.remove(s.stageId) if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)} - }) 
- stages.trimEnd(toRemove) + } + stages.trimStart(toRemove) } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 67a57a0e7f9d0..348fbe44cf575 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -17,13 +17,47 @@ package org.apache.spark.ui.jobs +import scala.collection.mutable.Buffer + import org.scalatest.FunSuite -import org.apache.spark.scheduler._ +import org.scalatest.matchers.ShouldMatchers + import org.apache.spark.{LocalSparkContext, SparkContext, Success} -import org.apache.spark.scheduler.SparkListenerTaskStart import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics} +import org.apache.spark.scheduler._ +import org.apache.spark.rdd.EmptyRDD + +class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers { + test("test LRU eviction of stages") { + System.setProperty("spark.ui.retainedStages", 5.toString) + val sc = new SparkContext("local", "test") + val listener = new JobProgressListener(sc) + + def createStageStartEvent(stageId: Int) = { + val stage = new Stage(stageId, new EmptyRDD(sc), 0, None, List(), 0, None) + val stageInfo = new StageInfo(stage, Buffer()) + SparkListenerStageSubmitted(stageInfo, null) + } + + def createStageEndEvent(stageId: Int) = { + val stage = new Stage(stageId, new EmptyRDD(sc), 0, None, List(), 0, None) + val stageInfo = new StageInfo(stage, Buffer()) + SparkListenerStageCompleted(stageInfo) + } + + for (i <- 1 to 50) { + listener.onStageSubmitted(createStageStartEvent(i)) + listener.onStageCompleted(createStageEndEvent(i)) + } + + listener.completedStages.size should be (5) + listener.completedStages.filter(_.stageId == 50).size should be (1) + listener.completedStages.filter(_.stageId == 49).size should be (1) + 
listener.completedStages.filter(_.stageId == 48).size should be (1) + listener.completedStages.filter(_.stageId == 47).size should be (1) + listener.completedStages.filter(_.stageId == 46).size should be (1) + } -class JobProgressListenerSuite extends FunSuite with LocalSparkContext { test("test executor id to summary") { val sc = new SparkContext("local", "test") val listener = new JobProgressListener(sc)