This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Python Bindings for launching PySpark Jobs from the JVM #364

Merged
merged 29 commits into from
Jul 3, 2017

Conversation

ifilonenko
Member

What changes were proposed in this pull request?

This pull request proposes the following changes:

  1. Separate Docker images for PySpark jobs, built on top of spark-base.
    These images add Python- and PySpark-specific environment variables. The entry point for driver-py also differs: it takes the location of the primary PySpark file and the distributed py-files in addition to the driver args.
  2. A new FileMountingTrait that is generic enough for both SparkR and PySpark to handle passing the proper arguments to PythonRunner and RRunner. This FileMounter uses the filesDownloadPath resolved in the DriverInitComponent to ensure that file paths are correct. The resolved paths are stored as environment variables mounted on the driver pod.
  3. Integration tests for PySpark (TODO: build an environment identical to the distribution's Python environment to run the tests).
  4. Match statements to account for varying arguments and malformed inputs, which may include nulls or a mix of local:// and file:// URIs (see the sketch after this list).
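
For illustration, a minimal sketch of the kind of matching point 4 describes, assuming hypothetical object and method names (this is not the PR's actual code):

// Illustrative sketch only; PyFileHandling and its methods are made-up names.
private[spark] object PyFileHandling {

  // Tolerate a null or empty --py-files value and split it into container-local
  // (local://) URIs, which need no staging, and everything else, which does.
  def splitPyFiles(pyFiles: String): (Seq[String], Seq[String]) = {
    val uris = Option(pyFiles)
      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq)
      .getOrElse(Seq.empty[String])
    uris.partition(_.startsWith("local://"))
  }

  // Resolve the primary resource to the path the driver container will see.
  def resolvePrimaryResource(uri: String, filesDownloadPath: String): String = uri match {
    case u if u.startsWith("local://") => u.stripPrefix("local://")
    case u => s"$filesDownloadPath/${u.split("/").last}"
  }
}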

Example Spark Submit

This is an example spark-submit that uses the custom PySpark Docker images and distributes the staged sort.py file across the cluster. The resulting entry point for the driver is:
org.apache.spark.deploy.PythonRunner <FILE_DOWNLOADS_PATH>/pi.py <FILE_DOWNLOADS_PATH>/sort.py 100

bin/spark-submit \
  --deploy-mode cluster \
  --master k8s://<k8s-api-url> \
  --kubernetes-namespace default \
  --conf spark.executor.memory=500m \
  --conf spark.driver.memory=1G \
  --conf spark.driver.cores=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.instances=1 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
  --conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
  --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
  --conf spark.kubernetes.resourceStagingServer.uri=http://192.168.99.100:31000 \
  --py-files examples/src/main/python/sort.py \
  examples/src/main/python/pi.py 100

How was this patch tested?

This was fully tested by building a make-distribution environment and running on a local minikube cluster with a single executor. The following commands show an example submission:

$ build/mvn install -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
$ build/mvn compile -T 4C -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
$ dev/make-distribution.sh --pip --tgz -Phadoop-2.7 -Pkubernetes
$ tar -xvf spark-2.1.0-k8s-0.2.0-SNAPSHOT-bin-2.7.3.tgz
$ cd spark-2.1.0-k8s-0.2.0-SNAPSHOT-bin-2.7.3
$ minikube start --insecure-registry=localhost:5000 --cpus 8 --disk-size 20g --memory 8000 --kubernetes-version v1.5.3; eval $(minikube docker-env)
$ # Build all docker images using docker build ....
$ # Make sure staging server is up 
$ kubectl cluster-info
Kubernetes master is running at https://192.168.99.100:8443
KubeDNS is running at https://192.168.99.100:8443/api/v1/proxy/namespaces/kube-system/services/kube-dns
kubernetes-dashboard is running at https://192.168.99.100:8443/api/v1/proxy/namespaces/kube-system/services/kubernetes-dashboard
$ docker images
REPOSITORY                                          
spark-integration-test-asset-server                 
spark-init                                           
spark-resource-staging-server                         
spark-shuffle                                      
spark-executor-py                                    
spark-executor                                      
spark-driver-py                                      
spark-driver                                        
spark-base                                         
$ bin/spark-submit \
  --deploy-mode cluster \
  --master k8s://https://192.168.99.100:8443 \
  --kubernetes-namespace default \
  --conf spark.executor.memory=500m \
  --conf spark.driver.memory=1G \
  --conf spark.driver.cores=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.instances=1 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
  --conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
  --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
  --conf spark.kubernetes.resourceStagingServer.uri=http://192.168.99.100:31000 \
  --py-files examples/src/main/python/sort.py \
  local:///opt/spark/examples/src/main/python/pi.py 100

Integration and unit tests have been added.

Future Versions of this PR

Launching JVM from Python (log issue)
MemoryOverhead testing (OOMKilled errors)

ifilonenko and others added 23 commits June 16, 2017 13:06
This change overhauls the underlying architecture of the submission
client, but it is intended to entirely preserve existing behavior of
Spark applications. Therefore users will find this to be an invisible
change.

The philosophy behind this design is to reconsider the breakdown of the
submission process. It operates off the abstraction of "submission
steps", which are transformation functions that take the previous state
of the driver and return the new state of the driver. The driver's state
includes its Spark configurations and the Kubernetes resources that will
be used to deploy it.

Such a refactor moves away from a features-first API design, which
considers different containers to serve a set of features. The previous
design, for example, had a container files resolver API object that
returned different resolutions of the dependencies added by the user.
However, it was up to the main Client to know how to intelligently
invoke all of those APIs. Therefore the API surface area of the file
resolver became untenably large, and it was not intuitive how it was
to be used or extended.

This design changes the encapsulation layout; every module is now
responsible for changing the driver specification directly. An
orchestrator builds the correct chain of steps and hands it to the
client, which then calls it verbatim. The main client then makes any
final modifications that put the different pieces of the driver
together, particularly to attach the driver container itself to the pod
and to apply the Spark configuration as command-line arguments.
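
A minimal sketch of that step abstraction, with made-up type names standing in for the real ones (the actual classes in this tree may be named and shaped differently):

// Sketch only: a pair of Maps stands in for the real driver state, which also
// carries Kubernetes resource builders.
case class DriverSpec(sparkConf: Map[String, String], driverEnv: Map[String, String])

// A submission step is a pure transformation from one driver state to the next.
trait DriverConfigurationStep {
  def configureDriver(spec: DriverSpec): DriverSpec
}

// Example step: record where the primary PySpark file will land in the container.
// The env var name here is hypothetical.
class PySparkPrimaryFileStep(resolvedPrimaryPyFile: String) extends DriverConfigurationStep {
  override def configureDriver(spec: DriverSpec): DriverSpec =
    spec.copy(driverEnv = spec.driverEnv + ("PYSPARK_PRIMARY" -> resolvedPrimaryPyFile))
}

// The orchestrator picks the steps; the client just folds them over the initial spec.
object StepRunner {
  def run(initial: DriverSpec, steps: Seq[DriverConfigurationStep]): DriverSpec =
    steps.foldLeft(initial)((spec, step) => step.configureDriver(spec))
}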
@ifilonenko
Member Author

@mccheah PTAL

childArgs += "org.apache.spark.deploy.PythonRunner"
childArgs += "--other-py-files"
childArgs += args.pyFiles

No need to check if it's empty?
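
One way to add that guard, reusing the names from the snippet above (a sketch, not necessarily how the final code handles it):

// Only forward --other-py-files when the user actually supplied py-files.
if (args.pyFiles != null && args.pyFiles.nonEmpty) {
  childArgs += "--other-py-files"
  childArgs += args.pyFiles
}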

@mccheah

mccheah commented Jun 30, 2017

This was not supposed to include #363 so please hold off on reviews until we revert that merge.

@ifilonenko
Member Author

@tnachen PTAL now

@mccheah

mccheah commented Jun 30, 2017

I think this is ok to merge given that we'll be redesigning a lot of the architecture in #365. Feature-wise it's more or less what we need and the integration tests plus manual testing have verified correctness. @foxish did you have any last comments or can we merge this soon?

} else None
// Since you might need jars for SQL UDFs in PySpark
def sparkJarFilter() : Seq[String] =
pythonResource.map { p => p.sparkJars}.getOrElse(

@tnachen tnachen Jun 30, 2017


Style: { p => p.sparkJars }
To be consistent with everywhere else here.

Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs))
} else None
// Since you might need jars for SQL UDFs in PySpark
def sparkJarFilter() : Seq[String] =

Style: sparkJarFilter(): Seq[String] =

jarsDownloadPath: String,
filesDownloadPath: String) extends ContainerLocalizedFilesResolver {
filesDownloadPath: String ) extends ContainerLocalizedFilesResolver {

Remove extra space before )

@@ -32,13 +32,15 @@ import org.apache.spark.util.Utils
*/
private[spark] trait DriverInitContainerComponentsProvider {

def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver
def provideContainerLocalizedFilesResolver(

Remove extra space after )

resolvedPrimaryPySparkResource: String,
resolvedPySparkFiles: String,
driverContainerName: String,
driverPodBuilder: PodBuilder) : Pod

Let's make the style consistent with the : spacing.

}
}
}
override def primaryPySparkResource (

Same for the spacing before (.

import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfter



Remove extra spaces

@tnachen

tnachen commented Jun 30, 2017

Besides style problems, nothing jumps out at me as being wrong. Since it's tested already, LGTM.

@ifilonenko
Member Author

@foxish waiting on your OK to merge

@foxish
Member

foxish commented Jul 1, 2017

LGTM. Feel free to merge. Thanks!

@erikerlandson
Member

Likewise, LGTM

@foxish foxish merged commit befcf0a into branch-2.1-kubernetes Jul 3, 2017
foxish pushed a commit that referenced this pull request Jul 24, 2017
* Adding PySpark Submit functionality. Launching Python from JVM

* Addressing scala idioms related to PR351

* Removing extends Logging which was necessary for LogInfo

* Refactored code to leverage the ContainerLocalizedFileResolver

* Modified Unit tests so that they would pass

* Modified Unit Test input to pass Unit Tests

* Setup working environment for integration tests for PySpark

* Comment out Python thread logic until Jenkins has python in Python

* Modifying PythonExec to pass on Jenkins

* Modifying python exec

* Added unit tests to ClientV2 and refactored to include pyspark submission resources

* Modified unit test check

* Scalastyle

* PR 348 file conflicts

* Refactored unit tests and styles

* further scala styling and logic

* Modified unit tests to be more specific towards Class in question

* Removed space delimiting for methods

* Submission client redesign to use a step-based builder pattern.

This change overhauls the underlying architecture of the submission
client, but it is intended to entirely preserve existing behavior of
Spark applications. Therefore users will find this to be an invisible
change.

The philosophy behind this design is to reconsider the breakdown of the
submission process. It operates off the abstraction of "submission
steps", which are transformation functions that take the previous state
of the driver and return the new state of the driver. The driver's state
includes its Spark configurations and the Kubernetes resources that will
be used to deploy it.

Such a refactor moves away from a features-first API design, which
considers different containers to serve a set of features. The previous
design, for example, had a container files resolver API object that
returned different resolutions of the dependencies added by the user.
However, it was up to the main Client to know how to intelligently
invoke all of those APIs. Therefore the API surface area of the file
resolver became untenably large, and it was not intuitive how it was
to be used or extended.

This design changes the encapsulation layout; every module is now
responsible for changing the driver specification directly. An
orchestrator builds the correct chain of steps and hands it to the
client, which then calls it verbatim. The main client then makes any
final modifications that put the different pieces of the driver
together, particularly to attach the driver container itself to the pod
and to apply the Spark configuration as command-line arguments.

* Don't add the init-container step if all URIs are local.

* Python arguments patch + tests + docs

* Revert "Python arguments patch + tests + docs"

This reverts commit 4533df2.

* Revert "Don't add the init-container step if all URIs are local."

This reverts commit e103225.

* Revert "Submission client redesign to use a step-based builder pattern."

This reverts commit 5499f6d.

* style changes

* space for styling
@foxish foxish deleted the pyspark-integration branch September 5, 2017 02:35
--conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
--conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
--conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
--py-files local:///opt/spark/examples/src/main/python/sort.py \

I know this is unrelated, but so happy to have local support :)


@holdenk holdenk left a comment


I know this is merged, but I figured I'd leave some comments/questions if that works for folks?

}

private[spark] class ContainerLocalizedFilesResolverImpl(
sparkJars: Seq[String],
sparkFiles: Seq[String],
pySparkFiles: Seq[String],
primaryPyFile: String,

Should this be an Option?

val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
// Bypass init-containers if `spark.jars` and `spark.files` and '--py-rilfes'

I know this is pre-existing but why?

if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
// Bypass init-containers if `spark.jars` and `spark.files` and '--py-rilfes'
// is empty or only has `local://` URIs
if ((KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty) {

So right now pySparkSubmitted is defined as val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles), and that seems like not what is desired, based on the previous comment and also on the logic used for the non-container-local files beforehand.
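
To make the distinction concrete, the condition under discussion roughly amounts to the following (paraphrased from the quoted diff, not the exact code):

// The jars/files side keeps every URI that is not container-local (local://),
// while the PySpark side keeps only submitter-local URIs; the two filters would
// diverge if remote (e.g. hdfs:// or http://) py-files were ever passed, which
// appears to be the concern raised above.
val nonContainerLocal = KubernetesFileUtils.getNonContainerLocalFiles(uris)
val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles)
if ((nonContainerLocal ++ pySparkSubmitted).nonEmpty) {
  // ... provide the init-container that downloads the staged dependencies ...
}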

# RUN apk add --update alpine-sdk python-dev
# RUN pip install numpy

ENV PYTHON_VERSION 2.7.13

This seems like a weird version to set Python to? Do we expect people to roll their own Py3 containers?

ENV PYTHON_VERSION 2.7.13
ENV PYSPARK_PYTHON python
ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

Could we source this from one of the existing pyspark scripts so it doesn't need to be updated?

If not, and we need this, we should add a comment about needing to update this when we update it elsewhere. Also, Spark master is now at py4j 0.10.6.

ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019
puneetloya pushed a commit to puneetloya/spark that referenced this pull request Mar 11, 2019
…-on-k8s#364)
