From 031d7d41430ec1f3c3353e33eab4821a9bcd58a5 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 16 Jul 2015 16:55:46 -0700 Subject: [PATCH] [SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue. Author: jerryshao Author: Saisai Shao Closes #5060 from jerryshao/SPARK-6304 and squashes the following commits: 89b01f5 [jerryshao] Update the unit test to add more cases 275d252 [jerryshao] Address the comments 7cc146d [jerryshao] Address the comments 2624723 [jerryshao] Fix rebase conflict 45befaa [Saisai Shao] Update the unit test bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue --- .../apache/spark/streaming/Checkpoint.scala | 2 + .../spark/streaming/CheckpointSuite.scala | 45 ++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 5279331c9e122..65d4e933bf8e9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) // Reload properties for the checkpoint application since user wants to set a reload property // or spark had changed its value and user wants to set it back. 
val propertiesToReload = List( + "spark.driver.host", + "spark.driver.port", "spark.master", "spark.yarn.keytab", "spark.yarn.principal") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 6a94928076236..d308ac05a54fe 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase { } } + // This tests if "spark.driver.host" and "spark.driver.port" are set by user, can be recovered + // with correct value. + test("get correct spark.driver.[host|port] from checkpoint") { + val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999") + conf.foreach(kv => System.setProperty(kv._1, kv._2)) + ssc = new StreamingContext(master, framework, batchDuration) + val originalConf = ssc.conf + assert(originalConf.get("spark.driver.host") === "localhost") + assert(originalConf.get("spark.driver.port") === "9999") + + val cp = new Checkpoint(ssc, Time(1000)) + ssc.stop() + + // Serialize/deserialize to simulate write to storage and reading it back + val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) + + val newCpConf = newCp.createSparkConf() + assert(newCpConf.contains("spark.driver.host")) + assert(newCpConf.contains("spark.driver.port")) + assert(newCpConf.get("spark.driver.host") === "localhost") + assert(newCpConf.get("spark.driver.port") === "9999") + + // Check if all the parameters have been restored + ssc = new StreamingContext(null, newCp, null) + val restoredConf = ssc.conf + assert(restoredConf.get("spark.driver.host") === "localhost") + assert(restoredConf.get("spark.driver.port") === "9999") + ssc.stop() + + // If spark.driver.host and spark.driver.port are not set in system properties, these two + // parameters should not be presented in the newly recovered
conf. + conf.foreach(kv => System.clearProperty(kv._1)) + val newCpConf1 = newCp.createSparkConf() + assert(!newCpConf1.contains("spark.driver.host")) + assert(!newCpConf1.contains("spark.driver.port")) + + // Spark itself will dispatch a random, not-used port for spark.driver.port if it is not set + // explicitly. + ssc = new StreamingContext(null, newCp, null) + val restoredConf1 = ssc.conf + assert(restoredConf1.get("spark.driver.host") === "localhost") + assert(restoredConf1.get("spark.driver.port") !== "9999") + } - // This tests whether the systm can recover from a master failure with simple + // This tests whether the system can recover from a master failure with simple // non-stateful operations. This assumes as reliable, replayable input // source - TestInputDStream. test("recovery with map and reduceByKey operations") {