diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index 1c92af45f869a..acbaba6791850 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -35,10 +35,6 @@ private[spark] object PythonUtils {
       pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
-    sys.env.get("PYSPARK_ARCHIVES_PATH") match {
-      case Some(path) => pythonPath += path
-      case None => // do nothing
-    }
     pythonPath.mkString(File.pathSeparator)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index e314408c067e9..3f6e0dd6050d2 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -49,7 +49,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
   val pythonPath = PythonUtils.mergePythonPaths(
     PythonUtils.sparkPythonPath,
-    envVars.getOrElse("PYTHONPATH", ""),
+    envVars.getOrElse("PYTHONPATH", sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")),
     sys.env.getOrElse("PYTHONPATH", ""))
 
   def create(): Socket = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 53e18c4bcec23..569abcfff68f5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -52,6 +52,7 @@ object PythonRunner {
     pathElements ++= formattedPyFiles
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
+    pathElements += sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
 
     // Launch Python process
diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md
index 73afbe06df606..3ecbf2308cd44 100644
--- a/docs/submitting-applications.md
+++ b/docs/submitting-applications.md
@@ -22,9 +22,6 @@ For Python, you can use the `--py-files` argument of `spark-submit` to add
 `.py` files to be distributed with your application. If you depend on multiple
 Python files we recommend packaging them into a `.zip` or `.egg`.
 
-As Python can not read files from assembly jar which packaged by JDK1.7+, so packaging pyspark into a
-`.zip`(the name contains "pyspark") and use `--py-files` argument of `spark-submit` to distribute it.
-
 # Launching Applications with spark-submit
 
 Once a user application is bundled, it can be launched using the `bin/spark-submit` script.
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c1effd3c8a718..b1ee4096f4b6c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -247,6 +247,7 @@ private[spark] class Client(
     List(
       (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
       (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
+      (PYSPARK_ARCHIVES, pysparkArchives(sparkConf), CONF_PYSPARK_ARCHIVES),
       ("log4j.properties", oldLog4jConf.orNull, null)
     ).foreach { case (destName, _localPath, confKey) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -386,6 +387,12 @@ private[spark] class Client(
     val appStagingDir = getAppStagingDir(appId)
     val localResources = prepareLocalResources(appStagingDir)
     val launchEnv = setupLaunchEnv(appStagingDir)
+    // From SPARK-1920 and SPARK-1520 we know that Python cannot read files from an assembly jar
+    // built with JDK 1.7+, so we ship the PySpark archives to executors as a separate local
+    // resource and add its path to PYTHONPATH.
+    for ((resPath, res) <- localResources if resPath.contains(PYSPARK_ARCHIVES)) {
+      launchEnv("PYSPARK_ARCHIVES_PATH") = resPath
+    }
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
     amContainer.setEnvironment(launchEnv)
@@ -681,9 +688,10 @@ object Client extends Logging {
     new Client(args, sparkConf).run()
   }
 
-  // Alias for the Spark assembly jar and the user jar
+  // Aliases for the Spark assembly jar, the user jar and the PySpark archives
   val SPARK_JAR: String = "__spark__.jar"
   val APP_JAR: String = "__app__.jar"
+  val PYSPARK_ARCHIVES: String = "__pyspark__.zip"
 
   // URI scheme that identifies local resources
   val LOCAL_SCHEME = "local"
@@ -695,6 +703,9 @@ object Client extends Logging {
   val CONF_SPARK_JAR = "spark.yarn.jar"
   val ENV_SPARK_JAR = "SPARK_JAR"
 
+  // Location of any user-defined PySpark archives
+  val CONF_PYSPARK_ARCHIVES = "spark.pyspark.archives"
+
   // Internal config to propagate the location of the user's jar to the driver/executors
   val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
 
@@ -733,6 +744,19 @@ object Client extends Logging {
     }
   }
 
+  /**
+   * Find the user-defined PySpark archives if configured, or fall back to the default.
+   * The default pyspark.zip is expected in the same directory as the assembly jar.
+   */
+  private def pysparkArchives(conf: SparkConf): String = {
+    if (conf.contains(CONF_PYSPARK_ARCHIVES)) {
+      conf.get(CONF_PYSPARK_ARCHIVES)
+    } else {
+      val sparkJarPath = SparkContext.jarOfClass(this.getClass).head
+      sparkJarPath.substring(0, sparkJarPath.lastIndexOf('/')) + "/pyspark.zip"
+    }
+  }
+
   /**
    * Return the path to the given application's staging directory.
    */
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index a8341991f64ad..2325ffcb78772 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -75,12 +75,7 @@ class ExecutorRunnable(
     val localResources = prepareLocalResources
     ctx.setLocalResources(localResources)
 
-    // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are
-    // package by JDK 1.7+, so we ship PySpark archives to executors by Yarn with --py-files, and
-    // add this path to PYTHONPATH.
-    for ((resPath, res) <- localResources if resPath.contains("pyspark")) {
-      env("PYSPARK_ARCHIVES_PATH") = resPath
-    }
+
     ctx.setEnvironment(env)
 
     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
@@ -304,6 +299,12 @@ class ExecutorRunnable(
     }
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
+
+    // Add PySpark archives path
+    sys.env.get("PYSPARK_ARCHIVES_PATH") match {
+      case Some(pythonArchivesPath) => env("PYSPARK_ARCHIVES_PATH") = pythonArchivesPath
+      case None =>
+    }
     env
   }
 }
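
Not part of the patch: a minimal sketch, assuming yarn-client mode (where the driver's SparkConf is the one the YARN Client reads), of pointing Spark at a custom PySpark archive through the new spark.pyspark.archives key instead of the default pyspark.zip next to the assembly jar. The archive path below is hypothetical; the same key could also be passed with --conf on spark-submit.

import org.apache.spark.{SparkConf, SparkContext}

object CustomPySparkArchiveExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("custom-pyspark-archive-example")
      // Read by Client.pysparkArchives(conf): the archive is uploaded to the staging
      // directory as __pyspark__.zip and its path is exported as PYSPARK_ARCHIVES_PATH,
      // which PythonRunner and PythonWorkerFactory then fold into PYTHONPATH.
      .set("spark.pyspark.archives", "/opt/spark/python/lib/pyspark.zip")  // hypothetical path
    val sc = new SparkContext(conf)
    // ... run the job ...
    sc.stop()
  }
}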