From 2286e6bdc417cf9caca4beabd37ff3d025019ac3 Mon Sep 17 00:00:00 2001 From: WangTao Date: Wed, 5 Nov 2014 23:19:52 +0800 Subject: [PATCH 1/6] ignore spark.driver.host in yarn-cluster mode --- .../main/scala/org/apache/spark/SparkContext.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3cdaa6a9cc8a8..0c21b0316303a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -175,10 +175,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { logInfo("Spark configuration:\n" + conf.toDebugString) } - // Set Spark driver host and port system properties - conf.setIfMissing("spark.driver.host", Utils.localHostName()) - conf.setIfMissing("spark.driver.port", "0") - val jars: Seq[String] = conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten @@ -206,6 +202,14 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") + // Set Spark driver host and port system properties. Ignore host setting in yarn-cluster mode. 
+ if (master == "yarn-cluster") { + conf.set("spark.driver.host", Utils.localHostName()) + } else { + conf.setIfMissing("spark.driver.host", Utils.localHostName()) + } + conf.setIfMissing("spark.driver.port", "0") + // An asynchronous listener bus for Spark events private[spark] val listenerBus = new LiveListenerBus From ff8d5f7409cf1eccaa3122400ce3904cfe7b145f Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 11 Nov 2014 11:58:20 +0800 Subject: [PATCH 2/6] also ignore it in standalone cluster mode --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- core/src/main/scala/org/apache/spark/deploy/Client.scala | 8 ++++++++ .../spark/deploy/yarn/ApplicationMasterArguments.scala | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0c21b0316303a..860f7c3145bac 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -203,7 +203,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") // Set Spark driver host and port system properties. Ignore host setting in yarn-cluster mode. 
- if (master == "yarn-cluster") { + if (master.contains("cluster")) { conf.set("spark.driver.host", Utils.localHostName()) } else { conf.setIfMissing("spark.driver.host", Utils.localHostName()) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index f2687ce6b42b4..8e54c0042f996 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -155,6 +155,14 @@ object Client { } conf.set("spark.akka.askTimeout", "10") conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) + + // Set the web ui port to be ephemeral so we don't conflict with other spark processes + // running on the same box + conf.set("spark.ui.port", "0") + + // Set the master property to match the requested mode. + conf.set("spark.master", "standalone-cluster") + Logger.getRootLogger.setLevel(driverArgs.logLevel) val (actorSystem, _) = AkkaUtils.createActorSystem( diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 8b32c76d14037..d76a63276d752 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -36,7 +36,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var args = inputArgs - while (! args.isEmpty) { + while (!args.isEmpty) { // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0, // the properties with executor in their names are preferred. 
+ args match { From 667cf24e688d6f936263fa65324996cdc33c2233 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 11 Nov 2014 14:00:33 +0800 Subject: [PATCH 3/6] document fix --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 860f7c3145bac..7114c336e3e9f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -202,7 +202,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") - // Set Spark driver host and port system properties. Ignore host setting in yarn-cluster mode. + // Set Spark driver host and port system properties. Ignore host setting in all cluster modes. if (master.contains("cluster")) { conf.set("spark.driver.host", Utils.localHostName()) } else { From 32a3f3f81b3422ed7dbc5ccd83d7eaf7b1d6f07c Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 11 Nov 2014 17:01:14 +0800 Subject: [PATCH 4/6] ignore it in SparkSubmit instead of SparkContext --- .../main/scala/org/apache/spark/SparkContext.scala | 12 ++++-------- .../main/scala/org/apache/spark/deploy/Client.scala | 8 -------- .../scala/org/apache/spark/deploy/SparkSubmit.scala | 4 ++++ 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7114c336e3e9f..2ead025ddb62f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -175,6 +175,10 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { logInfo("Spark configuration:\n" + conf.toDebugString) } + // Set Spark driver host and port system properties. 
+ conf.setIfMissing("spark.driver.host", Utils.localHostName()) + conf.setIfMissing("spark.driver.port", "0") + val jars: Seq[String] = conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten @@ -202,14 +206,6 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") - // Set Spark driver host and port system properties. Ignore host setting in all cluster modes. - if (master.contains("cluster")) { - conf.set("spark.driver.host", Utils.localHostName()) - } else { - conf.setIfMissing("spark.driver.host", Utils.localHostName()) - } - conf.setIfMissing("spark.driver.port", "0") - // An asynchronous listener bus for Spark events private[spark] val listenerBus = new LiveListenerBus diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 8e54c0042f996..f2687ce6b42b4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -155,14 +155,6 @@ object Client { } conf.set("spark.akka.askTimeout", "10") conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) - - // Set the web ui port to be ephemeral so we don't conflict with other spark processes - // running on the same box - conf.set("spark.ui.port", "0") - - // Set the master property to match the requested mode. 
- conf.set("spark.master", "standalone-cluster") - Logger.getRootLogger.setLevel(driverArgs.logLevel) val (actorSystem, _) = AkkaUtils.createActorSystem( diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b43e68e40f791..d7ba8b5826a76 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -279,6 +279,10 @@ object SparkSubmit { sysProps.getOrElseUpdate(k, v) } + if (deployMode == CLUSTER) { + sysProps -= ("spark.driver.host") + } + // Resolve paths in certain spark properties val pathConfigs = Seq( "spark.jars", From 02c4e49fc9bad5796986c40d2c5910963e556213 Mon Sep 17 00:00:00 2001 From: WangTao Date: Thu, 13 Nov 2014 23:14:46 +0800 Subject: [PATCH 5/6] add comment --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d7ba8b5826a76..5de384c8b3797 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -279,6 +279,7 @@ object SparkSubmit { sysProps.getOrElseUpdate(k, v) } + // Ignore invalid spark.driver.host in cluster modes. 
if (deployMode == CLUSTER) { sysProps -= ("spark.driver.host") } From ed1a25c85b2c80802f29700f363b9ef05721b395 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Fri, 14 Nov 2014 09:39:26 +0800 Subject: [PATCH 6/6] revert unrelated formatting issue --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2ead025ddb62f..3cdaa6a9cc8a8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -175,7 +175,7 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { logInfo("Spark configuration:\n" + conf.toDebugString) } - // Set Spark driver host and port system properties. + // Set Spark driver host and port system properties conf.setIfMissing("spark.driver.host", Utils.localHostName()) conf.setIfMissing("spark.driver.port", "0")