diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index c6dc3369ba5cc..869f1d602bbcd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -33,6 +33,7 @@ class StageInfo(
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
+    val parentIds: Seq[Int],
     val details: String) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
@@ -66,6 +67,7 @@ private[spark] object StageInfo {
       stage.name,
       numTasks.getOrElse(stage.numTasks),
       rddInfos,
+      stage.parents.map(_.id),
       stage.details)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 7541d3e9c72e7..0fe21b7ab3b8e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -51,7 +51,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
       // This could be empty if the JobProgressListener hasn't received information about the
       // stage or if the stage information has been garbage collected
       listener.stageIdToInfo.getOrElse(stageId,
-        new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown"))
+        new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
     }

     val activeStages = mutable.Buffer[StageInfo]()
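
Every call site that constructs a StageInfo directly must now pass the extra parentIds argument, which accounts for most of the churn in the files that follow. A minimal sketch of the updated constructor in use (values are illustrative only; parameter names follow the class definition above):

    // Hypothetical example: a stage that depends on stages 1 and 2.
    val info = new StageInfo(
      stageId = 3,
      attemptId = 0,
      name = "count at <console>:15",
      numTasks = 8,
      rddInfos = Seq.empty,
      parentIds = Seq(1, 2),  // newly added: IDs of this stage's parent stages
      details = "")
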
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 5cc625a727592..b679dcc3d9cae 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -227,6 +227,7 @@ private[spark] object JsonProtocol {

   def stageInfoToJson(stageInfo: StageInfo): JValue = {
     val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
+    val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
     val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -235,6 +236,7 @@
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
+    ("Parent IDs" -> parentIds) ~
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
@@ -521,7 +523,7 @@ private[spark] object JsonProtocol {
     // The "Stage Infos" field was added in Spark 1.2.0
     val stageInfos = Utils.jsonOption(json \ "Stage Infos")
       .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
-        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
       }
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
@@ -601,6 +603,9 @@ private[spark] object JsonProtocol {
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+    val parentIds = Utils.jsonOption(json \ "Parent IDs")
+      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+      .getOrElse(Seq.empty)
     val details = (json \ "Details").extractOpt[String].getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -610,7 +615,8 @@ private[spark] object JsonProtocol {
       case None => Seq[AccumulableInfo]()
     }

-    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(
+      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
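
With both directions wired up, "Parent IDs" round-trips through the event log, and logs written before this change still parse: the field is simply absent, so stageInfoFromJson falls back to Seq.empty. A hedged sketch of the round trip (JsonProtocol and StageInfo are private[spark], so this only compiles inside Spark itself, e.g. in its test suites):

    // Illustrative values; exercises the serialization paths added above.
    val info = new StageInfo(3, 0, "stage", 8, Seq.empty, Seq(1, 2), "details")
    val json = JsonProtocol.stageInfoToJson(info)  // now carries "Parent IDs": [1, 2]
    assert(JsonProtocol.stageInfoFromJson(json).parentIds == Seq(1, 2))
    // Stripping the field simulates a pre-1.4.0 log; parsing falls back to Seq.empty.
    val oldJson = json.removeField { case (field, _) => field == "Parent IDs" }
    assert(JsonProtocol.stageInfoFromJson(oldJson).parentIds == Seq.empty)
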
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 22acc270b983e..370c020530d72 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -701,7 +701,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val executorIdleTimeout = 3L

   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
+    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details")
   }

   private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 21d8267114133..967dd0821ebd0 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
   val jobCompletionTime = 1421191296660L

   private def createStageStartEvent(stageId: Int) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     SparkListenerStageSubmitted(stageInfo)
   }

   private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
-    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     if (failed) {
       stageInfo.failureReason = Some("Failed!")
     }
@@ -51,7 +51,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       stageIds: Seq[Int],
       jobGroup: Option[String] = None): SparkListenerJobStart = {
     val stageInfos = stageIds.map { stageId =>
-      new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+      new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
     }
     val properties: Option[Properties] = jobGroup.map { groupId =>
       val props = new Properties()
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 59affc2ce7c35..8cd7b5972f62d 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -54,7 +54,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     assert(storageListener.rddInfoList.isEmpty)

     // 2 RDDs are known, but none are cached
-    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.isEmpty)
@@ -64,7 +64,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo3Cached = rddInfo3
     rddInfo2Cached.numCachedPartitions = 1
     rddInfo3Cached.numCachedPartitions = 1
-    val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+    val stageInfo1 = new StageInfo(
+      1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -72,7 +73,7 @@
     // Submitting RDDInfos with duplicate IDs does nothing
     val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, "scoop", Seq(10))
     rddInfo0Cached.numCachedPartitions = 1
-    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
+    val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
     assert(storageListener._rddInfoMap.size === 4)
     assert(storageListener.rddInfoList.size === 2)
@@ -88,7 +89,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val rddInfo1Cached = rddInfo1
     rddInfo0Cached.numCachedPartitions = 1
     rddInfo1Cached.numCachedPartitions = 1
-    val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+    val stageInfo0 = new StageInfo(
+      0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), Seq.empty, "details")
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 2)
     assert(storageListener.rddInfoList.size === 2)
@@ -108,7 +110,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
     val stageInfo0 = new StageInfo(
-      0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+      0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), Seq.empty, "details")
     bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 3)
@@ -168,8 +170,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {

     val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, "scoop", Seq(4))
     val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly, "scoop", Seq(4))
-    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
-    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
+    val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
+    val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
     val taskMetrics0 = new TaskMetrics
     val taskMetrics1 = new TaskMetrics
     val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
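
Beyond the mechanical updates above, exposing parentIds means any SparkListener can reconstruct the stage DAG from events alone, without reaching into scheduler internals. A sketch of such a consumer; this listener is illustrative only and not part of the patch:

    import scala.collection.mutable
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

    // Illustrative only: records one (parent stage ID -> stage ID) edge per dependency.
    class StageDagListener extends SparkListener {
      val edges = mutable.Buffer.empty[(Int, Int)]
      override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
        val info = event.stageInfo
        info.parentIds.foreach(parent => edges += parent -> info.stageId)
      }
    }
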
test("StageInfo backward compatibility (details, accumulables)") { val info = makeStageInfo(1, 2, 3, 4L, 5L) val newJson = JsonProtocol.stageInfoToJson(info) @@ -294,7 +294,7 @@ class JsonProtocolSuite extends FunSuite { val stageIds = Seq[Int](1, 2, 3, 4) val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500)) val dummyStageInfos = - stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown")) + stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")) val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties) val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"}) val expectedJobStart = @@ -320,8 +320,8 @@ class JsonProtocolSuite extends FunSuite { assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent)) } - test("RDDInfo backward compatibility") { - // Prior to Spark 1.4.0, RDDInfo did not have a "Scope" and "Parent IDs" properties + test("RDDInfo backward compatibility (scope, parent IDs)") { + // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, "fable", Seq(1, 6, 8)) val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo) .removeField({ _._1 == "Scope"}) @@ -330,6 +330,14 @@ class JsonProtocolSuite extends FunSuite { assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson)) } + test("StageInfo backward compatibility (parent IDs)") { + // Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property + val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details") + val oldStageInfo = JsonProtocol.stageInfoToJson(stageInfo).removeField({ _._1 == "Parent IDs"}) + val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details") + assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo)) + } + /** -------------------------- * | Helper test running methods | * --------------------------- */ @@ -661,7 +669,7 @@ class JsonProtocolSuite extends FunSuite { private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = { val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) } - val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details") + val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details") val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2)) stageInfo.accumulables(acc1.id) = acc1 stageInfo.accumulables(acc2.id) = acc2 @@ -754,6 +762,7 @@ class JsonProtocolSuite extends FunSuite { | "Stage Name": "greetings", | "Number of Tasks": 200, | "RDD Info": [], + | "ParentIDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -808,6 +817,7 @@ class JsonProtocolSuite extends FunSuite { | "Disk Size": 501 | } | ], + | "ParentIDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -1193,6 +1203,7 @@ class JsonProtocolSuite extends FunSuite { | "Disk Size": 500 | } | ], + | "Parent IDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -1252,6 +1263,7 @@ class JsonProtocolSuite extends FunSuite { | "Disk Size": 1001 | } | ], + | "ParentIDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -1329,6 +1341,7 @@ class JsonProtocolSuite extends FunSuite { | "Disk Size": 1502 | } | ], + | "ParentIDs" : [100, 200, 300], | "Details": "details", | "Accumulables": [ | { @@ -1424,6 +1437,7 @@ 
@@ -1424,6 +1437,7 @@ class JsonProtocolSuite extends FunSuite {
       |          "Disk Size": 2003
       |        }
       |      ],
+      |      "Parent IDs": [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {