Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-27023][K8S] Make k8s client timeouts configurable #23928

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,34 @@ See the [configuration page](configuration.html) for information on Spark config
Specify whether executor pods should be deleted in case of failure or normal termination.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.submission.connectionTimeout</code></td>
<td>10000</td>
onursatici marked this conversation as resolved.
Show resolved Hide resolved
dongjoon-hyun marked this conversation as resolved.
Show resolved Hide resolved
<td>
Connection timeout in milliseconds for the kubernetes client to use for starting the driver.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.submission.requestTimeout</code></td>
<td>10000</td>
<td>
Request timeout in milliseconds for the kubernetes client to use for starting the driver.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.connectionTimeout</code></td>
<td>10000</td>
<td>
Connection timeout in milliseconds for the kubernetes client in driver to use when requesting executors.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.requestTimeout</code></td>
<td>10000</td>
<td>
Request timeout in milliseconds for the kubernetes client in driver to use when requesting executors.
</td>
</tr>
</table>

#### Pod template properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,30 @@ private[spark] object Config extends Logging {
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

val SUBMISSION_CLIENT_REQUEST_TIMEOUT =
ConfigBuilder("spark.kubernetes.submission.requestTimeout")
.doc("request timeout to be used in milliseconds for starting the driver")
.intConf
.createWithDefault(10000)

val SUBMISSION_CLIENT_CONNECTION_TIMEOUT =
ConfigBuilder("spark.kubernetes.submission.connectionTimeout")
.doc("connection timeout to be used in milliseconds for starting the driver")
.intConf
.createWithDefault(10000)

val DRIVER_CLIENT_REQUEST_TIMEOUT =
ConfigBuilder("spark.kubernetes.driver.requestTimeout")
.doc("request timeout to be used in milliseconds for driver to request executors")
.intConf
.createWithDefault(10000)

val DRIVER_CLIENT_CONNECTION_TIMEOUT =
ConfigBuilder("spark.kubernetes.driver.connectionTimeout")
.doc("connection timeout to be used in milliseconds for driver to request executors")
.intConf
.createWithDefault(10000)

val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import okhttp3.Dispatcher
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.ThreadUtils

/**
Expand All @@ -41,6 +42,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
master: String,
namespace: Option[String],
kubernetesAuthConfPrefix: String,
clientType: ClientType.Value,
onursatici marked this conversation as resolved.
Show resolved Hide resolved
sparkConf: SparkConf,
defaultServiceAccountToken: Option[File],
defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
Expand Down Expand Up @@ -79,6 +81,8 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withRequestTimeout(clientType.requestTimeout(sparkConf))
.withConnectionTimeout(clientType.connectionTimeout(sparkConf))
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
Expand Down Expand Up @@ -111,4 +115,20 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}.getOrElse(configBuilder)
}
}

object ClientType extends Enumeration {
import scala.language.implicitConversions
val Driver = Val(DRIVER_CLIENT_REQUEST_TIMEOUT, DRIVER_CLIENT_CONNECTION_TIMEOUT)
val Submission = Val(SUBMISSION_CLIENT_REQUEST_TIMEOUT, SUBMISSION_CLIENT_CONNECTION_TIMEOUT)

protected case class Val(
requestTimeoutEntry: ConfigEntry[Int],
connectionTimeoutEntry: ConfigEntry[Int])
extends super.Val {
def requestTimeout(conf: SparkConf): Int = conf.get(requestTimeoutEntry)
def connectionTimeout(conf: SparkConf): Int = conf.get(connectionTimeoutEntry)
}

implicit def convert(value: Value): Val = value.asInstanceOf[Val]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
master,
Some(kubernetesConf.namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
SparkKubernetesClientFactory.ClientType.Submission,
sparkConf,
None,
None)) { kubernetesClient =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
apiServerUri,
Some(sc.conf.get(KUBERNETES_NAMESPACE)),
authConfPrefix,
SparkKubernetesClientFactory.ClientType.Driver,
sc.conf,
defaultServiceAccountToken,
defaultServiceAccountCaCrt)
Expand Down