SPARK-1004. PySpark on YARN #30

Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
1 change: 1 addition & 0 deletions bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
42 changes: 42 additions & 0 deletions core/pom.xml
@@ -294,6 +294,48 @@
</environmentVariables>
</configuration>
</plugin>
<!-- Unzip py4j so we can include its files in the jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()

// Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()

// Redirect the stderr to ours
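For illustration (not part of the diff): the two hunks above replace path-based invocation of `worker.py` and `daemon.py` with `python -m pyspark.worker` / `python -m pyspark.daemon`, which only works when the `pyspark` package is already importable in the worker's environment; the PYTHONPATH changes elsewhere in this PR (and the Spark jar on YARN) are what provide that. A minimal, hypothetical sanity check of that assumption:

```python
# Hypothetical check mirroring what the new ProcessBuilder invocation relies on:
# `python -m pyspark.daemon` succeeds only if the pyspark package can be imported,
# i.e. PYTHONPATH already points at $SPARK_HOME/python (or at spark.jar on YARN).
import importlib

try:
    importlib.import_module("pyspark.daemon")
except ImportError:
    raise SystemExit("pyspark is not on PYTHONPATH; `python -m pyspark.daemon` would fail")
```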
3 changes: 3 additions & 0 deletions docs/python-programming-guide.md
@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `pyspark` package to the `PYTHONPATH`.

# Running PySpark on YARN

To run PySpark against a YARN cluster, simply set the `MASTER` environment variable to `yarn-client`.

# Interactive Use

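To make the YARN note in the documentation change above concrete, here is a minimal, illustrative PySpark application (not part of this change set). It assumes it is launched through `bin/pyspark` so the Spark assembly and `PYTHONPATH` are already configured; the app name is arbitrary:

```python
from pyspark import SparkConf, SparkContext

# Setting the master through SparkConf plays roughly the same role for an
# application as exporting MASTER=yarn-client does for the interactive shell.
conf = SparkConf().setMaster("yarn-client").setAppName("yarn_example")
sc = SparkContext(conf=conf)

# A trivial job to confirm that executors come up on the YARN cluster.
print(sc.parallelize(range(1000)).map(lambda x: x * x).sum())

sc.stop()
```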
3 changes: 3 additions & 0 deletions python/.gitignore
@@ -1,2 +1,5 @@
*.pyc
docs/
pyspark.egg-info
build/
dist/
1 change: 0 additions & 1 deletion python/lib/PY4J_VERSION.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/pyspark/__init__.py
@@ -49,13 +49,6 @@
Main entry point for accessing data stored in Apache Hive.
"""



import sys
import os
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))


from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
29 changes: 27 additions & 2 deletions python/pyspark/java_gateway.py
@@ -24,10 +24,11 @@
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

set_env_vars_for_yarn()

def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
return gateway

def set_env_vars_for_yarn():
# Add the spark jar, which includes the pyspark files, to the python path
env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
if "PYTHONPATH" in env_map:
env_map["PYTHONPATH"] += ":spark.jar"
else:
env_map["PYTHONPATH"] = "spark.jar"

os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())

def parse_env(env_str):
# Turns a comma-separated list of env settings into a dict that maps env vars to
# their values.
env = {}
for var_str in env_str.split(","):
parts = var_str.split("=")
if len(parts) == 2:
Contributor: Do you think it would be worth it to crash or throw an error when passed an invalid env string?

env[parts[0]] = parts[1]
elif len(var_str) > 0:
print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
sys.exit(1)

return env
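
For illustration only, a small sketch of what the two new helpers do, assuming `pyspark` and `py4j` are on the `PYTHONPATH`; the sample variable values are made up:

```python
import os
from pyspark.java_gateway import parse_env, set_env_vars_for_yarn

# parse_env turns the comma-separated SPARK_YARN_USER_ENV string into a dict.
print(parse_env("PYSPARK_PYTHON=python2.7,FOO=bar"))
# -> {'PYSPARK_PYTHON': 'python2.7', 'FOO': 'bar'}  (dict ordering may vary)

# set_env_vars_for_yarn appends PYTHONPATH=spark.jar (or extends an existing
# PYTHONPATH entry) so that YARN containers can import pyspark from the Spark jar.
os.environ["SPARK_YARN_USER_ENV"] = "FOO=bar"
set_env_vars_for_yarn()
print(os.environ["SPARK_YARN_USER_ENV"])
# e.g. "FOO=bar,PYTHONPATH=spark.jar"  (entry order may vary)
```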
4 changes: 3 additions & 1 deletion python/pyspark/tests.py
@@ -30,10 +30,12 @@

from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int


SPARK_HOME = os.environ["SPARK_HOME"]


class PySparkTestCase(unittest.TestCase):

def setUp(self):
3 changes: 3 additions & 0 deletions sbin/spark-config.sh
@@ -34,3 +34,6 @@ this="$config_bin/$script"
export SPARK_PREFIX=`dirname "$this"`/..
export SPARK_HOME=${SPARK_PREFIX}
export SPARK_CONF_DIR="$SPARK_HOME/conf"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
Contributor: So this script I think only gets called when launching the standalone daemons. Would it make more sense to put this in spark-class?

Contributor: Good point; I think we should move these lines to spark-class to make sure that workers use the right PYTHONPATH even if they're started manually through spark-class.

Contributor: Looks like we never addressed this. Should we move this into spark-submit, now that we have that?

export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH