From b89c2587efb52ab3f5d8fc1a60fbd5a4f9c07510 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 20 Nov 2014 23:44:38 -0800 Subject: [PATCH] More JSON protocol backwards-compatibility fixes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The root problem was that I removed a field from the JSON and recomputed it from another field. In order for the backwards-compatibility test to work, I needed to manually re-add the removed field in order to construct JSON that’s in the right (old) format. --- .../scala/org/apache/spark/util/JsonProtocol.scala | 7 ++++--- .../org/apache/spark/util/JsonProtocolSuite.scala | 11 ++++------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 5123df0bcfe0c..9d1475cf1b474 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -121,7 +121,8 @@ private[spark] object JsonProtocol { val properties = propertiesToJson(jobStart.properties) ("Event" -> Utils.getFormattedClassName(jobStart)) ~ ("Job ID" -> jobStart.jobId) ~ - ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ + // ("Stage IDs" -> jobStart.stageIds) ~ // Removed in 1.2.0 + ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in 1.2.0 ("Properties" -> properties) } @@ -459,9 +460,9 @@ private[spark] object JsonProtocol { // This block of code handles backwards compatibility: val stageIds: Option[Seq[Int]] = Utils.jsonOption(json \ "Stage IDs").map(_.extract[List[JValue]].map(_.extract[Int])) - if (stageIds.isDefined) { + if (stageIds.isDefined) { // Reading JSON written prior to 1.2.0 stageIds.get.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown")) - } else { + } else { // Reading JSON written after 1.2.0 Utils.jsonOption(json \ "Stage Infos") .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse(Seq.empty) } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 11f428d07f23b..3e5e8b1e84546 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -21,6 +21,8 @@ import java.util.Properties import scala.collection.Map +import org.json4s.JsonAST.JObject +import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.scalatest.FunSuite @@ -234,6 +236,7 @@ class JsonProtocolSuite extends FunSuite { val stageInfos = stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown")) val jobStart = SparkListenerJobStart(10, stageInfos, properties) val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"}) + .asInstanceOf[JObject] ~ ("Stage IDs" -> stageIds) val expectedJobStart = SparkListenerJobStart(10, stageInfos, properties) assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent)) } @@ -320,7 +323,7 @@ class JsonProtocolSuite extends FunSuite { case (e1: SparkListenerJobStart, e2: SparkListenerJobStart) => assert(e1.jobId === e2.jobId) assert(e1.properties === e2.properties) - assertSeqEquals(e1.stageIds, e2.stageIds, (i1: Int, i2: Int) => assert(i1 === i2)) + assert(e1.stageIds === e2.stageIds) case (e1: SparkListenerJobEnd, e2: SparkListenerJobEnd) => assert(e1.jobId === e2.jobId) assertEquals(e1.jobResult, e2.jobResult) @@ -1319,12 +1322,6 @@ class JsonProtocolSuite extends FunSuite { | ] | } | ], - | "Stage IDs": [ - | 1, - | 2, - | 3, - | 4 - | ], | "Properties": { | "France": "Paris", | "Germany": "Berlin",