Skip to content

Commit

Permalink
SPARK-1337: Application web UI garbage collects newest stages
Browse files Browse the repository at this point in the history
Simple fix...

Author: Patrick Wendell <pwendell@gmail.com>

Closes #320 from pwendell/stage-clean-up and squashes the following commits:

29be62e [Patrick Wendell] SPARK-1337: Application web UI garbage collects newest stages instead of old ones
(cherry picked from commit ee6e9e7)

Conflicts:

	core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
	core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
  • Loading branch information
pwendell committed Apr 4, 2014
1 parent d9c7a80 commit 7f727cf
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
/** If stages is too large, remove and garbage collect old stages */
def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > RETAINED_STAGES) {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
val toRemove = math.max(RETAINED_STAGES / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTaskInfos.remove(s.stageId)
stageIdToTime.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
Expand All @@ -87,8 +87,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
stageIdToTasksFailed.remove(s.stageId)
stageIdToPool.remove(s.stageId)
if (stageIdToDescription.contains(s.stageId)) {stageIdToDescription.remove(s.stageId)}
})
stages.trimEnd(toRemove)
}
stages.trimStart(toRemove)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,47 @@

package org.apache.spark.ui.jobs

import scala.collection.mutable.Buffer

import org.scalatest.FunSuite
import org.apache.spark.scheduler._
import org.scalatest.matchers.ShouldMatchers

import org.apache.spark.{LocalSparkContext, SparkContext, Success}
import org.apache.spark.scheduler.SparkListenerTaskStart
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.rdd.EmptyRDD

class JobProgressListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
test("test LRU eviction of stages") {
  val retainedStagesKey = "spark.ui.retainedStages"
  val retainedStages = 5
  val submittedStages = 50

  // System properties are JVM-wide, so remember whatever was set before and put it
  // back afterwards; otherwise the tiny retention limit leaks into later tests.
  val previousValue = Option(System.getProperty(retainedStagesKey))
  System.setProperty(retainedStagesKey, retainedStages.toString)
  try {
    // The property is set before the context and listener are created, as in the
    // original test. NOTE(review): presumably the listener reads the limit at
    // construction time -- confirm against JobProgressListener.
    val sc = new SparkContext("local", "test")
    val listener = new JobProgressListener(sc)

    /** Builds a StageInfo for a stage with the given id, backed by an empty RDD and no tasks. */
    def createStageInfo(stageId: Int) = {
      val stage = new Stage(stageId, new EmptyRDD(sc), 0, None, List(), 0, None)
      new StageInfo(stage, Buffer())
    }

    /** Event announcing that stage `stageId` was submitted (no job properties attached). */
    def createStageStartEvent(stageId: Int) = {
      SparkListenerStageSubmitted(createStageInfo(stageId), null)
    }

    /** Event announcing that stage `stageId` completed. */
    def createStageEndEvent(stageId: Int) = {
      SparkListenerStageCompleted(createStageInfo(stageId))
    }

    // Submit and complete many more stages than the listener is allowed to retain.
    for (i <- 1 to submittedStages) {
      listener.onStageSubmitted(createStageStartEvent(i))
      listener.onStageCompleted(createStageEndEvent(i))
    }

    // Only the most recently completed stages (ids 46 to 50) must survive eviction.
    listener.completedStages.size should be (retainedStages)
    for (stageId <- (submittedStages - retainedStages + 1) to submittedStages) {
      listener.completedStages.count(_.stageId == stageId) should be (1)
    }
  } finally {
    previousValue match {
      case Some(value) => System.setProperty(retainedStagesKey, value)
      case None => System.clearProperty(retainedStagesKey)
    }
  }
}

class JobProgressListenerSuite extends FunSuite with LocalSparkContext {
test("test executor id to summary") {
val sc = new SparkContext("local", "test")
val listener = new JobProgressListener(sc)
Expand Down

0 comments on commit 7f727cf

Please sign in to comment.