Generate the application ID label irrespective of app name. (apache#331)
* Generate the application ID label irrespective of app name.

* Add an integration test.

* Fix scalastyle
mccheah authored and foxish committed Jul 24, 2017
1 parent 069bd04 commit 4a01baf
Showing 10 changed files with 144 additions and 114 deletions.
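
At its core, the commit separates the two jobs the app name used to do. Resource names (driver pod, config maps, secrets) keep a human-readable prefix derived from the app name, while the spark-app-selector label gets a freshly generated identifier that is valid no matter what the user named the application. A condensed sketch of the scheme, lifted from the Client companion object in the diff below (the appName value here is illustrative):

import java.util.UUID

val appName = "spark-pi"
val launchTime = System.currentTimeMillis()

// Human-readable prefix for resource names, so kubectl output maps back to the job.
val kubernetesResourceNamePrefix =
  s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")

// Generated identifier for the spark-app-selector label; never derived from the name.
val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"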
@@ -91,7 +91,7 @@ private[spark] class KubernetesShuffleBlockHandler (
     try {
       Some(kubernetesClient
         .pods()
-        .withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava)
+        .withLabels(Map(SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE).asJava)
         .watch(new Watcher[Pod] {
           override def eventReceived(action: Watcher.Action, p: Pod): Unit = {
             action match {
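
For context, these label constants are what make "find every pod of this application" a plain label query. A hypothetical lookup with the fabric8 client (the client construction and the appId value are assumptions for illustration, not part of this diff):

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}
import scala.collection.JavaConverters._

val kubernetesClient: KubernetesClient = new DefaultKubernetesClient()

// All pods (driver and executors) carrying a given generated app ID.
def podsForApp(appId: String): Seq[Pod] = kubernetesClient
  .pods()
  .withLabel("spark-app-selector", appId) // SPARK_APP_ID_LABEL after this change
  .list()
  .getItems
  .asScala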
@@ -151,6 +151,13 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
+      .doc("Prefix to use in front of the executor pod names.")
+      .internal()
+      .stringConf
+      .createWithDefault("spark")
+
   private[spark] val KUBERNETES_SHUFFLE_NAMESPACE =
     ConfigBuilder("spark.kubernetes.shuffle.namespace")
      .doc("Namespace of the shuffle service")
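
The submission client writes this prefix into the driver's SparkConf (see Client.scala below), and the executor-side scheduler backend, which is not part of this excerpt, reads it back when naming executor pods. A sketch of that consumer side, with the exec-suffix pattern assumed rather than taken from this diff:

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

val sparkConf = new SparkConf()
// Set by the submission client to the name-derived prefix; defaults to "spark".
val prefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
// Assumed naming pattern for an executor pod:
val executorPodName = s"$prefix-exec-1"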
@@ -19,10 +19,12 @@ package org.apache.spark.deploy.kubernetes
 package object constants {
   // Labels
   private[spark] val SPARK_DRIVER_LABEL = "spark-driver"
-  private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
-  private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
+  private[spark] val SPARK_APP_ID_LABEL = "spark-app-selector"
   private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
   private[spark] val SPARK_ROLE_LABEL = "spark-role"
+  private[spark] val SPARK_POD_DRIVER_ROLE = "driver"
+  private[spark] val SPARK_POD_EXECUTOR_ROLE = "executor"
+  private[spark] val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
 
   // Credentials secrets
   private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
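
The net effect on driver pod metadata, combining these constants with the Client changes below: the restricted label set carries only machine-generated values, and the free-form app name moves to an annotation, which Kubernetes does not constrain the same way. Illustrative values:

// Labels (each value must satisfy Kubernetes label rules, e.g. at most 63 chars):
//   spark-app-selector -> spark-b9aa71a3...   (generated app ID)
//   spark-role         -> driver
// Annotations (no comparable value restrictions):
//   spark-app-name     -> whatever the user set spark.app.name to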
@@ -17,13 +17,13 @@
 package org.apache.spark.deploy.kubernetes.submit
 
 import java.io.File
-import java.util.Collections
+import java.util.{Collections, UUID}
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder, QuantityBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
@@ -43,22 +43,21 @@ import org.apache.spark.util.Utils
  * where different steps of submission should be factored out into separate classes.
  */
 private[spark] class Client(
-    appName: String,
-    kubernetesAppId: String,
-    mainClass: String,
-    sparkConf: SparkConf,
-    appArgs: Array[String],
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    waitForAppCompletion: Boolean,
-    kubernetesClient: KubernetesClient,
-    initContainerComponentsProvider: DriverInitContainerComponentsProvider,
-    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
-    loggingPodStatusWatcher: LoggingPodStatusWatcher)
-  extends Logging {
-
+    appName: String,
+    kubernetesResourceNamePrefix: String,
+    kubernetesAppId: String,
+    mainClass: String,
+    sparkConf: SparkConf,
+    appArgs: Array[String],
+    sparkJars: Seq[String],
+    sparkFiles: Seq[String],
+    waitForAppCompletion: Boolean,
+    kubernetesClient: KubernetesClient,
+    initContainerComponentsProvider: DriverInitContainerComponentsProvider,
+    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
+    loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
   private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(kubernetesAppId)
+    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
@@ -86,15 +85,16 @@ private[spark] class Client(
     val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
       customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
     require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
-      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
-    require(!parsedCustomLabels.contains(SPARK_APP_NAME_LABEL), s"Label with key" +
-      s" $SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
+      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
+      s" operations.")
+    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
+      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+    require(!parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), s"Annotation with key" +
+      s" $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for Spark bookkeeping" +
+      s" operations.")
     val allLabels = parsedCustomLabels ++ Map(
       SPARK_APP_ID_LABEL -> kubernetesAppId,
-      SPARK_APP_NAME_LABEL -> appName,
-      SPARK_ROLE_LABEL -> "driver")
-    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
-      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
 
     val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
       new EnvVarBuilder()
@@ -140,6 +140,7 @@ private[spark] class Client(
         .withName(kubernetesDriverPodName)
         .addToLabels(allLabels.asJava)
         .addToAnnotations(parsedCustomAnnotations.asJava)
+        .addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
         .endMetadata()
         .withNewSpec()
         .withRestartPolicy("Never")
@@ -186,6 +187,7 @@ private[spark] class Client(
       }
       resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
       resolvedSparkConf.set("spark.app.id", kubernetesAppId)
+      resolvedSparkConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
       // We don't need this anymore since we just set the JVM options on the environment
       resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
       val resolvedLocalClasspath = containerLocalizedFilesResolver
@@ -234,11 +236,11 @@ private[spark] class Client(
         throw e
       }
       if (waitForAppCompletion) {
-        logInfo(s"Waiting for application $kubernetesAppId to finish...")
+        logInfo(s"Waiting for application $appName to finish...")
         loggingPodStatusWatcher.awaitCompletion()
-        logInfo(s"Application $kubernetesAppId finished.")
+        logInfo(s"Application $appName finished.")
       } else {
-        logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.")
+        logInfo(s"Deployed Spark application $appName into Kubernetes.")
       }
     }
   }
@@ -279,15 +281,21 @@ private[spark] object Client {
     val sparkFiles = sparkConf.getOption("spark.files")
       .map(_.split(","))
       .getOrElse(Array.empty[String])
-    val appName = sparkConf.getOption("spark.app.name")
-      .getOrElse("spark")
-    val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
+    // The resource name prefix is derived from the application name, making it easy to connect the
+    // names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
+    // application the user submitted. However, we can't use the application name in the label, as
+    // label values are considerably restrictive, e.g. must be no longer than 63 characters in
+    // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
+    // requires finding "all pods for this application" should use the kubernetesAppId.
+    val kubernetesResourceNamePrefix = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
     val master = resolveK8sMaster(sparkConf.get("spark.master"))
     val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
     val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
       sparkConf,
-      kubernetesAppId,
+      kubernetesResourceNamePrefix,
       namespace,
       sparkJars,
       sparkFiles,
@@ -300,14 +308,16 @@ private[spark] object Client {
       None,
       None)) { kubernetesClient =>
       val kubernetesCredentialsMounterProvider =
-        new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
+        new DriverPodKubernetesCredentialsMounterProviderImpl(
+          sparkConf, kubernetesResourceNamePrefix)
       val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
       val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL))
         .filter( _ => waitForAppCompletion)
       val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
-        kubernetesAppId, loggingInterval)
+        kubernetesResourceNamePrefix, loggingInterval)
       new Client(
         appName,
+        kubernetesResourceNamePrefix,
         kubernetesAppId,
         mainClass,
         sparkConf,
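
A quick check of the arithmetic behind the comment above: a UUID renders as 36 characters, 32 once the four dashes are stripped, so the generated label value is always 38 characters and comfortably inside the 63-character cap, whereas a name-derived value has no such guarantee. For instance:

import java.util.UUID

val generated = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
assert(generated.length == 38) // 6 ("spark-") + 32 hex chars, always under 63

val nameDerived = s"${"a" * 80}-1500000000000".toLowerCase
assert(nameDerived.length > 63) // would be rejected as a label value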
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.kubernetes.submit
 
-import java.io.File
-
 import org.apache.spark.{SparkConf, SSLOptions}
 import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
 import org.apache.spark.deploy.kubernetes.config._
@@ -46,12 +44,12 @@ private[spark] trait DriverInitContainerComponentsProvider {
 }
 
 private[spark] class DriverInitContainerComponentsProviderImpl(
-    sparkConf: SparkConf,
-    kubernetesAppId: String,
-    namespace: String,
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    resourceStagingServerExternalSslOptions: SSLOptions)
+    sparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String,
+    namespace: String,
+    sparkJars: Seq[String],
+    sparkFiles: Seq[String],
+    resourceStagingServerExternalSslOptions: SSLOptions)
   extends DriverInitContainerComponentsProvider {
 
   private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
@@ -99,10 +97,10 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
   private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
   private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
   private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
-    s"$kubernetesAppId-init-secret"
+    s"$kubernetesResourceNamePrefix-init-secret"
   }
-  private val configMapName = s"$kubernetesAppId-init-config"
-  private val configMapKey = s"$kubernetesAppId-init-config-key"
+  private val configMapName = s"$kubernetesResourceNamePrefix-init-config"
+  private val configMapKey = s"$kubernetesResourceNamePrefix-init-config-key"
   private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
@@ -116,29 +114,29 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       filesResourceId <- maybeSubmittedResourceIds.map(_.filesResourceId)
     } yield {
       new SubmittedDependencyInitContainerConfigPluginImpl(
-        // Configure the init-container with the internal URI over the external URI.
-        maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
-        jarsResourceId,
-        filesResourceId,
-        INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
-        INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
-        resourceStagingServerInternalSslEnabled,
-        maybeResourceStagingServerInternalTrustStore,
-        maybeResourceStagingServerInternalClientCert,
-        maybeResourceStagingServerInternalTrustStorePassword,
-        maybeResourceStagingServerInternalTrustStoreType,
-        INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
+          // Configure the init-container with the internal URI over the external URI.
+          maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
+          jarsResourceId,
+          filesResourceId,
+          INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
+          INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
+          resourceStagingServerInternalSslEnabled,
+          maybeResourceStagingServerInternalTrustStore,
+          maybeResourceStagingServerInternalClientCert,
+          maybeResourceStagingServerInternalTrustStorePassword,
+          maybeResourceStagingServerInternalTrustStoreType,
+          INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
     }
     new SparkInitContainerConfigMapBuilderImpl(
-      sparkJars,
-      sparkFiles,
-      jarsDownloadPath,
-      filesDownloadPath,
-      configMapName,
-      configMapKey,
-      submittedDependencyConfigPlugin)
+        sparkJars,
+        sparkFiles,
+        jarsDownloadPath,
+        filesDownloadPath,
+        configMapName,
+        configMapKey,
+        submittedDependencyConfigPlugin)
   }
 
   override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
@@ -158,14 +156,13 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader] = {
     maybeResourceStagingServerUri.map { stagingServerUri =>
       new SubmittedDependencyUploaderImpl(
-        kubernetesAppId,
-        driverPodLabels,
-        namespace,
-        stagingServerUri,
-        sparkJars,
-        sparkFiles,
-        resourceStagingServerExternalSslOptions,
-        RetrofitClientFactoryImpl)
+          driverPodLabels,
+          namespace,
+          stagingServerUri,
+          sparkJars,
+          sparkFiles,
+          resourceStagingServerExternalSslOptions,
+          RetrofitClientFactoryImpl)
     }
   }
 
@@ -178,15 +175,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       filesResourceSecret <- maybeSubmittedResourceSecrets.map(_.filesResourceSecret)
     } yield {
       new SubmittedDependencySecretBuilderImpl(
-        secretName,
-        jarsResourceSecret,
-        filesResourceSecret,
-        INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
-        INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
-        maybeResourceStagingServerInternalTrustStore,
-        maybeResourceStagingServerInternalClientCert)
+          secretName,
+          jarsResourceSecret,
+          filesResourceSecret,
+          INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
+          INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
+          maybeResourceStagingServerInternalTrustStore,
+          maybeResourceStagingServerInternalClientCert)
     }
   }
 
@@ -196,13 +193,13 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       secret, INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
     }
     new SparkPodInitContainerBootstrapImpl(
-      initContainerImage,
-      dockerImagePullPolicy,
-      jarsDownloadPath,
-      filesDownloadPath,
-      downloadTimeoutMinutes,
-      configMapName,
-      configMapKey,
-      resourceStagingServerSecretPlugin)
+        initContainerImage,
+        dockerImagePullPolicy,
+        jarsDownloadPath,
+        filesDownloadPath,
+        downloadTimeoutMinutes,
+        configMapName,
+        configMapKey,
+        resourceStagingServerSecretPlugin)
   }
 }
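
Every derived resource name picks up the rename mechanically; for a prefix of spark-pi-1500000000000, the definitions above yield:

val kubernetesResourceNamePrefix = "spark-pi-1500000000000"
val secretName    = s"$kubernetesResourceNamePrefix-init-secret"     // init-container secret
val configMapName = s"$kubernetesResourceNamePrefix-init-config"     // init-container config map
val configMapKey  = s"$kubernetesResourceNamePrefix-init-config-key" // key inside the config map
// And in Client.scala, the driver pod name defaults to s"$kubernetesResourceNamePrefix-driver".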
@@ -50,7 +50,6 @@ private[spark] trait SubmittedDependencyUploader {
  * Resource Staging Service.
  */
 private[spark] class SubmittedDependencyUploaderImpl(
-    kubernetesAppId: String,
     podLabels: Map[String, String],
     podNamespace: String,
     stagingServerUri: String,