From 89b01f5c389ca06be7e1296326a79e34d5a955ac Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 16 Jul 2015 11:08:08 +0800
Subject: [PATCH] Update the unit test to add more cases

---
 .../spark/streaming/CheckpointSuite.scala          | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 52702bb852d8d..d308ac05a54fe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -193,7 +193,7 @@ class CheckpointSuite extends TestSuiteBase {
 
   // This tests if "spark.driver.host" and "spark.driver.port" is set by user, can be recovered
   // with correct value.
-  test("correctly recover spark.driver.[host|port] from checkpoint") {
+  test("get correct spark.driver.[host|port] from checkpoint") {
     val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999")
     conf.foreach(kv => System.setProperty(kv._1, kv._2))
     ssc = new StreamingContext(master, framework, batchDuration)
@@ -218,9 +218,24 @@ class CheckpointSuite extends TestSuiteBase {
     val restoredConf = ssc.conf
     assert(restoredConf.get("spark.driver.host") === "localhost")
     assert(restoredConf.get("spark.driver.port") === "9999")
+    ssc.stop()
+
+    // If spark.driver.host and spark.driver.port are not set in the system properties, these two
+    // parameters should not be present in the newly recovered conf.
+    conf.foreach(kv => System.clearProperty(kv._1))
+    val newCpConf1 = newCp.createSparkConf()
+    assert(!newCpConf1.contains("spark.driver.host"))
+    assert(!newCpConf1.contains("spark.driver.port"))
+
+    // Spark itself will pick a random, unused port for spark.driver.port if it is not set
+    // explicitly.
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf1 = ssc.conf
+    assert(restoredConf1.get("spark.driver.host") === "localhost")
+    assert(restoredConf1.get("spark.driver.port") !== "9999")
   }
 
-  // This tests whether the systm can recover from a master failure with simple
+  // This tests whether the system can recover from a master failure with simple
   // non-stateful operations. This assumes as reliable, replayable input
   // source - TestInputDStream.
   test("recovery with map and reduceByKey operations") {