From 2c28a06e3a8ecd3dc24ff56ef1ce9c04c2b5445d Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 1 Mar 2019 14:31:46 +0000 Subject: [PATCH 1/9] configurable k8s client timeouts --- docs/running-on-kubernetes.md | 46 +++++++++++++++++++ .../org/apache/spark/deploy/k8s/Config.scala | 9 ++++ .../k8s/SparkKubernetesClientFactory.scala | 7 +++ .../submit/KubernetesClientApplication.scala | 1 + .../k8s/KubernetesClusterManager.scala | 4 ++ 5 files changed, 67 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e23f28cf755c3..38b12468c1cf3 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -990,6 +990,52 @@ See the [configuration page](configuration.html) for information on Spark config Specify whether executor pods should be deleted in case of failure or normal termination. + + spark.kubernetes.kubernetesClient.submission.connectionTimeout + 10000 + + Connection timeout in milliseconds for the kubernetes client to use for starting the driver. In client mode, use + spark.kubernetes.kubernetesClient.connectionTimeout instead. + + + + spark.kubernetes.kubernetesClient.submission.requestTimeout + 10000 + + Request timeout in milliseconds for the kubernetes client to use for starting the driver. In client mode, use + spark.kubernetes.kubernetesClient.submission.requestTimeout instead. + + + + spark.kubernetes.kubernetesClient.driver.connectionTimeout + 10000 + + Connection timeout in milliseconds for the kubernetes client in driver to use when requesting executors. + In client mode, use spark.kubernetes.kubernetesClient.connectionTimeout instead. + + + + spark.kubernetes.kubernetesClient.driver.requestTimeout + 10000 + + Request timeout in milliseconds for the kubernetes client in driver to use when requesting executors. + In client mode, use spark.kubernetes.kubernetesClient.connectionTimeout instead. + + + + spark.kubernetes.kubernetesClient.connectionTimeout + 10000 + + In client mode, connection timeout in milliseconds for the kubernetes client in driver to use when requesting executors. + + + + spark.kubernetes.kubernetesClient.requestTimeout + 10000 + + In client mode, request timeout in milliseconds for the kubernetes client in driver to use when requesting executors. + + #### Pod template properties diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 4cca1e22bd108..0399fe77a895b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -75,6 +75,12 @@ private[spark] object Config extends Logging { .toSequence .createWithDefault(Nil) + val SUBMISSION_CLIENT_PREFIX = + "spark.kubernetes.kubernetesClient.submission" + val DRIVER_CLIENT_PREFIX = + "spark.kubernetes.kubernetesClient.driver" + val CLIENT_MODE_CLIENT_PREFIX = + "spark.kubernetes.kubernetesClient" val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = @@ -85,6 +91,9 @@ private[spark] object Config extends Logging { val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + val REQUEST_TIMEOUT_SUFFIX = "requestTimeout" + val CONNECTION_TIMEOUT_SUFFIX = "connectionTimeout" + val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 06dea42cc135c..971d73b0a2c5b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -41,6 +41,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging { master: String, namespace: Option[String], kubernetesAuthConfPrefix: String, + kubernetesClientPrefix: String, sparkConf: SparkConf, defaultServiceAccountToken: Option[File], defaultServiceAccountCaCert: Option[File]): KubernetesClient = { @@ -63,6 +64,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX") val clientCertFile = sparkConf .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX") + val requestTimeout = + sparkConf.getInt(s"$kubernetesClientPrefix.$REQUEST_TIMEOUT_SUFFIX", 10000) + val connectionTimeout = + sparkConf.getInt(s"$kubernetesClientPrefix.$CONNECTION_TIMEOUT_SUFFIX", 10000) val dispatcher = new Dispatcher( ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher")) @@ -79,6 +84,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .withApiVersion("v1") .withMasterUrl(master) .withWebsocketPingInterval(0) + .withRequestTimeout(requestTimeout) + .withConnectionTimeout(connectionTimeout) .withOption(oauthTokenValue) { (token, configBuilder) => configBuilder.withOauthToken(token) }.withOption(oauthTokenFile) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 042012e9c74a4..488cd6b244619 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -220,6 +220,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { master, Some(kubernetesConf.namespace), KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, + SUBMISSION_CLIENT_PREFIX, sparkConf, None, None)) { kubernetesClient => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 809bdf8ca8c27..20feeaf08d688 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -44,6 +44,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit scheduler: TaskScheduler): SchedulerBackend = { val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK) val (authConfPrefix, + clientConfPrefix, apiServerUri, defaultServiceAccountToken, defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { @@ -51,11 +52,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit "If the application is deployed using spark-submit in cluster mode, the driver pod name " + "must be provided.") (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + DRIVER_CLIENT_PREFIX, KUBERNETES_MASTER_INTERNAL_URL, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) } else { (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, + CLIENT_MODE_CLIENT_PREFIX, KubernetesUtils.parseMasterUrl(masterURL), None, None) @@ -65,6 +68,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit apiServerUri, Some(sc.conf.get(KUBERNETES_NAMESPACE)), authConfPrefix, + clientConfPrefix, sc.conf, defaultServiceAccountToken, defaultServiceAccountCaCrt) From 0676f90f12b4328277523f792c74f9fff194d7b0 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 4 Mar 2019 11:48:01 +0000 Subject: [PATCH 2/9] move default value declaration to k8s/Config --- .../org/apache/spark/deploy/k8s/Config.scala | 35 +++++++++++++++++++ .../k8s/SparkKubernetesClientFactory.scala | 28 ++++++++++++--- .../submit/KubernetesClientApplication.scala | 2 +- .../k8s/KubernetesClusterManager.scala | 8 ++--- 4 files changed, 63 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 0399fe77a895b..1d9ece8ee26af 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -94,6 +94,41 @@ private[spark] object Config extends Logging { val REQUEST_TIMEOUT_SUFFIX = "requestTimeout" val CONNECTION_TIMEOUT_SUFFIX = "connectionTimeout" + val SUBMISSION_CLIENT_REQUEST_TIMEOUT = + ConfigBuilder(s"$SUBMISSION_CLIENT_PREFIX.$REQUEST_TIMEOUT_SUFFIX") + .doc("request timeout to be used in milliseconds for starting the driver") + .intConf + .createWithDefault(10000) + + val SUBMISSION_CLIENT_CONNECTION_TIMEOUT = + ConfigBuilder(s"$SUBMISSION_CLIENT_PREFIX.$CONNECTION_TIMEOUT_SUFFIX") + .doc("connection timeout to be used in milliseconds for starting the driver") + .intConf + .createWithDefault(10000) + + val DRIVER_CLIENT_REQUEST_TIMEOUT = + ConfigBuilder(s"$DRIVER_CLIENT_PREFIX.$REQUEST_TIMEOUT_SUFFIX") + .doc("request timeout to be used in milliseconds for requesting executors") + .intConf + .createWithDefault(10000) + + val DRIVER_CLIENT_CONNECTION_TIMEOUT = + ConfigBuilder(s"$DRIVER_CLIENT_PREFIX.$CONNECTION_TIMEOUT_SUFFIX") + .doc("connection timeout to be used in milliseconds for requesting executors") + .intConf + .createWithDefault(10000) + + val CLIENT_MODE_CLIENT_REQUEST_TIMEOUT = + ConfigBuilder(s"$CLIENT_MODE_CLIENT_PREFIX.$REQUEST_TIMEOUT_SUFFIX") + .doc("request timeout to be used in milliseconds for requesting executors in client mode") + .intConf + .createWithDefault(10000) + + val CLIENT_MODE_CLIENT_CONNECTION_TIMEOUT = + ConfigBuilder(s"$CLIENT_MODE_CLIENT_PREFIX.$CONNECTION_TIMEOUT_SUFFIX") + .doc("connection timeout to be used in milliseconds for requesting executors in client mode") + .intConf + .createWithDefault(10000) val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 971d73b0a2c5b..b1105dd482c6a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -18,6 +18,8 @@ package org.apache.spark.deploy.k8s import java.io.File +import scala.language.implicitConversions + import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} @@ -28,6 +30,7 @@ import okhttp3.Dispatcher import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.util.ThreadUtils /** @@ -41,7 +44,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging { master: String, namespace: Option[String], kubernetesAuthConfPrefix: String, - kubernetesClientPrefix: String, + clientType: ClientType.Value, sparkConf: SparkConf, defaultServiceAccountToken: Option[File], defaultServiceAccountCaCert: Option[File]): KubernetesClient = { @@ -64,10 +67,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX") val clientCertFile = sparkConf .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX") - val requestTimeout = - sparkConf.getInt(s"$kubernetesClientPrefix.$REQUEST_TIMEOUT_SUFFIX", 10000) - val connectionTimeout = - sparkConf.getInt(s"$kubernetesClientPrefix.$CONNECTION_TIMEOUT_SUFFIX", 10000) + val requestTimeout = clientType.requestTimeout(sparkConf) + val connectionTimeout = clientType.connectionTimeout(sparkConf) val dispatcher = new Dispatcher( ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher")) @@ -118,4 +119,21 @@ private[spark] object SparkKubernetesClientFactory extends Logging { }.getOrElse(configBuilder) } } + + object ClientType extends Enumeration { + import Config._ + val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT) + val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT) + val ClientMode = Val(CLIENT_MODE_CLIENT_REQUEST_TIMEOUT, CLIENT_MODE_CLIENT_CONNECTION_TIMEOUT) + + protected case class Val( + requestTimeoutEntry: ConfigEntry[Int], + connectionTimeoutEntry: ConfigEntry[Int]) + extends super.Val { + def requestTimeout(conf: SparkConf): Int = conf.get(requestTimeoutEntry) + def connectionTimeout(conf: SparkConf): Int = conf.get(connectionTimeoutEntry) + } + + implicit def convert(value: Value): Val = value.asInstanceOf[Val] + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 488cd6b244619..92d5176baa4d6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -220,7 +220,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { master, Some(kubernetesConf.namespace), KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, - SUBMISSION_CLIENT_PREFIX, + SparkKubernetesClientFactory.ClientType.Submission, sparkConf, None, None)) { kubernetesClient => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 20feeaf08d688..aa1380eaedb2f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -44,7 +44,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit scheduler: TaskScheduler): SchedulerBackend = { val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK) val (authConfPrefix, - clientConfPrefix, + clientType, apiServerUri, defaultServiceAccountToken, defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { @@ -52,13 +52,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit "If the application is deployed using spark-submit in cluster mode, the driver pod name " + "must be provided.") (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, - DRIVER_CLIENT_PREFIX, + SparkKubernetesClientFactory.ClientType.Driver, KUBERNETES_MASTER_INTERNAL_URL, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) } else { (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, - CLIENT_MODE_CLIENT_PREFIX, + SparkKubernetesClientFactory.ClientType.ClientMode, KubernetesUtils.parseMasterUrl(masterURL), None, None) @@ -68,7 +68,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit apiServerUri, Some(sc.conf.get(KUBERNETES_NAMESPACE)), authConfPrefix, - clientConfPrefix, + clientType, sc.conf, defaultServiceAccountToken, defaultServiceAccountCaCrt) From a0b88a348b40b2394212b3c27db3a2683c5ae151 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 4 Mar 2019 17:10:04 +0000 Subject: [PATCH 3/9] remove prefix and suffix constants --- .../org/apache/spark/deploy/k8s/Config.scala | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1d9ece8ee26af..2d5c635b2cb8c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -75,12 +75,6 @@ private[spark] object Config extends Logging { .toSequence .createWithDefault(Nil) - val SUBMISSION_CLIENT_PREFIX = - "spark.kubernetes.kubernetesClient.submission" - val DRIVER_CLIENT_PREFIX = - "spark.kubernetes.kubernetesClient.driver" - val CLIENT_MODE_CLIENT_PREFIX = - "spark.kubernetes.kubernetesClient" val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = @@ -91,41 +85,39 @@ private[spark] object Config extends Logging { val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" - val REQUEST_TIMEOUT_SUFFIX = "requestTimeout" - val CONNECTION_TIMEOUT_SUFFIX = "connectionTimeout" val SUBMISSION_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder(s"$SUBMISSION_CLIENT_PREFIX.$REQUEST_TIMEOUT_SUFFIX") + ConfigBuilder(s"spark.kubernetes.kubernetesClient.submission.requestTimeout") .doc("request timeout to be used in milliseconds for starting the driver") .intConf .createWithDefault(10000) val SUBMISSION_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder(s"$SUBMISSION_CLIENT_PREFIX.$CONNECTION_TIMEOUT_SUFFIX") + ConfigBuilder(s"spark.kubernetes.kubernetesClient.submission.connectionTimeout") .doc("connection timeout to be used in milliseconds for starting the driver") .intConf .createWithDefault(10000) val DRIVER_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder(s"$DRIVER_CLIENT_PREFIX.$REQUEST_TIMEOUT_SUFFIX") + ConfigBuilder(s"spark.kubernetes.kubernetesClient.driver.requestTimeout") .doc("request timeout to be used in milliseconds for requesting executors") .intConf .createWithDefault(10000) val DRIVER_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder(s"$DRIVER_CLIENT_PREFIX.$CONNECTION_TIMEOUT_SUFFIX") + ConfigBuilder(s"spark.kubernetes.kubernetesClient.driver.connectionTimeout") .doc("connection timeout to be used in milliseconds for requesting executors") .intConf .createWithDefault(10000) val CLIENT_MODE_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder(s"$CLIENT_MODE_CLIENT_PREFIX.$REQUEST_TIMEOUT_SUFFIX") + ConfigBuilder(s"spark.kubernetes.kubernetesClient.requestTimeout") .doc("request timeout to be used in milliseconds for requesting executors in client mode") .intConf .createWithDefault(10000) val CLIENT_MODE_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder(s"$CLIENT_MODE_CLIENT_PREFIX.$CONNECTION_TIMEOUT_SUFFIX") + ConfigBuilder(s"spark.kubernetes.kubernetesClient.connectionTimeout") .doc("connection timeout to be used in milliseconds for requesting executors in client mode") .intConf .createWithDefault(10000) From 7f83722f2263397af643230085f9a611801c5092 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 5 Mar 2019 10:10:47 +0000 Subject: [PATCH 4/9] do not use formatted strings --- .../scala/org/apache/spark/deploy/k8s/Config.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 2d5c635b2cb8c..44b39c36a1391 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -87,37 +87,37 @@ private[spark] object Config extends Logging { val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" val SUBMISSION_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder(s"spark.kubernetes.kubernetesClient.submission.requestTimeout") + ConfigBuilder("spark.kubernetes.kubernetesClient.submission.requestTimeout") .doc("request timeout to be used in milliseconds for starting the driver") .intConf .createWithDefault(10000) val SUBMISSION_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder(s"spark.kubernetes.kubernetesClient.submission.connectionTimeout") + ConfigBuilder("spark.kubernetes.kubernetesClient.submission.connectionTimeout") .doc("connection timeout to be used in milliseconds for starting the driver") .intConf .createWithDefault(10000) val DRIVER_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder(s"spark.kubernetes.kubernetesClient.driver.requestTimeout") + ConfigBuilder("spark.kubernetes.kubernetesClient.driver.requestTimeout") .doc("request timeout to be used in milliseconds for requesting executors") .intConf .createWithDefault(10000) val DRIVER_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder(s"spark.kubernetes.kubernetesClient.driver.connectionTimeout") + ConfigBuilder("spark.kubernetes.kubernetesClient.driver.connectionTimeout") .doc("connection timeout to be used in milliseconds for requesting executors") .intConf .createWithDefault(10000) val CLIENT_MODE_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder(s"spark.kubernetes.kubernetesClient.requestTimeout") + ConfigBuilder("spark.kubernetes.kubernetesClient.requestTimeout") .doc("request timeout to be used in milliseconds for requesting executors in client mode") .intConf .createWithDefault(10000) val CLIENT_MODE_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder(s"spark.kubernetes.kubernetesClient.connectionTimeout") + ConfigBuilder("spark.kubernetes.kubernetesClient.connectionTimeout") .doc("connection timeout to be used in milliseconds for requesting executors in client mode") .intConf .createWithDefault(10000) From ad17a02ad81844606ad77078d4237369cee8bb9a Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 5 Mar 2019 10:20:26 +0000 Subject: [PATCH 5/9] remove client mode specific overrides, simplify config names --- docs/running-on-kubernetes.md | 30 ++++--------------- .../org/apache/spark/deploy/k8s/Config.scala | 20 +++---------- .../k8s/SparkKubernetesClientFactory.scala | 1 - .../k8s/KubernetesClusterManager.scala | 2 +- 4 files changed, 11 insertions(+), 42 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 38b12468c1cf3..d82dc64d47d15 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -991,49 +991,31 @@ See the [configuration page](configuration.html) for information on Spark config - spark.kubernetes.kubernetesClient.submission.connectionTimeout + spark.kubernetes.submission.connectionTimeout 10000 - Connection timeout in milliseconds for the kubernetes client to use for starting the driver. In client mode, use - spark.kubernetes.kubernetesClient.connectionTimeout instead. + Connection timeout in milliseconds for the kubernetes client to use for starting the driver. - spark.kubernetes.kubernetesClient.submission.requestTimeout + spark.kubernetes.submission.requestTimeout 10000 - Request timeout in milliseconds for the kubernetes client to use for starting the driver. In client mode, use - spark.kubernetes.kubernetesClient.submission.requestTimeout instead. + Request timeout in milliseconds for the kubernetes client to use for starting the driver. - spark.kubernetes.kubernetesClient.driver.connectionTimeout + spark.kubernetes.driver.connectionTimeout 10000 Connection timeout in milliseconds for the kubernetes client in driver to use when requesting executors. - In client mode, use spark.kubernetes.kubernetesClient.connectionTimeout instead. - spark.kubernetes.kubernetesClient.driver.requestTimeout + spark.kubernetes.driver.requestTimeout 10000 Request timeout in milliseconds for the kubernetes client in driver to use when requesting executors. - In client mode, use spark.kubernetes.kubernetesClient.connectionTimeout instead. - - - - spark.kubernetes.kubernetesClient.connectionTimeout - 10000 - - In client mode, connection timeout in milliseconds for the kubernetes client in driver to use when requesting executors. - - - - spark.kubernetes.kubernetesClient.requestTimeout - 10000 - - In client mode, request timeout in milliseconds for the kubernetes client in driver to use when requesting executors. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 44b39c36a1391..5e283414c844d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -87,41 +87,29 @@ private[spark] object Config extends Logging { val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" val SUBMISSION_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder("spark.kubernetes.kubernetesClient.submission.requestTimeout") + ConfigBuilder("spark.kubernetes.submission.requestTimeout") .doc("request timeout to be used in milliseconds for starting the driver") .intConf .createWithDefault(10000) val SUBMISSION_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder("spark.kubernetes.kubernetesClient.submission.connectionTimeout") + ConfigBuilder("spark.kubernetes.submission.connectionTimeout") .doc("connection timeout to be used in milliseconds for starting the driver") .intConf .createWithDefault(10000) val DRIVER_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder("spark.kubernetes.kubernetesClient.driver.requestTimeout") + ConfigBuilder("spark.kubernetes.driver.requestTimeout") .doc("request timeout to be used in milliseconds for requesting executors") .intConf .createWithDefault(10000) val DRIVER_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder("spark.kubernetes.kubernetesClient.driver.connectionTimeout") + ConfigBuilder("spark.kubernetes.driver.connectionTimeout") .doc("connection timeout to be used in milliseconds for requesting executors") .intConf .createWithDefault(10000) - val CLIENT_MODE_CLIENT_REQUEST_TIMEOUT = - ConfigBuilder("spark.kubernetes.kubernetesClient.requestTimeout") - .doc("request timeout to be used in milliseconds for requesting executors in client mode") - .intConf - .createWithDefault(10000) - - val CLIENT_MODE_CLIENT_CONNECTION_TIMEOUT = - ConfigBuilder("spark.kubernetes.kubernetesClient.connectionTimeout") - .doc("connection timeout to be used in milliseconds for requesting executors in client mode") - .intConf - .createWithDefault(10000) - val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") .doc("Service account that is used when running the driver pod. The driver pod uses " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index b1105dd482c6a..d6c7eb23fd2d0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -124,7 +124,6 @@ private[spark] object SparkKubernetesClientFactory extends Logging { import Config._ val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT) val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT) - val ClientMode = Val(CLIENT_MODE_CLIENT_REQUEST_TIMEOUT, CLIENT_MODE_CLIENT_CONNECTION_TIMEOUT) protected case class Val( requestTimeoutEntry: ConfigEntry[Int], diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index aa1380eaedb2f..9e6b1055c7b89 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -58,7 +58,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) } else { (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, - SparkKubernetesClientFactory.ClientType.ClientMode, + SparkKubernetesClientFactory.ClientType.Driver, KubernetesUtils.parseMasterUrl(masterURL), None, None) From a316760d2423784089de574fd9447e29b24eeb38 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 5 Mar 2019 17:35:37 +0000 Subject: [PATCH 6/9] remove vars, update docs --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 4 ++-- .../spark/deploy/k8s/SparkKubernetesClientFactory.scala | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 5e283414c844d..83b5a758f0f5e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -100,13 +100,13 @@ private[spark] object Config extends Logging { val DRIVER_CLIENT_REQUEST_TIMEOUT = ConfigBuilder("spark.kubernetes.driver.requestTimeout") - .doc("request timeout to be used in milliseconds for requesting executors") + .doc("request timeout to be used in milliseconds for driver to request executors") .intConf .createWithDefault(10000) val DRIVER_CLIENT_CONNECTION_TIMEOUT = ConfigBuilder("spark.kubernetes.driver.connectionTimeout") - .doc("connection timeout to be used in milliseconds for requesting executors") + .doc("connection timeout to be used in milliseconds for driver to request executors") .intConf .createWithDefault(10000) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index d6c7eb23fd2d0..2108a27c39c65 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -67,8 +67,6 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX") val clientCertFile = sparkConf .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX") - val requestTimeout = clientType.requestTimeout(sparkConf) - val connectionTimeout = clientType.connectionTimeout(sparkConf) val dispatcher = new Dispatcher( ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher")) @@ -85,8 +83,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .withApiVersion("v1") .withMasterUrl(master) .withWebsocketPingInterval(0) - .withRequestTimeout(requestTimeout) - .withConnectionTimeout(connectionTimeout) + .withRequestTimeout(clientType.requestTimeout(sparkConf)) + .withConnectionTimeout(clientType.connectionTimeout(sparkConf)) .withOption(oauthTokenValue) { (token, configBuilder) => configBuilder.withOauthToken(token) }.withOption(oauthTokenFile) { From e7fa51c3f1541b0b836d805d0e0b16bd2f5e9581 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 5 Mar 2019 17:50:31 +0000 Subject: [PATCH 7/9] scoped import --- .../apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 2108a27c39c65..79e0e30289cef 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -18,8 +18,6 @@ package org.apache.spark.deploy.k8s import java.io.File -import scala.language.implicitConversions - import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} @@ -119,6 +117,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging { } object ClientType extends Enumeration { + import scala.language.implicitConversions import Config._ val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT) val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT) From 280c176f2b130a1aa9cce6c5d763bf57826fda12 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Wed, 6 Mar 2019 09:43:35 +0000 Subject: [PATCH 8/9] inline --- .../scheduler/cluster/k8s/KubernetesClusterManager.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 9e6b1055c7b89..31ca06b721c5d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -44,7 +44,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit scheduler: TaskScheduler): SchedulerBackend = { val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK) val (authConfPrefix, - clientType, apiServerUri, defaultServiceAccountToken, defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { @@ -52,13 +51,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit "If the application is deployed using spark-submit in cluster mode, the driver pod name " + "must be provided.") (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, - SparkKubernetesClientFactory.ClientType.Driver, KUBERNETES_MASTER_INTERNAL_URL, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) } else { (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, - SparkKubernetesClientFactory.ClientType.Driver, KubernetesUtils.parseMasterUrl(masterURL), None, None) @@ -68,7 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit apiServerUri, Some(sc.conf.get(KUBERNETES_NAMESPACE)), authConfPrefix, - clientType, + SparkKubernetesClientFactory.ClientType.Driver, sc.conf, defaultServiceAccountToken, defaultServiceAccountCaCrt) From 1902a1332d60433bab8f4b34b19aad6f2630fb0c Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Wed, 6 Mar 2019 18:25:08 +0000 Subject: [PATCH 9/9] remove redundant import --- .../apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 79e0e30289cef..459259f77796c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -118,7 +118,6 @@ private[spark] object SparkKubernetesClientFactory extends Logging { object ClientType extends Enumeration { import scala.language.implicitConversions - import Config._ val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT) val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT)