diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e23f28cf755c3..d82dc64d47d15 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -990,6 +990,34 @@ See the [configuration page](configuration.html) for information on Spark config Specify whether executor pods should be deleted in case of failure or normal termination. + + spark.kubernetes.submission.connectionTimeout + 10000 + + Connection timeout in milliseconds for the kubernetes client to use for starting the driver. + + + + spark.kubernetes.submission.requestTimeout + 10000 + + Request timeout in milliseconds for the kubernetes client to use for starting the driver. + + + + spark.kubernetes.driver.connectionTimeout + 10000 + + Connection timeout in milliseconds for the kubernetes client in driver to use when requesting executors. + + + + spark.kubernetes.driver.requestTimeout + 10000 + + Request timeout in milliseconds for the kubernetes client in driver to use when requesting executors. + + #### Pod template properties diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 4cca1e22bd108..83b5a758f0f5e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -86,6 +86,30 @@ private[spark] object Config extends Logging { val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + val SUBMISSION_CLIENT_REQUEST_TIMEOUT = + ConfigBuilder("spark.kubernetes.submission.requestTimeout") + .doc("request timeout to be used in milliseconds for starting the driver") + .intConf + .createWithDefault(10000) + + val SUBMISSION_CLIENT_CONNECTION_TIMEOUT = + ConfigBuilder("spark.kubernetes.submission.connectionTimeout") + .doc("connection timeout to be used in milliseconds for starting the driver") + .intConf + .createWithDefault(10000) + + val DRIVER_CLIENT_REQUEST_TIMEOUT = + ConfigBuilder("spark.kubernetes.driver.requestTimeout") + .doc("request timeout to be used in milliseconds for driver to request executors") + .intConf + .createWithDefault(10000) + + val DRIVER_CLIENT_CONNECTION_TIMEOUT = + ConfigBuilder("spark.kubernetes.driver.connectionTimeout") + .doc("connection timeout to be used in milliseconds for driver to request executors") + .intConf + .createWithDefault(10000) + val KUBERNETES_SERVICE_ACCOUNT_NAME = ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") .doc("Service account that is used when running the driver pod. The driver pod uses " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 06dea42cc135c..459259f77796c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -28,6 +28,7 @@ import okhttp3.Dispatcher import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.util.ThreadUtils /** @@ -41,6 +42,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging { master: String, namespace: Option[String], kubernetesAuthConfPrefix: String, + clientType: ClientType.Value, sparkConf: SparkConf, defaultServiceAccountToken: Option[File], defaultServiceAccountCaCert: Option[File]): KubernetesClient = { @@ -79,6 +81,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging { .withApiVersion("v1") .withMasterUrl(master) .withWebsocketPingInterval(0) + .withRequestTimeout(clientType.requestTimeout(sparkConf)) + .withConnectionTimeout(clientType.connectionTimeout(sparkConf)) .withOption(oauthTokenValue) { (token, configBuilder) => configBuilder.withOauthToken(token) }.withOption(oauthTokenFile) { @@ -111,4 +115,20 @@ private[spark] object SparkKubernetesClientFactory extends Logging { }.getOrElse(configBuilder) } } + + object ClientType extends Enumeration { + import scala.language.implicitConversions + val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT) + val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT) + + protected case class Val( + requestTimeoutEntry: ConfigEntry[Int], + connectionTimeoutEntry: ConfigEntry[Int]) + extends super.Val { + def requestTimeout(conf: SparkConf): Int = conf.get(requestTimeoutEntry) + def connectionTimeout(conf: SparkConf): Int = conf.get(connectionTimeoutEntry) + } + + implicit def convert(value: Value): Val = value.asInstanceOf[Val] + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 042012e9c74a4..92d5176baa4d6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -220,6 +220,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { master, Some(kubernetesConf.namespace), KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, + SparkKubernetesClientFactory.ClientType.Submission, sparkConf, None, None)) { kubernetesClient => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 809bdf8ca8c27..31ca06b721c5d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -65,6 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit apiServerUri, Some(sc.conf.get(KUBERNETES_NAMESPACE)), authConfPrefix, + SparkKubernetesClientFactory.ClientType.Driver, sc.conf, defaultServiceAccountToken, defaultServiceAccountCaCrt)