Skip to content

Commit

Permalink
More JSON protocol backwards-compatibility fixes.
Browse files Browse the repository at this point in the history
The root problem was that I removed a field from the JSON and recomputed
it from another field.  In order for the backwards-compatibility test
to work, I needed to manually re-add the removed field in order to
construct JSON that’s in the right (old) format.
  • Loading branch information
JoshRosen committed Nov 21, 2014
1 parent ff804cd commit b89c258
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ private[spark] object JsonProtocol {
val properties = propertiesToJson(jobStart.properties)
("Event" -> Utils.getFormattedClassName(jobStart)) ~
("Job ID" -> jobStart.jobId) ~
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~
// ("Stage IDs" -> jobStart.stageIds) ~ // Removed in 1.2.0
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in 1.2.0
("Properties" -> properties)
}

Expand Down Expand Up @@ -459,9 +460,9 @@ private[spark] object JsonProtocol {
// This block of code handles backwards compatibility:
val stageIds: Option[Seq[Int]] =
Utils.jsonOption(json \ "Stage IDs").map(_.extract[List[JValue]].map(_.extract[Int]))
if (stageIds.isDefined) {
if (stageIds.isDefined) { // Reading JSON written prior to 1.2.0
stageIds.get.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
} else {
} else { // Reading JSON written after 1.2.0
Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse(Seq.empty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.util.Properties

import scala.collection.Map

import org.json4s.JsonAST.JObject
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.scalatest.FunSuite

Expand Down Expand Up @@ -234,6 +236,7 @@ class JsonProtocolSuite extends FunSuite {
val stageInfos = stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
val jobStart = SparkListenerJobStart(10, stageInfos, properties)
val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
.asInstanceOf[JObject] ~ ("Stage IDs" -> stageIds)
val expectedJobStart = SparkListenerJobStart(10, stageInfos, properties)
assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
}
Expand Down Expand Up @@ -320,7 +323,7 @@ class JsonProtocolSuite extends FunSuite {
case (e1: SparkListenerJobStart, e2: SparkListenerJobStart) =>
assert(e1.jobId === e2.jobId)
assert(e1.properties === e2.properties)
assertSeqEquals(e1.stageIds, e2.stageIds, (i1: Int, i2: Int) => assert(i1 === i2))
assert(e1.stageIds === e2.stageIds)
case (e1: SparkListenerJobEnd, e2: SparkListenerJobEnd) =>
assert(e1.jobId === e2.jobId)
assertEquals(e1.jobResult, e2.jobResult)
Expand Down Expand Up @@ -1319,12 +1322,6 @@ class JsonProtocolSuite extends FunSuite {
| ]
| }
| ],
| "Stage IDs": [
| 1,
| 2,
| 3,
| 4
| ],
| "Properties": {
| "France": "Paris",
| "Germany": "Berlin",
Expand Down

0 comments on commit b89c258

Please sign in to comment.