[SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
## What changes were proposed in this pull request?

Introducing R bindings for SparkR on K8s

- [x] Running SparkR Job

## How was this patch tested?

This patch was tested with

- [x] Unit Tests
- [x] Integration Tests

## Example:

Commands to run an example SparkR job:
1. `dev/make-distribution.sh --pip --r --tgz -Psparkr -Phadoop-2.7 -Pkubernetes`
2. `bin/docker-image-tool.sh -m -t testing build`
3.
```
bin/spark-submit \
    --master k8s://https://192.168.64.33:8443 \
    --deploy-mode cluster \
    --name spark-r \
    --conf spark.executor.instances=1 \
    --conf spark.kubernetes.container.image=spark-r:testing \
    local:///opt/spark/examples/src/main/r/dataframe.R
```

The spark-submit command above works against the distribution built in step 1. (The corresponding integration test will be included in a PR once the PRB is ready.)
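
Note: any application arguments appended after the primary resource in the `spark-submit` invocation should reach the R driver container through the `R_APP_ARGS` environment variable that the new `RDriverFeatureStep` (below) sets from the driver conf's `appArgs`.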

Author: Ilan Filonenko <if56@cornell.edu>

Closes #21584 from ifilonenko/spark-r.
ifilonenko authored and mccheah committed Aug 17, 2018
1 parent da2dc69 commit ba84bcb
Showing 17 changed files with 344 additions and 27 deletions.
23 changes: 16 additions & 7 deletions bin/docker-image-tool.sh
@@ -71,6 +71,7 @@ function build {
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"}

docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
@@ -79,11 +80,16 @@ function build {
docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .

docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-r) \
-f "$RDOCKERFILE" .
}

function push {
docker push "$(image_ref spark)"
docker push "$(image_ref spark-py)"
docker push "$(image_ref spark-r)"
}

function usage {
@@ -97,12 +103,13 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.
Options:
-f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache
-f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
-R file Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.
@@ -133,14 +140,16 @@ REPO=
TAG=
BASEDOCKERFILE=
PYDOCKERFILE=
RDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
while getopts f:p:mr:t:n:b: option
while getopts f:p:R:mr:t:n:b: option
do
case "${option}"
in
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
R) RDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
n) NOCACHEARG="--no-cache";;
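
Usage note: with this change, a command such as `bin/docker-image-tool.sh -m -t testing -R /path/to/custom/R/Dockerfile build` (hypothetical path) builds the `spark-r` image from a custom R Dockerfile; without `-R`, the default `$IMG_PATH/spark/bindings/R/Dockerfile` shipped with Spark is used, mirroring the existing `-p` handling for PySpark.
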
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -286,8 +286,6 @@ private[spark] class SparkSubmit extends Logging {
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isR =>
error("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
error("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -700,7 +698,11 @@ private[spark] class SparkSubmit extends Logging {
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else {
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
}
else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
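
For concreteness, a minimal sketch (not part of the diff) of the child arguments the new R branch would produce for the example job from the commit message; `SparkSubmit` fixes the main class to `RRunner` itself:

```
// Hypothetical values for illustration only.
val primaryResource = "local:///opt/spark/examples/src/main/r/dataframe.R"
val childArgs = Seq(
  "--primary-r-file", primaryResource,
  "--main-class", "org.apache.spark.deploy.RRunner")
```
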
@@ -139,6 +139,19 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_R_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.r.mainAppResource")
.doc("The main app resource for SparkR jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_R_APP_ARGS =
ConfigBuilder("spark.kubernetes.r.appArgs")
.doc("The app arguments for SparkR Jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
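
Both entries are `internal()`, so they are set by the submission client rather than by users. A minimal sketch of reading such an optional entry, assuming code inside the `org.apache.spark` packages where the `private[spark]` `ConfigEntry` accessors on `SparkConf` are visible (the same pattern the new `RDriverFeatureStepSuite` below uses):

```
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

val conf = new SparkConf(false)
  .set(KUBERNETES_R_MAIN_APP_RESOURCE, "local:///opt/spark/examples/src/main/r/dataframe.R")

// createOptional means the value comes back as an Option[String]
val mainR: Option[String] = conf.get(KUBERNETES_R_MAIN_APP_RESOURCE)
```
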
@@ -71,6 +71,8 @@ private[spark] object Constants {
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
val ENV_R_PRIMARY = "R_PRIMARY"
val ENV_R_ARGS = "R_APP_ARGS"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
@@ -78,6 +78,9 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def pySparkPythonVersion(): String = sparkConf
.get(PYSPARK_MAJOR_PYTHON_VERSION)

def sparkRMainResource(): Option[String] = sparkConf
.get(KUBERNETES_R_MAIN_APP_RESOURCE)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -125,14 +128,17 @@ private[spark] object KubernetesConf {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
// The function of this outer match is to account for multiple nonJVM
// bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
// bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4
case nonJVM: NonJVMResource =>
nonJVM match {
case PythonMainAppResource(res) =>
additionalFiles += res
maybePyFiles.foreach{maybePyFiles =>
additionalFiles.appendAll(maybePyFiles.split(","))}
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
case RMainAppResource(res) =>
additionalFiles += res
sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
}
sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features.bindings

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

private[spark] class RDriverFeatureStep(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
val roleConf = kubernetesConf.roleSpecificConf
require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined")
val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
rArgs =>
new EnvVarBuilder()
.withName(ENV_R_ARGS)
.withValue(rArgs.mkString(","))
.build())
val envSeq =
Seq(new EnvVarBuilder()
.withName(ENV_R_PRIMARY)
.withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get))
.build())
val rEnvs = envSeq ++
maybeRArgs.toSeq

val withRPrimaryContainer = new ContainerBuilder(pod.container)
.addAllToEnv(rEnvs.asJava)
.addToArgs("driver-r")
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", roleConf.mainClass)
.build()

SparkPod(pod.pod, withRPrimaryContainer)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
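
The step mirrors the existing Python driver step: it exports the resolved primary R file as `R_PRIMARY` and any application arguments as `R_APP_ARGS`, and adds `driver-r` plus `--class` (set to `org.apache.spark.deploy.RRunner` by `SparkSubmit` above) to the container args, which the Spark image entrypoint, extended elsewhere in this commit but not shown in this excerpt, is expected to dispatch on.
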
@@ -60,6 +60,8 @@ private[spark] object ClientArguments {
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
case Array("--primary-py-file", primaryPythonResource: String) =>
mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
case Array("--primary-r-file", primaryRFile: String) =>
mainAppResource = Some(RMainAppResource(primaryRFile))
case Array("--other-py-files", pyFiles: String) =>
maybePyFiles = Some(pyFiles)
case Array("--main-class", clazz: String) =>
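
A hedged round-trip sketch, assuming the `ClientArguments.fromCommandLineArgs` parser these case arms belong to (it is `private[spark]`, so this is illustrative rather than user-facing API): the child arguments produced by `SparkSubmit` for an R app parse back into an `RMainAppResource`:

```
import org.apache.spark.deploy.k8s.submit.{ClientArguments, RMainAppResource}

// Hypothetical invocation; in practice these arguments arrive from SparkSubmit.
val parsed = ClientArguments.fromCommandLineArgs(Array(
  "--primary-r-file", "local:///opt/spark/examples/src/main/r/dataframe.R",
  "--main-class", "org.apache.spark.deploy.RRunner"))

assert(parsed.mainAppResource ==
  Some(RMainAppResource("local:///opt/spark/examples/src/main/r/dataframe.R")))
```
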
@@ -17,8 +17,8 @@
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep}
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}

private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -40,14 +40,18 @@ private[spark] class KubernetesDriverBuilder(
provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountVolumesFeatureStep) =
new MountVolumesFeatureStep(_),
provideJavaStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> JavaDriverFeatureStep) =
new JavaDriverFeatureStep(_),
providePythonStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> PythonDriverFeatureStep) =
new PythonDriverFeatureStep(_)) {
new PythonDriverFeatureStep(_),
provideRStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> RDriverFeatureStep) =
new RDriverFeatureStep(_),
provideJavaStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> JavaDriverFeatureStep) =
new JavaDriverFeatureStep(_)) {

def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -71,7 +75,9 @@ private[spark] class KubernetesDriverBuilder(
case JavaMainAppResource(_) =>
provideJavaStep(kubernetesConf)
case PythonMainAppResource(_) =>
providePythonStep(kubernetesConf)}
providePythonStep(kubernetesConf)
case RMainAppResource(_) =>
provideRStep(kubernetesConf)}
.getOrElse(provideJavaStep(kubernetesConf))

val allFeatures = (baseFeatures :+ bindingsStep) ++
@@ -24,3 +24,6 @@ private[spark] case class JavaMainAppResource(primaryResource: String) extends M

private[spark] case class PythonMainAppResource(primaryResource: String)
extends MainAppResource with NonJVMResource

private[spark] case class RMainAppResource(primaryResource: String)
extends MainAppResource with NonJVMResource
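
Because `RMainAppResource` mixes in `NonJVMResource`, `KubernetesConf.createDriverConf` (above) takes the same path as PySpark applications and raises the default `MEMORY_OVERHEAD_FACTOR` to 0.4, which the new `KubernetesConfSuite` test below asserts.
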
@@ -122,6 +122,28 @@ class KubernetesConfSuite extends SparkFunSuite {
=== Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
}

test("Creating driver conf with a r primary file") {
val mainResourceFile = "local:///opt/spark/main.R"
val sparkConf = new SparkConf(false)
.setJars(Seq("local:///opt/spark/jar1.jar"))
.set("spark.files", "local:///opt/spark/example2.R")
val mainAppResource = Some(RMainAppResource(mainResourceFile))
val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
sparkConf,
APP_NAME,
RESOURCE_NAME_PREFIX,
APP_ID,
mainAppResource,
MAIN_CLASS,
APP_ARGS,
maybePyFiles = None)
assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
=== Array("local:///opt/spark/jar1.jar"))
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
assert(kubernetesConfWithMainResource.sparkFiles
=== Array("local:///opt/spark/example2.R", mainResourceFile))
}

test("Testing explicit setting of memory overhead on non-JVM tasks") {
val sparkConf = new SparkConf(false)
.set(MEMORY_OVERHEAD_FACTOR, 0.3)
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features.bindings

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.RMainAppResource

class RDriverFeatureStepSuite extends SparkFunSuite {

test("R Step modifies container correctly") {
val expectedMainResource = "/main.R"
val mainResource = "local:///main.R"
val baseDriverPod = SparkPod.initialPod()
val sparkConf = new SparkConf(false)
.set(KUBERNETES_R_MAIN_APP_RESOURCE, mainResource)
val kubernetesConf = KubernetesConf(
sparkConf,
KubernetesDriverSpecificConf(
Some(RMainAppResource(mainResource)),
"test-app",
"r-runner",
Seq("5 7")),
appResourceNamePrefix = "",
appId = "",
roleLabels = Map.empty,
roleAnnotations = Map.empty,
roleSecretNamesToMountPaths = Map.empty,
roleSecretEnvNamesToKeyRefs = Map.empty,
roleEnvs = Map.empty,
roleVolumes = Seq.empty,
sparkFiles = Seq.empty[String])

val step = new RDriverFeatureStep(kubernetesConf)
val driverContainerwithR = step.configurePod(baseDriverPod).container
assert(driverContainerwithR.getEnv.size === 2)
val envs = driverContainerwithR
.getEnv
.asScala
.map(env => (env.getName, env.getValue))
.toMap
assert(envs(ENV_R_PRIMARY) === expectedMainResource)
assert(envs(ENV_R_ARGS) === "5 7")
}
}