Skip to content

Commit

Permalink
Address the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryshao committed Jul 15, 2015
1 parent 2624723 commit 7cc146d
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 12 deletions.
5 changes: 0 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

// This is used for Spark Streaming to check whether driver host and port are set by user,
// if these two configurations are set by user, so the recovery mechanism should not remove this.
private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host")
private[spark] val isDriverPortSetByUser = config.contains("spark.driver.port")

val startTime = System.currentTimeMillis()

private val stopped: AtomicBoolean = new AtomicBoolean(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConfPairs = ssc.conf.getAll.filterNot { kv =>
(!ssc.sc.isDriverHostSetByUser && kv._1 == "spark.driver.host") ||
(!ssc.sc.isDriverPortSetByUser && kv._1 == "spark.driver.port") }
val sparkConfPairs = ssc.conf.getAll

def createSparkConf(): SparkConf = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,6 @@ class CheckpointSuite extends TestSuiteBase {
conf.foreach(kv => System.setProperty(kv._1, kv._2))
ssc = new StreamingContext(master, framework, batchDuration)
val originalConf = ssc.conf
assert(ssc.sc.isDriverHostSetByUser === true)
assert(ssc.sc.isDriverPortSetByUser === true)
assert(originalConf.get("spark.driver.host") === "localhost")
assert(originalConf.get("spark.driver.port") === "9999")

Expand All @@ -218,8 +216,6 @@ class CheckpointSuite extends TestSuiteBase {
// Check if all the parameters have been restored
ssc = new StreamingContext(null, newCp, null)
val restoredConf = ssc.conf
assert(ssc.sc.isDriverHostSetByUser === true)
assert(ssc.sc.isDriverPortSetByUser === true)
assert(restoredConf.get("spark.driver.host") === "localhost")
assert(restoredConf.get("spark.driver.port") === "9999")
}
Expand Down

0 comments on commit 7cc146d

Please sign in to comment.