diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index a5806e98ee22d..d024d427fea97 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -106,6 +106,36 @@ The above mechanism using `kubectl proxy` can be used when we have authentication providers that the fabric8 kubernetes-client library does not support. Authentication using X509 Client Certs and oauth tokens is currently supported. +### Determining the Driver Base URI + +Kubernetes pods run with their own IP address space. If Spark is run in cluster mode, the driver pod may not be +accessible to the submitter. However, the submitter needs to send local dependencies from its local disk to the driver +pod. + +By default, Spark will create a [Service](https://kubernetes.io/docs/user-guide/services/#type-nodeport) with a NodePort +that is opened on every node. The submission client will then contact the driver at one of the nodes' +addresses with the appropriate service port. + +There may be cases where the nodes cannot be reached by the submission client. For example, the cluster may +only be reachable through an external load balancer. The user may provide their own external URI for Spark driver +services. To use your own external URI instead of a node's IP and node port, first set +`spark.kubernetes.driver.serviceManagerType` to `ExternalAnnotation`. A service will be created with the annotation +`spark-job.alpha.apache.org/provideExternalUri`, and this service routes to the driver pod. You will need to run a +separate process that watches the API server for services that are created with this annotation in the application's +namespace (set by `spark.kubernetes.namespace`); a minimal sketch of such a process is given at the end of this patch. +The process should determine a URI that routes to this service +(potentially configuring infrastructure to handle the URI behind the scenes), and patch the service to include an +annotation `spark-job.alpha.apache.org/resolvedExternalUri`, whose value is the external URI that your process +has provided (e.g. `https://example.com:8080/my-job`). + +Note that the URI provided in the annotation needs to route traffic to the appropriate destination on the pod, which serves +the submission endpoints at an empty URI path. This means the external URI provider will likely need to rewrite the path from the +external URI to the destination on the pod, e.g. `https://example.com:8080/spark-app-1/submit` will need to route traffic +to `https://<pod IP>:<submission server port>/`. Note that the paths of these two URLs are different. + +If the above is confusing, keep in mind that this functionality is only necessary if the submitter cannot reach any of +the nodes at the driver's node port. It is recommended to use the default configuration with the node port service +whenever possible. + ### Spark Properties Below are some other common properties that are specific to Kubernetes. Most of the other configurations are the same @@ -207,7 +237,7 @@ from the other deployment modes. See the [configuration page](configuration.html false Whether to expose the driver Web UI port as a service NodePort. Turned off by default because NodePort is a limited - resource. Use alternatives such as Ingress if possible. + resource. @@ -225,6 +255,21 @@ from the other deployment modes. See the [configuration page](configuration.html Interval between reports of the current Spark job status in cluster mode. + + spark.kubernetes.driver.serviceManagerType + NodePort + + A tag indicating which class to use for creating the Kubernetes service and determining its URI for the submission + client.
Valid values are currently NodePort and ExternalAnnotation. By default, a service + is created with the NodePort type, and the driver will be contacted at one of the node addresses, on the port + that the nodes expose for the service. If the nodes cannot be contacted from the submitter's machine, consider + setting this to ExternalAnnotation as described in "Determining the Driver Base URI" above. One may + also include a custom implementation of org.apache.spark.deploy.rest.kubernetes.DriverServiceManager on + the submitter's classpath - spark-submit loads an instance of that class via Java's ServiceLoader + mechanism. To use the custom implementation, set this value to the string returned by the custom + implementation's DriverServiceManager#getServiceManagerType(). This should only be done as a last resort. + + + ## Current Limitations diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager new file mode 100644 index 0000000000000..5a306335b4166 --- --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager @@ -0,0 +1,2 @@ +org.apache.spark.deploy.rest.kubernetes.ExternalSuppliedUrisDriverServiceManager +org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index c9831ce23ed0e..af5623093382e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.kubernetes import java.io.File import java.security.SecureRandom +import java.util.ServiceLoader import java.util.concurrent.{CountDownLatch, TimeUnit} import com.google.common.io.Files @@ -55,6 +56,7 @@ private[spark] class Client( private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + private val driverServiceManagerType = sparkConf.get(DRIVER_SERVICE_MANAGER_TYPE) private val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) @@ -90,6 +92,7 @@ private[spark] class Client( throw new SparkException(s"Main app resource file $mainAppResource is not a file or" + s" is a directory.") } + val driverServiceManager = getDriverServiceManager val parsedCustomLabels = parseKeyValuePairs(customLabels, KUBERNETES_DRIVER_LABELS.key, "labels") parsedCustomLabels.keys.foreach { key => @@ -118,32 +121,48 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => - ShutdownHookManager.addShutdownHook(() => - kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) - val sslConfigurationProvider = new SslConfigurationProvider( - sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) - val submitServerSecret = kubernetesClient.secrets().createNew() - .withNewMetadata() -
.withName(secretName) - .endMetadata() - .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) - .withType("Opaque") - .done() - kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) - try { - val sslConfiguration = sslConfigurationProvider.getSslConfiguration() - // start outer watch for status logging of driver pod - val driverPodCompletedLatch = new CountDownLatch(1) - // only enable interval logging if in waitForAppCompletion mode - val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 - val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, - loggingInterval) - Utils.tryWithResource(kubernetesClient - .pods() - .withName(kubernetesAppId) - .watch(loggingWatch)) { _ => + driverServiceManager.start(kubernetesClient, kubernetesAppId, sparkConf) + // start outer watch for status logging of driver pod + // only enable interval logging if in waitForAppCompletion mode + val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 + val driverPodCompletedLatch = new CountDownLatch(1) + val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, + loggingInterval) + Utils.tryWithResource(kubernetesClient + .pods() + .withName(kubernetesAppId) + .watch(loggingWatch)) { _ => + val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() => + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) + val cleanupServiceManagerHook = ShutdownHookManager.addShutdownHook( + ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY)( + () => driverServiceManager.stop()) + // Place the error hook at a higher priority in order for the error hook to run before + // the stop hook. + val serviceManagerErrorHook = ShutdownHookManager.addShutdownHook( + ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY + 1)(() => + driverServiceManager.handleSubmissionError( + new SparkException("Submission shutting down early..."))) + try { + val sslConfigurationProvider = new SslConfigurationProvider( + sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) + val submitServerSecret = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(secretName) + .endMetadata() + .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) + .withType("Opaque") + .done() + kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) + val sslConfiguration = sslConfigurationProvider.getSslConfiguration() + val driverKubernetesSelectors = (Map( + SPARK_DRIVER_LABEL -> kubernetesAppId, + SPARK_APP_ID_LABEL -> kubernetesAppId, + SPARK_APP_NAME_LABEL -> appName) + ++ parsedCustomLabels) val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, + driverServiceManager, parsedCustomLabels, parsedCustomAnnotations, submitServerSecret, @@ -156,6 +175,7 @@ private[spark] class Client( driverService) submitApplicationToDriverServer( kubernetesClient, + driverServiceManager, sslConfiguration, driverService, submitterLocalFiles, @@ -165,23 +185,43 @@ private[spark] class Client( // those. 
kubernetesResourceCleaner.unregisterResource(driverPod) kubernetesResourceCleaner.unregisterResource(driverService) - // wait if configured to do so - if (waitForAppCompletion) { - logInfo(s"Waiting for application $kubernetesAppId to finish...") - driverPodCompletedLatch.await() - logInfo(s"Application $kubernetesAppId finished.") - } else { - logInfo(s"Application $kubernetesAppId successfully launched.") + } catch { + case e: Throwable => + driverServiceManager.handleSubmissionError(e) + } finally { + Utils.tryLogNonFatalError { + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) + } + Utils.tryLogNonFatalError { + driverServiceManager.stop() + } + + // Remove the shutdown hooks that would be redundant + Utils.tryLogNonFatalError { + ShutdownHookManager.removeShutdownHook(resourceCleanShutdownHook) + } + Utils.tryLogNonFatalError { + ShutdownHookManager.removeShutdownHook(cleanupServiceManagerHook) + } + Utils.tryLogNonFatalError { + ShutdownHookManager.removeShutdownHook(serviceManagerErrorHook) } } - } finally { - kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) + // wait if configured to do so + if (waitForAppCompletion) { + logInfo(s"Waiting for application $kubernetesAppId to finish...") + driverPodCompletedLatch.await() + logInfo(s"Application $kubernetesAppId finished.") + } else { + logInfo(s"Application $kubernetesAppId successfully launched.") + } } } } private def submitApplicationToDriverServer( kubernetesClient: KubernetesClient, + driverServiceManager: DriverServiceManager, sslConfiguration: SslConfiguration, driverService: Service, submitterLocalFiles: Iterable[String], @@ -197,7 +237,10 @@ private[spark] class Client( sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) - val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, driverService, + val driverSubmitter = buildDriverSubmissionClient( + kubernetesClient, + driverServiceManager, + driverService, sslConfiguration) // Sanity check to see if the driver submitter is even reachable. 
driverSubmitter.ping() @@ -227,6 +270,7 @@ private[spark] class Client( private def launchDriverKubernetesComponents( kubernetesClient: KubernetesClient, + driverServiceManager: DriverServiceManager, customLabels: Map[String, String], customAnnotations: Map[String, String], submitServerSecret: Secret, @@ -254,10 +298,9 @@ private[spark] class Client( .endpoints() .withName(kubernetesAppId) .watch(endpointsReadyWatcher)) { _ => - val driverService = createDriverService( - kubernetesClient, - driverKubernetesSelectors, - submitServerSecret) + val serviceTemplate = createDriverServiceTemplate(driverKubernetesSelectors) + val driverService = kubernetesClient.services().create( + driverServiceManager.customizeDriverService(serviceTemplate).build()) kubernetesResourceCleaner.registerOrUpdateResource(driverService) val driverPod = createDriverPod( kubernetesClient, @@ -265,7 +308,6 @@ private[spark] class Client( customAnnotations, submitServerSecret, sslConfiguration) - kubernetesResourceCleaner.registerOrUpdateResource(driverPod) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) (driverPod, driverService) @@ -354,28 +396,6 @@ private[spark] class Client( } } - private def createDriverService( - kubernetesClient: KubernetesClient, - driverKubernetesSelectors: Map[String, String], - submitServerSecret: Secret): Service = { - val driverSubmissionServicePort = new ServicePortBuilder() - .withName(SUBMISSION_SERVER_PORT_NAME) - .withPort(SUBMISSION_SERVER_PORT) - .withNewTargetPort(SUBMISSION_SERVER_PORT) - .build() - kubernetesClient.services().createNew() - .withNewMetadata() - .withName(kubernetesAppId) - .withLabels(driverKubernetesSelectors.asJava) - .endMetadata() - .withNewSpec() - .withType("NodePort") - .withSelector(driverKubernetesSelectors.asJava) - .withPorts(driverSubmissionServicePort) - .endSpec() - .done() - } - private def createDriverPod( kubernetesClient: KubernetesClient, driverKubernetesSelectors: Map[String, String], @@ -388,7 +408,7 @@ private[spark] class Client( .withPath("/v1/submissions/ping") .withNewPort(SUBMISSION_SERVER_PORT_NAME) .build() - kubernetesClient.pods().createNew() + val driverPod = kubernetesClient.pods().createNew() .withNewMetadata() .withName(kubernetesAppId) .withLabels(driverKubernetesSelectors.asJava) @@ -428,6 +448,26 @@ private[spark] class Client( .endContainer() .endSpec() .done() + kubernetesResourceCleaner.registerOrUpdateResource(driverPod) + driverPod + } + + private def createDriverServiceTemplate(driverKubernetesSelectors: Map[String, String]) + : ServiceBuilder = { + val driverSubmissionServicePort = new ServicePortBuilder() + .withName(SUBMISSION_SERVER_PORT_NAME) + .withPort(SUBMISSION_SERVER_PORT) + .withNewTargetPort(SUBMISSION_SERVER_PORT) + .build() + new ServiceBuilder() + .withNewMetadata() + .withName(kubernetesAppId) + .withLabels(driverKubernetesSelectors.asJava) + .endMetadata() + .withNewSpec() + .withSelector(driverKubernetesSelectors.asJava) + .withPorts(driverSubmissionServicePort) + .endSpec() } private class DriverPodReadyWatcher(resolvedDriverPod: SettableFuture[Pod]) extends Watcher[Pod] { @@ -578,36 +618,14 @@ private[spark] class Client( private def buildDriverSubmissionClient( kubernetesClient: KubernetesClient, + driverServiceManager: DriverServiceManager, service: Service, sslConfiguration: SslConfiguration): KubernetesSparkRestApi = { - val urlScheme = if (sslConfiguration.sslOptions.enabled) { - "https" - } else { - logWarning("Submitting application 
details, application secret, and local" + - " jars to the cluster over an insecure connection. You should configure SSL" + - " to secure this step.") - "http" - } - val servicePort = service.getSpec.getPorts.asScala - .filter(_.getName == SUBMISSION_SERVER_PORT_NAME) - .head.getNodePort - val nodeUrls = kubernetesClient.nodes.list.getItems.asScala - .filterNot(node => node.getSpec.getUnschedulable != null && - node.getSpec.getUnschedulable) - .flatMap(_.getStatus.getAddresses.asScala) - // The list contains hostnames, internal and external IP addresses. - // (https://kubernetes.io/docs/admin/node/#addresses) - // we want only external IP addresses and legacyHostIP addresses in our list - // legacyHostIPs are deprecated and will be removed in the future. - // (https://github.com/kubernetes/kubernetes/issues/9267) - .filter(address => address.getType == "ExternalIP" || address.getType == "LegacyHostIP") - .map(address => { - s"$urlScheme://${address.getAddress}:$servicePort" - }).toSet - require(nodeUrls.nonEmpty, "No nodes found to contact the driver!") + val serviceUris = driverServiceManager.getDriverServiceSubmissionServerUris(service) + require(serviceUris.nonEmpty, "No uris found to contact the driver!") HttpClientUtil.createClient[KubernetesSparkRestApi]( - uris = nodeUrls, - maxRetriesPerServer = 3, + uris = serviceUris, + maxRetriesPerServer = 10, sslSocketFactory = sslConfiguration .driverSubmitClientSslContext .getSocketFactory, @@ -634,6 +652,21 @@ private[spark] class Client( }).toMap }).getOrElse(Map.empty[String, String]) } + + private def getDriverServiceManager: DriverServiceManager = { + val driverServiceManagerLoader = ServiceLoader.load(classOf[DriverServiceManager]) + val matchingServiceManagers = driverServiceManagerLoader + .iterator() + .asScala + .filter(_.getServiceManagerType == driverServiceManagerType) + .toList + require(matchingServiceManagers.nonEmpty, + s"No driver service manager found matching type $driverServiceManagerType") + require(matchingServiceManagers.size == 1, "Multiple service managers found" + + s" matching type $driverServiceManagerType, got: " + + matchingServiceManagers.map(_.getClass).toList.mkString(",")) + matchingServiceManagers.head + } } private[spark] object Client extends Logging { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala index fb76b04604479..6360bc0e48948 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala @@ -23,8 +23,7 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class KubernetesResourceCleaner - extends Logging { +private[spark] class KubernetesResourceCleaner extends Logging { private val resources = mutable.HashMap.empty[(String, String), HasMetadata] diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index bc2f9d578555d..213b5367263f8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.kubernetes import java.util.concurrent.TimeUnit import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager import org.apache.spark.internal.config.ConfigBuilder package object config { @@ -156,6 +157,16 @@ package object config { .stringConf .createOptional + private[spark] val DRIVER_SUBMIT_SSL_ENABLED = + ConfigBuilder("spark.ssl.kubernetes.submit.enabled") + .doc(""" + | Whether or not to use SSL when sending the + | application dependencies to the driver pod. + | + """.stripMargin) + .booleanConf + .createWithDefault(false) + private[spark] val KUBERNETES_DRIVER_SERVICE_NAME = ConfigBuilder("spark.kubernetes.driver.service.name") .doc(""" @@ -184,6 +195,16 @@ package object config { .stringConf .createOptional + private[spark] val DRIVER_SERVICE_MANAGER_TYPE = + ConfigBuilder("spark.kubernetes.driver.serviceManagerType") + .doc(s""" + | A tag indicating which class to use for creating the + | Kubernetes service and determining its URI for the submission + | client. + """.stripMargin) + .stringConf + .createWithDefault(NodePortUrisDriverServiceManager.TYPE) + private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.kubernetes.submit.waitAppCompletion") .doc( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 688cd858e79ff..10ddb12463894 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -64,6 +64,12 @@ package object constants { private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" + // Annotation keys + private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI = + "spark-job.alpha.apache.org/provideExternalUri" + private[spark] val ANNOTATION_RESOLVED_EXTERNAL_URI = + "spark-job.alpha.apache.org/resolvedExternalUri" + // Miscellaneous private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala new file mode 100644 index 0000000000000..d92c0247e2a35 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest.kubernetes + +import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf + +/** + * Implementations of this interface are responsible for exposing the driver pod by: + * - Creating a Kubernetes Service that is backed by the driver pod, and + * - Providing one or more URIs at which the service can be reached from the submission client. + * + * In general, one should not need to implement custom variants of this interface. Consider + * first whether the built-in service managers, NodePort and ExternalAnnotation, suit your needs. + * + * This API is in an alpha state and may break without notice. + */ +trait DriverServiceManager { + + protected var kubernetesClient: KubernetesClient = _ + protected var serviceName: String = _ + protected var sparkConf: SparkConf = _ + + /** + * The tag that identifies this service manager type. This service manager will be loaded + * only if the Spark configuration spark.kubernetes.driver.serviceManagerType matches this + * value. + */ + def getServiceManagerType: String + + final def start( + kubernetesClient: KubernetesClient, + serviceName: String, + sparkConf: SparkConf): Unit = { + this.kubernetesClient = kubernetesClient + this.serviceName = serviceName + this.sparkConf = sparkConf + onStart(kubernetesClient, serviceName, sparkConf) + } + + /** + * Guaranteed to be called before {@link customizeDriverService} or + * {@link getDriverServiceSubmissionServerUris} is called. + */ + protected def onStart( + kubernetesClient: KubernetesClient, + serviceName: String, + sparkConf: SparkConf): Unit = {} + + /** + * Customize the driver service that overlays on the driver pod. + * + * Implementations are expected to take the service template and adjust it + * according to the particular needs of how the Service will be accessed by + * URIs provided in {@link getDriverServiceSubmissionServerUris}. + * + * @param driverServiceTemplate Base settings for the driver service. + * @return The same ServiceBuilder object with any required customizations. + */ + def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder + + /** + * Return the set of URIs that can be used to reach the submission server that + * is running on the driver pod. + */ + def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] + + /** + * Called when the Spark application fails to start. Allows the service + * manager to clean up any state it may have created that should not be persisted + * in the case of an unsuccessful launch. Note that stop() is still called + * regardless of whether this method is called. + */ + def handleSubmissionError(cause: Throwable): Unit = {} + + final def stop(): Unit = onStop() + + /** + * Perform any cleanup of this service manager.
+ */ + protected def onStop(): Unit = {} +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala new file mode 100644 index 0000000000000..257571b5a9d3e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.SettableFuture +import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Creates the service with an annotation that is expected to be detected by another process + * that the user provides; that process is not built as part of this project. When the external + * process detects the creation of the service with the appropriate annotation, it is expected + * to populate the value of a second annotation that is the URI of the driver submission server.
+ */ +private[spark] class ExternalSuppliedUrisDriverServiceManager + extends DriverServiceManager with Logging { + + private val externalUriFuture = SettableFuture.create[String] + private var externalUriSetWatch: Option[Watch] = None + + override def onStart( + kubernetesClient: KubernetesClient, + serviceName: String, + sparkConf: SparkConf): Unit = { + externalUriSetWatch = Some(kubernetesClient + .services() + .withName(serviceName) + .watch(new ExternalUriSetWatcher(externalUriFuture))) + } + + override def getServiceManagerType: String = ExternalSuppliedUrisDriverServiceManager.TYPE + + override def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder = { + require(serviceName != null, "Service name was null; was start() called?") + driverServiceTemplate + .editMetadata() + .addToAnnotations(ANNOTATION_PROVIDE_EXTERNAL_URI, "true") + .endMetadata() + .editSpec() + .withType("ClusterIP") + .endSpec() + } + + override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = { + val timeoutSeconds = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + require(externalUriSetWatch.isDefined, "The watch that listens for the provision of" + + " the external URI was not started; was start() called?") + Set(externalUriFuture.get(timeoutSeconds, TimeUnit.SECONDS)) + } + + override def onStop(): Unit = { + Utils.tryLogNonFatalError { + externalUriSetWatch.foreach(_.close()) + externalUriSetWatch = None + } + } +} + +private[spark] object ExternalSuppliedUrisDriverServiceManager { + val TYPE = "ExternalAnnotation" +} + +private[spark] class ExternalUriSetWatcher(externalUriFuture: SettableFuture[String]) + extends Watcher[Service] with Logging { + + override def eventReceived(action: Action, service: Service): Unit = { + if (action == Action.MODIFIED && !externalUriFuture.isDone) { + service + .getMetadata + .getAnnotations + .asScala + .get(ANNOTATION_RESOLVED_EXTERNAL_URI) + .foreach(externalUriFuture.set) + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logDebug("External URI set watcher closed.", cause) + } +} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala new file mode 100644 index 0000000000000..fa8362677f38f --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes + +import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.Logging + +/** + * Creates the service with an open NodePort. The URI to reach the submission server is thus + * at the address of any of the nodes through the service's node port. + */ +private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManager with Logging { + + override def getServiceManagerType: String = NodePortUrisDriverServiceManager.TYPE + + override def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder = { + driverServiceTemplate.editSpec().withType("NodePort").endSpec() + } + + override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = { + val urlScheme = if (sparkConf.get(DRIVER_SUBMIT_SSL_ENABLED)) { + "https" + } else { + logWarning("Submitting application details, application secret, and local" + + " jars to the cluster over an insecure connection. You should configure SSL" + + " to secure this step.") + "http" + } + val servicePort = driverService.getSpec.getPorts.asScala + .filter(_.getName == SUBMISSION_SERVER_PORT_NAME) + .head.getNodePort + val nodeUrls = kubernetesClient.nodes.list.getItems.asScala + .filterNot(node => node.getSpec.getUnschedulable != null && + node.getSpec.getUnschedulable) + .flatMap(_.getStatus.getAddresses.asScala) + // The list contains hostnames, internal and external IP addresses. + // (https://kubernetes.io/docs/admin/node/#addresses) + // we want only external IP addresses and legacyHostIP addresses in our list + // legacyHostIPs are deprecated and will be removed in the future. + // (https://github.com/kubernetes/kubernetes/issues/9267) + .filter(address => address.getType == "ExternalIP" || address.getType == "LegacyHostIP") + .map(address => { + s"$urlScheme://${address.getAddress}:$servicePort" + }).toSet + require(nodeUrls.nonEmpty, "No nodes found to contact the driver!") + nodeUrls + } +} + +private[spark] object NodePortUrisDriverServiceManager { + val TYPE = "NodePort" +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala new file mode 100644 index 0000000000000..3199a8c385f95 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest + +import java.util.concurrent.atomic.AtomicBoolean + +import io.fabric8.kubernetes.api.model.Service +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.internal.Logging + +/** + * A slightly unrealistic implementation of external URI provision, but works + * for tests - essentially forces the service to revert back to being exposed + * on NodePort. + */ +private[spark] class ExternalUriProviderWatch(kubernetesClient: KubernetesClient) + extends Watcher[Service] with Logging { + + // Visible for testing + val annotationSet = new AtomicBoolean(false) + + override def eventReceived(action: Action, service: Service): Unit = { + if (action == Action.ADDED) { + service.getMetadata + .getAnnotations + .asScala + .get(ANNOTATION_PROVIDE_EXTERNAL_URI).foreach { _ => + if (!annotationSet.getAndSet(true)) { + val nodePortService = kubernetesClient.services().withName(service.getMetadata.getName) + .edit() + .editSpec() + .withType("NodePort") + .endSpec() + .done() + val submissionServerPort = nodePortService + .getSpec() + .getPorts + .asScala + .find(_.getName == SUBMISSION_SERVER_PORT_NAME) + .map(_.getNodePort) + .getOrElse(throw new IllegalStateException("Submission server port not found.")) + val resolvedNodePortUri = s"http://${Minikube.getMinikubeIp}:$submissionServerPort" + kubernetesClient.services().withName(service.getMetadata.getName).edit() + .editMetadata() + .addToAnnotations(ANNOTATION_RESOLVED_EXTERNAL_URI, resolvedNodePortUri) + .endMetadata() + .done() + } + } + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logWarning("External URI provider watch closed.", cause) + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 11c85caa6fc94..6aa1c1fee0d47 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -35,10 +35,13 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkSubmit import org.apache.spark.deploy.kubernetes.Client +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils +import org.apache.spark.deploy.rest.kubernetes.ExternalSuppliedUrisDriverServiceManager import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} import 
org.apache.spark.util.Utils @@ -108,6 +111,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .withGracePeriod(60) .delete }) + // spark-submit sets system properties so we have to clear them + new SparkConf(true).getAll.map(_._1).foreach { System.clearProperty } } override def afterAll(): Unit = { @@ -375,4 +380,49 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { s" with correct contents."), "Job did not find the file as expected.") } } + + test("Use external URI provider") { + val externalUriProviderWatch = new ExternalUriProviderWatch(minikubeKubernetesClient) + Utils.tryWithResource(minikubeKubernetesClient.services() + .withLabel("spark-app-name", "spark-pi") + .watch(externalUriProviderWatch)) { _ => + val args = Array( + "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", + "--deploy-mode", "cluster", + "--kubernetes-namespace", NAMESPACE, + "--name", "spark-pi", + "--executor-memory", "512m", + "--executor-cores", "1", + "--num-executors", "1", + "--jars", HELPER_JAR_FILE.getAbsolutePath, + "--class", SPARK_PI_MAIN_CLASS, + "--conf", "spark.ui.enabled=true", + "--conf", "spark.testing=false", + "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", + "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", + "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", + "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", + "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", + "--conf", "spark.kubernetes.submit.waitAppCompletion=false", + "--conf", s"${DRIVER_SERVICE_MANAGER_TYPE.key}=${ExternalSuppliedUrisDriverServiceManager.TYPE}", + EXAMPLES_JAR_FILE.getAbsolutePath) + SparkSubmit.main(args) + val sparkMetricsService = getSparkMetricsService("spark-pi") + expectationsForStaticAllocation(sparkMetricsService) + assert(externalUriProviderWatch.annotationSet.get) + val driverService = minikubeKubernetesClient + .services() + .withLabel("spark-app-name", "spark-pi") + .list() + .getItems + .asScala(0) + assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_PROVIDE_EXTERNAL_URI), + "External URI request annotation was not set on the driver service.") + // Unfortunately we can't check the correctness of the actual value of the URI, as it depends + // on the driver submission port set on the driver service but we remove that port from the + // service once the submission is complete. + assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_RESOLVED_EXTERNAL_URI), + "Resolved URI annotation not set on driver service.") + } + } }
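For readers implementing the external URI provider described in "Determining the Driver Base URI" above, here is a minimal sketch of such a watcher process. It is modeled on the `ExternalUriProviderWatch` used by the integration tests in this patch; the package name, namespace, and gateway address are hypothetical placeholders, and a real provider would configure its routing infrastructure (including rewriting the URI path down to the root) before patching the service.

```scala
package org.example.uriprovider // hypothetical package; not part of this patch

import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._

object ExternalUriProvider {
  // Annotation keys defined by this patch in org.apache.spark.deploy.kubernetes.constants
  private val ProvideUriAnnotation = "spark-job.alpha.apache.org/provideExternalUri"
  private val ResolvedUriAnnotation = "spark-job.alpha.apache.org/resolvedExternalUri"

  def main(args: Array[String]): Unit = {
    val namespace = "default" // should match spark.kubernetes.namespace
    val client = new DefaultKubernetesClient() // picks up kubeconfig or in-cluster config
    client.services().inNamespace(namespace).watch(new Watcher[Service] {
      override def eventReceived(action: Action, service: Service): Unit = {
        val annotations = Option(service.getMetadata.getAnnotations)
          .map(_.asScala)
          .getOrElse(Map.empty[String, String])
        if (action == Action.ADDED && annotations.contains(ProvideUriAnnotation)
            && !annotations.contains(ResolvedUriAnnotation)) {
          val serviceName = service.getMetadata.getName
          // Hypothetical gateway address: a real provider would configure an external
          // route to this service here, rewriting the path to "/" on the way to the pod.
          val resolvedUri = s"https://gateway.example.com:8080/$serviceName"
          // Patch the service with the resolved URI annotation, which the submission
          // client's ExternalUriSetWatcher is waiting for.
          client.services().inNamespace(namespace).withName(serviceName).edit()
            .editMetadata()
              .addToAnnotations(ResolvedUriAnnotation, resolvedUri)
              .endMetadata()
            .done()
        }
      }
      override def onClose(cause: KubernetesClientException): Unit = {}
    })
    // Keep the process alive; the watch runs on a background thread.
    Thread.currentThread().join()
  }
}
```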
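The configuration table also mentions that a custom DriverServiceManager may be supplied on the submitter's classpath as a last resort. As an illustration only - the `ClusterIP` type tag, the package name, and the assumption that the submitter can reach cluster IPs directly are all hypothetical - a manager for submitters running inside the cluster's network might look like the following sketch.

```scala
package org.example.uriprovider // hypothetical; place on the submitter's classpath

import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.deploy.rest.kubernetes.DriverServiceManager

/**
 * Hypothetical manager for submitters that run inside the cluster's network and can
 * therefore reach the driver service's cluster IP directly. Not part of this patch.
 */
class ClusterIpUrisDriverServiceManager extends DriverServiceManager {

  // The tag to set in spark.kubernetes.driver.serviceManagerType.
  override def getServiceManagerType: String = "ClusterIP"

  override def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder = {
    // ClusterIP is the Kubernetes default service type; set it explicitly for clarity.
    driverServiceTemplate.editSpec().withType("ClusterIP").endSpec()
  }

  override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = {
    // The service template created by the Client exposes a single submission server port.
    val port = driverService.getSpec.getPorts.asScala.head.getPort
    val scheme = if (sparkConf.getBoolean("spark.ssl.kubernetes.submit.enabled", false)) {
      "https"
    } else {
      "http"
    }
    Set(s"$scheme://${driverService.getSpec.getClusterIP}:$port")
  }
}
```

As with the built-in managers, such a class would be registered by listing its fully-qualified name in a `META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager` resource; the Client then loads it via ServiceLoader and requires exactly one implementation whose `getServiceManagerType` matches the configured value.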