From ba84bcb2c4f73baf63782ff6fad5a607008c7cd2 Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Fri, 17 Aug 2018 16:04:02 -0700
Subject: [PATCH] [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s

## What changes were proposed in this pull request?

Introducing R Bindings for SparkR on K8s

- [x] Running SparkR Job

## How was this patch tested?

This patch was tested with

- [x] Unit Tests
- [x] Integration Tests

## Example:

Commands to run the example SparkR job:

1. `dev/make-distribution.sh --pip --r --tgz -Psparkr -Phadoop-2.7 -Pkubernetes`
2. `bin/docker-image-tool.sh -m -t testing build`
3.
```
bin/spark-submit \
  --master k8s://https://192.168.64.33:8443 \
  --deploy-mode cluster \
  --name spark-r \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.container.image=spark-r:testing \
  local:///opt/spark/examples/src/main/r/dataframe.R
```

The above spark-submit command works given the distribution built in step 1. (The corresponding integration test will be included in this PR once the PRB is ready.)

Author: Ilan Filonenko

Closes #21584 from ifilonenko/spark-r.
---
 bin/docker-image-tool.sh                      | 23 ++++---
 .../org/apache/spark/deploy/SparkSubmit.scala |  8 ++-
 .../org/apache/spark/deploy/k8s/Config.scala  | 13 ++++
 .../apache/spark/deploy/k8s/Constants.scala   |  2 +
 .../spark/deploy/k8s/KubernetesConf.scala     |  8 ++-
 .../bindings/RDriverFeatureStep.scala         | 59 +++++++++++++++++
 .../submit/KubernetesClientApplication.scala  |  2 +
 .../k8s/submit/KubernetesDriverBuilder.scala  | 22 ++++---
 .../deploy/k8s/submit/MainAppResource.scala   |  3 +
 .../deploy/k8s/KubernetesConfSuite.scala      | 22 +++++++
 .../bindings/RDriverFeatureStepSuite.scala    | 63 +++++++++++++++++++
 .../submit/KubernetesDriverBuilderSuite.scala | 36 ++++++++++-
 .../dockerfiles/spark/bindings/R/Dockerfile   | 29 +++++++++
 .../src/main/dockerfiles/spark/entrypoint.sh  | 14 ++++-
 .../ClientModeTestsSuite.scala                |  2 +-
 .../k8s/integrationtest/KubernetesSuite.scala | 21 ++++++-
 .../k8s/integrationtest/RTestsSuite.scala     | 44 +++++++++++++
 17 files changed, 344 insertions(+), 27 deletions(-)
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
 create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
 create mode 100644 resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
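At a high level, the R binding follows the same path through the submission client as the existing Python binding. The sketch below traces how a `--primary-r-file` argument ends up selecting the R bindings step; it is a simplified stand-in, not the real Spark internals (the actual types live in `org.apache.spark.deploy.k8s.submit` and appear in the diffs that follow):

```scala
// Simplified sketch of the dispatch this patch introduces.
object RBindingDispatchSketch extends App {
  sealed trait MainAppResource
  case class RMainAppResource(primaryResource: String) extends MainAppResource

  // ClientArguments-style parsing of a single argument pair:
  def parse(arg: Array[String]): Option[MainAppResource] = arg match {
    case Array("--primary-r-file", r) => Some(RMainAppResource(r))
    case _ => None
  }

  // KubernetesDriverBuilder-style selection of a bindings step:
  val stepName = parse(Array("--primary-r-file", "local:///opt/spark/main.R")) match {
    case Some(RMainAppResource(_)) => "RDriverFeatureStep"
    case _ => "JavaDriverFeatureStep"
  }
  println(stepName) // RDriverFeatureStep
}
```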
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index cd22e75402f56..d6371051ef7fb 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -71,6 +71,7 @@ function build {
   )
   local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
   local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
+  local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"}
 
   docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
@@ -79,11 +80,16 @@ function build {
   docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
     -t $(image_ref spark-py) \
     -f "$PYDOCKERFILE" .
+
+  docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
+    -t $(image_ref spark-r) \
+    -f "$RDOCKERFILE" .
 }
 
 function push {
   docker push "$(image_ref spark)"
   docker push "$(image_ref spark-py)"
+  docker push "$(image_ref spark-r)"
 }
 
 function usage {
@@ -97,12 +103,13 @@ Commands:
   push        Push a pre-built image to a registry. Requires a repository address to be
               provided.
 
 Options:
-  -f file     Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
-  -p file     Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
-  -r repo     Repository address.
-  -t tag      Tag to apply to the built image, or to identify the image to be pushed.
-  -m          Use minikube's Docker daemon.
-  -n          Build docker image with --no-cache
+  -f file     Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
+  -p file     Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark.
+  -R file     Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark.
+  -r repo     Repository address.
+  -t tag      Tag to apply to the built image, or to identify the image to be pushed.
+  -m          Use minikube's Docker daemon.
+  -n          Build docker image with --no-cache
   -b arg      Build arg to build or push the image. For multiple build args, this option needs to
               be used separately for each build arg.
 
@@ -133,14 +140,16 @@
 REPO=
 TAG=
 BASEDOCKERFILE=
 PYDOCKERFILE=
+RDOCKERFILE=
 NOCACHEARG=
 BUILD_PARAMS=
-while getopts f:p:mr:t:n:b: option
+while getopts f:p:R:mr:t:n:b: option
 do
  case "${option}"
  in
  f) BASEDOCKERFILE=${OPTARG};;
  p) PYDOCKERFILE=${OPTARG};;
+ R) RDOCKERFILE=${OPTARG};;
  r) REPO=${OPTARG};;
  t) TAG=${OPTARG};;
  n) NOCACHEARG="--no-cache";;
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6e70bcd7fc088..cf902db8709e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -286,8 +286,6 @@ private[spark] class SparkSubmit extends Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
         error("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
-      case (KUBERNETES, _) if args.isR =>
-        error("R applications are currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
         error("Cluster deploy mode is not compatible with master \"local\"")
       case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -700,7 +698,11 @@ private[spark] class SparkSubmit extends Logging {
         if (args.pyFiles != null) {
           childArgs ++= Array("--other-py-files", args.pyFiles)
         }
-      } else {
+      } else if (args.isR) {
+        childArgs ++= Array("--primary-r-file", args.primaryResource)
+        childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
+      }
+      else {
         childArgs ++= Array("--primary-java-resource", args.primaryResource)
         childArgs ++= Array("--main-class", args.mainClass)
       }
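For concreteness, given the example submission from the commit message, the K8s branch above now forwards child arguments of the following shape to the Kubernetes client application (a hedged sketch; the values come from the example, not from captured output):

```scala
// What SparkSubmit hands to KubernetesClientApplication for an R app.
val primaryResource = "local:///opt/spark/examples/src/main/r/dataframe.R"
val childArgs = Array(
  "--primary-r-file", primaryResource,
  "--main-class", "org.apache.spark.deploy.RRunner")
```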
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 4442333c573cc..1b582fe53624a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -139,6 +139,19 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_R_MAIN_APP_RESOURCE =
+    ConfigBuilder("spark.kubernetes.r.mainAppResource")
+      .doc("The main app resource for SparkR jobs")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_R_APP_ARGS =
+    ConfigBuilder("spark.kubernetes.r.appArgs")
+      .doc("The app arguments for SparkR Jobs")
+      .internal()
+      .stringConf
+      .createOptional
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index f82cd7fd02e12..8202d874a4626 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -71,6 +71,8 @@ private[spark] object Constants {
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
   val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
+  val ENV_R_PRIMARY = "R_PRIMARY"
+  val ENV_R_ARGS = "R_APP_ARGS"
 
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 866ba3cbaa9c3..3aa35d419073f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -78,6 +78,9 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
   def pySparkPythonVersion(): String = sparkConf
     .get(PYSPARK_MAJOR_PYTHON_VERSION)
 
+  def sparkRMainResource(): Option[String] = sparkConf
+    .get(KUBERNETES_R_MAIN_APP_RESOURCE)
+
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
 
   def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -125,7 +128,7 @@ private[spark] object KubernetesConf {
           sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
         }
       // The function of this outer match is to account for multiple nonJVM
-      // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+      // bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4
       case nonJVM: NonJVMResource =>
         nonJVM match {
           case PythonMainAppResource(res) =>
             additionalFiles += res
             maybePyFiles.foreach{maybePyFiles =>
               additionalFiles.appendAll(maybePyFiles.split(","))}
             sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+          case RMainAppResource(res) =>
+            additionalFiles += res
+            sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
         }
         sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
     }
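Because R, like Python, allocates most of its memory outside the JVM heap, `RMainAppResource` is a `NonJVMResource` and the default memory overhead factor is raised to 0.4 unless the user sets one explicitly. A quick worked illustration, assuming Spark's usual overhead rule of max(factor × driver memory, 384 MiB):

```scala
// Hypothetical numbers only: effect of the non-JVM default factor on a 2 GiB driver.
val driverMemoryMiB = 2048
val nonJvmOverhead = math.max((0.4 * driverMemoryMiB).toInt, 384) // 819 MiB
val jvmOverhead    = math.max((0.1 * driverMemoryMiB).toInt, 384) // 384 MiB (the floor wins)
```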
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
new file mode 100644
index 0000000000000..b33b86e02ea6f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class RDriverFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val roleConf = kubernetesConf.roleSpecificConf
+    require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined")
+    val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+      rArgs =>
+        new EnvVarBuilder()
+          .withName(ENV_R_ARGS)
+          .withValue(rArgs.mkString(","))
+          .build())
+    val envSeq =
+      Seq(new EnvVarBuilder()
+        .withName(ENV_R_PRIMARY)
+        .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get))
+        .build())
+    val rEnvs = envSeq ++
+      maybeRArgs.toSeq
+
+    val withRPrimaryContainer = new ContainerBuilder(pod.container)
+      .addAllToEnv(rEnvs.asJava)
+      .addToArgs("driver-r")
+      .addToArgs("--properties-file", SPARK_CONF_PATH)
+      .addToArgs("--class", roleConf.mainClass)
+      .build()
+
+    SparkPod(pod.pod, withRPrimaryContainer)
+  }
+  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
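To make the step's env-var handling concrete, here is a small standalone sketch of the two variables it emits (fabric8's kubernetes-model on the classpath is assumed; the input values are hypothetical). Note that multiple app args are comma-joined into a single value:

```scala
import io.fabric8.kubernetes.api.model.EnvVarBuilder

object REnvSketch extends App {
  // Hypothetical inputs mirroring RDriverFeatureStep above.
  val primary = "/opt/spark/examples/src/main/r/dataframe.R"
  val appArgs = Seq("5", "7")

  val rPrimary = new EnvVarBuilder().withName("R_PRIMARY").withValue(primary).build()
  val rArgs = new EnvVarBuilder().withName("R_APP_ARGS").withValue(appArgs.mkString(",")).build()

  println(s"${rPrimary.getName}=${rPrimary.getValue}") // R_PRIMARY=/opt/spark/examples/...
  println(s"${rArgs.getName}=${rArgs.getValue}")       // R_APP_ARGS=5,7
}
```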
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 9398faee2ea5c..986c950ab365a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -60,6 +60,8 @@ private[spark] object ClientArguments {
         mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
       case Array("--primary-py-file", primaryPythonResource: String) =>
         mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
+      case Array("--primary-r-file", primaryRFile: String) =>
+        mainAppResource = Some(RMainAppResource(primaryRFile))
       case Array("--other-py-files", pyFiles: String) =>
         maybePyFiles = Some(pyFiles)
       case Array("--main-class", clazz: String) =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 7208e3d377593..8f3f18ffadc3b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.deploy.k8s.submit
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
 private[spark] class KubernetesDriverBuilder(
     provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -40,14 +40,18 @@ private[spark] class KubernetesDriverBuilder(
     provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountVolumesFeatureStep) =
       new MountVolumesFeatureStep(_),
-    provideJavaStep: (
-      KubernetesConf[KubernetesDriverSpecificConf]
-        => JavaDriverFeatureStep) =
-      new JavaDriverFeatureStep(_),
     providePythonStep: (
       KubernetesConf[KubernetesDriverSpecificConf]
         => PythonDriverFeatureStep) =
-      new PythonDriverFeatureStep(_)) {
+      new PythonDriverFeatureStep(_),
+    provideRStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => RDriverFeatureStep) =
+      new RDriverFeatureStep(_),
+    provideJavaStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => JavaDriverFeatureStep) =
+      new JavaDriverFeatureStep(_)) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -71,7 +75,9 @@ private[spark] class KubernetesDriverBuilder(
       case JavaMainAppResource(_) =>
         provideJavaStep(kubernetesConf)
       case PythonMainAppResource(_) =>
-        providePythonStep(kubernetesConf)}
+        providePythonStep(kubernetesConf)
+      case RMainAppResource(_) =>
+        provideRStep(kubernetesConf)}
       .getOrElse(provideJavaStep(kubernetesConf))
 
     val allFeatures = (baseFeatures :+ bindingsStep) ++
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index cbe081ae35683..dd5a4549743df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -24,3 +24,6 @@ private[spark] case class JavaMainAppResource(primaryResource: String) extends M
 
 private[spark] case class PythonMainAppResource(primaryResource: String)
   extends MainAppResource with NonJVMResource
+
+private[spark] case class RMainAppResource(primaryResource: String)
+  extends MainAppResource with NonJVMResource
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index ecdb71359c5bb..e3c19cdb81567 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -122,6 +122,28 @@ class KubernetesConfSuite extends SparkFunSuite {
       === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles)
   }
 
+  test("Creating driver conf with an R primary file") {
+    val mainResourceFile = "local:///opt/spark/main.R"
+    val sparkConf = new SparkConf(false)
+      .setJars(Seq("local:///opt/spark/jar1.jar"))
+      .set("spark.files", "local:///opt/spark/example2.R")
+    val mainAppResource = Some(RMainAppResource(mainResourceFile))
+    val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+      sparkConf,
+      APP_NAME,
+      RESOURCE_NAME_PREFIX,
+      APP_ID,
+      mainAppResource,
+      MAIN_CLASS,
+      APP_ARGS,
+      maybePyFiles = None)
+    assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+      === Array("local:///opt/spark/jar1.jar"))
+    assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4)
+    assert(kubernetesConfWithMainResource.sparkFiles
+      === Array("local:///opt/spark/example2.R", mainResourceFile))
+  }
+
   test("Testing explicit setting of memory overhead on non-JVM tasks") {
     val sparkConf = new SparkConf(false)
       .set(MEMORY_OVERHEAD_FACTOR, 0.3)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
new file mode 100644
index 0000000000000..8fdf91ef638f2
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.RMainAppResource
+
+class RDriverFeatureStepSuite extends SparkFunSuite {
+
+  test("R Step modifies container correctly") {
+    val expectedMainResource = "/main.R"
+    val mainResource = "local:///main.R"
+    val baseDriverPod = SparkPod.initialPod()
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_R_MAIN_APP_RESOURCE, mainResource)
+    val kubernetesConf = KubernetesConf(
+      sparkConf,
+      KubernetesDriverSpecificConf(
+        Some(RMainAppResource(mainResource)),
+        "test-app",
+        "r-runner",
+        Seq("5 7")),
+      appResourceNamePrefix = "",
+      appId = "",
+      roleLabels = Map.empty,
+      roleAnnotations = Map.empty,
+      roleSecretNamesToMountPaths = Map.empty,
+      roleSecretEnvNamesToKeyRefs = Map.empty,
+      roleEnvs = Map.empty,
+      roleVolumes = Seq.empty,
+      sparkFiles = Seq.empty[String])
+
+    val step = new RDriverFeatureStep(kubernetesConf)
+    val driverContainerwithR = step.configurePod(baseDriverPod).container
+    assert(driverContainerwithR.getEnv.size === 2)
+    val envs = driverContainerwithR
+      .getEnv
+      .asScala
+      .map(env => (env.getName, env.getValue))
+      .toMap
+    assert(envs(ENV_R_PRIMARY) === expectedMainResource)
+    assert(envs(ENV_R_ARGS) === "5 7")
+  }
+}
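One detail worth noting from the suite above: `appArgs` is a `Seq`, and the step joins it with commas into a single `R_APP_ARGS` value, so `Seq("5 7")` (one argument containing a space) survives as `5 7`. A tiny standalone illustration:

```scala
object RArgsJoinDemo extends App {
  assert(Seq("5 7").mkString(",") == "5 7")    // one argument containing a space
  assert(Seq("5", "7").mkString(",") == "5,7") // two arguments become comma-separated
  println("ok")
}
```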
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 046e578b94629..4117c5487a41e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -20,7 +20,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep}
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
 
 class KubernetesDriverBuilderSuite extends SparkFunSuite {
 
@@ -31,6 +31,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val SECRETS_STEP_TYPE = "mount-secrets"
   private val JAVA_STEP_TYPE = "java-bindings"
   private val PYSPARK_STEP_TYPE = "pyspark-bindings"
+  private val R_STEP_TYPE = "r-bindings"
   private val ENV_SECRETS_STEP_TYPE = "env-secrets"
   private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
 
@@ -55,6 +56,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
   private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
 
+  private val rStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+    R_STEP_TYPE, classOf[RDriverFeatureStep])
+
   private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
     ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
 
@@ -70,8 +74,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
     _ => envSecretsStep,
     _ => localDirsStep,
     _ => mountVolumesStep,
-    _ => javaStep,
-    _ => pythonStep)
+    _ => pythonStep,
+    _ => rStep,
+    _ => javaStep)
 
   test("Apply fundamental steps all the time.") {
     val conf = KubernetesConf(
@@ -211,6 +216,31 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
       JAVA_STEP_TYPE)
   }
 
+  test("Apply R step if main resource is R.") {
+    val conf = KubernetesConf(
+      new SparkConf(false),
+      KubernetesDriverSpecificConf(
+        Some(RMainAppResource("example.R")),
+        "test-app",
+        "main",
+        Seq.empty),
+      "prefix",
+      "appId",
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Map.empty,
+      Nil,
+      Seq.empty[String])
+    validateStepTypesApplied(
+      builderUnderTest.buildFromFeatures(conf),
+      BASIC_STEP_TYPE,
+      CREDENTIALS_STEP_TYPE,
+      SERVICE_STEP_TYPE,
+      LOCAL_DIRS_STEP_TYPE,
+      R_STEP_TYPE)
+  }
+
   private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
     : Unit = {
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
new file mode 100644
index 0000000000000..e627883ba782e
--- /dev/null
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/R
+COPY R ${SPARK_HOME}/R
+
+RUN apk add --no-cache R R-dev
+
+ENV R_HOME /usr/lib/R
+
+WORKDIR /opt/spark/work-dir
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 8bdb0f7a10795..216e8fe31becb 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -38,7 +38,7 @@ fi
 
 SPARK_K8S_CMD="$1"
 case "$SPARK_K8S_CMD" in
-    driver | driver-py | executor)
+    driver | driver-py | driver-r | executor)
       shift 1
       ;;
     "")
@@ -66,6 +66,10 @@ if [ -n "$PYSPARK_APP_ARGS" ]; then
     PYSPARK_ARGS="$PYSPARK_APP_ARGS"
 fi
 
+R_ARGS=""
+if [ -n "$R_APP_ARGS" ]; then
+    R_ARGS="$R_APP_ARGS"
+fi
 
 if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
     pyv="$(python -V 2>&1)"
@@ -96,6 +100,14 @@ case "$SPARK_K8S_CMD" in
       "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
     )
     ;;
+  driver-r)
+    CMD=(
+      "$SPARK_HOME/bin/spark-submit"
+      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+      --deploy-mode client
+      "$@" $R_PRIMARY $R_ARGS
+    )
+    ;;
   executor)
     CMD=(
       ${JAVA_HOME}/bin/java
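For clarity, the command the `driver-r` branch assembles can be rendered as a list like the one below. This is a hedged Scala sketch of the shell logic with placeholder values (it omits the extra `"$@"` pass-through arguments), not the entrypoint itself:

```scala
// Hedged Scala rendering of the entrypoint's driver-r branch (placeholder values).
object DriverRCmdSketch extends App {
  val sparkHome = "/opt/spark"
  val bindAddress = "10.4.0.7" // stands in for $SPARK_DRIVER_BIND_ADDRESS
  val rPrimary = "/opt/spark/examples/src/main/r/dataframe.R" // $R_PRIMARY
  val rArgs = Seq.empty[String] // $R_APP_ARGS, if any

  val cmd = Seq(
    s"$sparkHome/bin/spark-submit",
    "--conf", s"spark.driver.bindAddress=$bindAddress",
    "--deploy-mode", "client") ++ (rPrimary +: rArgs)
  println(cmd.mkString(" "))
}
```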
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
index 159cfd97ff403..c8bd584516ea5 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag, INTERVAL, TIMEOUT}
 
-trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
+private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
 
   test("Run in client mode.", k8sTestTag) {
     val labels = Map("spark-app-selector" -> driverPodName)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 13ce2efecbef7..896a83a5badbb 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -38,10 +38,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite
 
   import KubernetesSuite._
 
-  protected var testBackend: IntegrationTestBackend = _
-  protected var sparkHomeDir: Path = _
+  private var sparkHomeDir: Path = _
+  private var pyImage: String = _
+  private var rImage: String = _
+
   protected var image: String = _
-  protected var pyImage: String = _
+  protected var testBackend: IntegrationTestBackend = _
   protected var driverPodName: String = _
   protected var kubernetesTestComponents: KubernetesTestComponents = _
   protected var sparkAppConf: SparkAppConf = _
@@ -67,6 +69,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     val imageRepo = getTestImageRepo
     image = s"$imageRepo/spark:$imageTag"
     pyImage = s"$imageRepo/spark-py:$imageTag"
+    rImage = s"$imageRepo/spark-r:$imageTag"
 
     val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
       .toFile
@@ -239,6 +242,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
   }
 
+  protected def doBasicDriverRPodCheck(driverPod: Pod): Unit = {
+    assert(driverPod.getMetadata.getName === driverPodName)
+    assert(driverPod.getSpec.getContainers.get(0).getImage === rImage)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
   protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
     assert(executorPod.getSpec.getContainers.get(0).getImage === image)
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
@@ -249,6 +259,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
   }
 
+  protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === rImage)
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
   protected def checkCustomSettings(pod: Pod): Unit = {
     assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
     assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
new file mode 100644
index 0000000000000..885a23cfb4864
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}
+
+private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite =>
+
+  import RTestsSuite._
+  import KubernetesSuite.k8sTestTag
+
+  test("Run SparkR on simple dataframe.R example", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}")
+    runSparkApplicationAndVerifyCompletion(
+      appResource = SPARK_R_DATAFRAME_TEST,
+      mainClass = "",
+      expectedLogOnCompletion = Seq("name: string (nullable = true)", "1 Justin"),
+      appArgs = Array.empty[String],
+      driverPodChecker = doBasicDriverRPodCheck,
+      executorPodChecker = doBasicExecutorRPodCheck,
+      appLocator = appLocator,
+      isJVM = false)
+  }
+}
+
+private[spark] object RTestsSuite {
+  val CONTAINER_LOCAL_SPARKR: String = "local:///opt/spark/examples/src/main/r/"
+  val SPARK_R_DATAFRAME_TEST: String = CONTAINER_LOCAL_SPARKR + "dataframe.R"
+}
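The trait leaves room for more R coverage. A hypothetical follow-up test would live inside `RTestsSuite` and mirror the pattern above; the sketch below is not standalone (it assumes the trait's self-type and helpers) and `example.R` plus its expected log line are placeholders, not resources shipped by this patch:

```scala
// Hypothetical additional case inside RTestsSuite, following the existing pattern.
test("Run SparkR on another example", k8sTestTag) {
  sparkAppConf
    .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}")
  runSparkApplicationAndVerifyCompletion(
    appResource = CONTAINER_LOCAL_SPARKR + "example.R", // placeholder script
    mainClass = "",
    expectedLogOnCompletion = Seq("expected output line"), // placeholder expectation
    appArgs = Array.empty[String],
    driverPodChecker = doBasicDriverRPodCheck,
    executorPodChecker = doBasicExecutorRPodCheck,
    appLocator = appLocator,
    isJVM = false)
}
```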