Allow docker image pull policy to be configurable (apache#328)
* Allow docker image pull policy to be configurable

* Add flag documentation

* Update running-on-kubernetes.md
tnachen authored and foxish committed Jul 24, 2017
1 parent c312567 commit 4f6a4d7
Showing 8 changed files with 26 additions and 5 deletions.
docs/running-on-kubernetes.md: 7 additions & 0 deletions
@@ -661,6 +661,13 @@ from the other deployment modes. See the [configuration page](configuration.html
   Interval between reports of the current Spark job status in cluster mode.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.docker.image.pullPolicy</code></td>
+  <td><code>IfNotPresent</code></td>
+  <td>
+    Docker image pull policy used when pulling Docker images with Kubernetes.
+  </td>
+</tr>
 </table>


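A quick usage sketch, not part of the patch itself: the new setting can be supplied through any of the usual SparkConf channels, and "Always" below is only an example value (the committed default stays IfNotPresent).

import org.apache.spark.SparkConf

// Ask Kubernetes to re-check the registry on every pod start, which is handy
// while iterating on a mutable image tag. Example value only.
val conf = new SparkConf()
  .set("spark.kubernetes.docker.image.pullPolicy", "Always")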
@@ -36,6 +36,7 @@ private[spark] trait SparkPodInitContainerBootstrap {

 private[spark] class SparkPodInitContainerBootstrapImpl(
     initContainerImage: String,
+    dockerImagePullPolicy: String,
     jarsDownloadPath: String,
     filesDownloadPath: String,
     downloadTimeoutMinutes: Long,
@@ -60,7 +61,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
     val initContainer = new ContainerBuilder()
       .withName(s"spark-init")
       .withImage(initContainerImage)
-      .withImagePullPolicy("IfNotPresent")
+      .withImagePullPolicy(dockerImagePullPolicy)
       .addNewVolumeMount()
         .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
         .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
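For readers unfamiliar with the fabric8 builder API used above, here is a minimal standalone sketch of the call this hunk parameterizes; the image name and policy value are illustrative rather than taken from the commit.

import io.fabric8.kubernetes.api.model.ContainerBuilder

// Build a container spec with an explicit pull policy. fabric8 only carries the
// string into the pod spec; the value is interpreted and validated by Kubernetes.
val container = new ContainerBuilder()
  .withName("spark-init")
  .withImage("spark-init:latest")
  .withImagePullPolicy("Always")
  .build()
assert(container.getImagePullPolicy == "Always")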
@@ -47,6 +47,12 @@ package object config extends Logging {
       .stringConf
       .createWithDefault(s"spark-executor:$sparkVersion")
 
+  private[spark] val DOCKER_IMAGE_PULL_POLICY =
+    ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+      .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
+      .stringConf
+      .createWithDefault("IfNotPresent")
+
   private[spark] val APISERVER_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
   private[spark] val APISERVER_AUTH_DRIVER_CONF_PREFIX =
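One note on the entry above: it accepts any string, so a typo only surfaces when Kubernetes rejects the pod. A possible follow-up, sketched here under the assumption that this branch's ConfigBuilder exposes checkValues, would restrict the value to the three policies Kubernetes defines (Always, Never, IfNotPresent).

// Hypothetical hardening, not part of this commit.
private[spark] val DOCKER_IMAGE_PULL_POLICY =
  ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
    .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
    .stringConf
    .checkValues(Set("Always", "Never", "IfNotPresent"))
    .createWithDefault("IfNotPresent")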
@@ -60,6 +60,7 @@ private[spark] class Client(
   private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
     .getOrElse(kubernetesAppId)
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
   private val memoryOverheadMb = sparkConf
     .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
@@ -99,7 +100,7 @@
     val driverContainer = new ContainerBuilder()
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverDockerImage)
-      .withImagePullPolicy("IfNotPresent")
+      .withImagePullPolicy(dockerImagePullPolicy)
       .addToEnv(driverExtraClasspathEnv.toSeq: _*)
       .addNewEnv()
         .withName(ENV_DRIVER_MEMORY)
@@ -104,6 +104,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
   private val configMapName = s"$kubernetesAppId-init-config"
   private val configMapKey = s"$kubernetesAppId-init-config-key"
   private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
 
   override def provideInitContainerConfigMapBuilder(
@@ -196,6 +197,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
     }
     new SparkPodInitContainerBootstrapImpl(
       initContainerImage,
+      dockerImagePullPolicy,
       jarsDownloadPath,
       filesDownloadPath,
       downloadTimeoutMinutes,
@@ -46,7 +46,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val maybeExecutorInitContainerSecretName =
       sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET)
     val maybeExecutorInitContainerSecretMount =
-      sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR)
+      sparkConf.get(EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR)
     val executorInitContainerSecretVolumePlugin = for {
       initContainerSecretName <- maybeExecutorInitContainerSecretName
       initContainerSecretMountPath <- maybeExecutorInitContainerSecretMount
@@ -65,6 +65,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     } yield {
       new SparkPodInitContainerBootstrapImpl(
         sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE),
+        sparkConf.get(DOCKER_IMAGE_PULL_POLICY),
         sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION),
         sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION),
         sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT),
@@ -95,4 +96,3 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
   }
 }
-
@@ -77,6 +77,7 @@ private[spark] class KubernetesClusterSchedulerBackend(

   private var shufflePodCache: Option[ShufflePodCache] = None
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
+  private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
   private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
   private val blockmanagerPort = conf
@@ -354,7 +355,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       .addNewContainer()
         .withName(s"executor")
         .withImage(executorDockerImage)
-        .withImagePullPolicy("IfNotPresent")
+        .withImagePullPolicy(dockerImagePullPolicy)
         .withNewResources()
           .addToRequests("memory", executorMemoryQuantity)
           .addToLimits("memory", executorMemoryLimitQuantity)
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.kubernetes.constants._
 class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAfter {
   private val OBJECT_MAPPER = new ObjectMapper()
   private val INIT_CONTAINER_IMAGE = "spark-init:latest"
+  private val DOCKER_IMAGE_PULL_POLICY = "IfNotPresent"
   private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars"
   private val FILES_DOWNLOAD_PATH = "/var/data/spark-files"
   private val DOWNLOAD_TIMEOUT_MINUTES = 5
@@ -137,6 +138,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
   private def bootstrapPodWithoutSubmittedDependencies(): Pod = {
     val bootstrapUnderTest = new SparkPodInitContainerBootstrapImpl(
       INIT_CONTAINER_IMAGE,
+      DOCKER_IMAGE_PULL_POLICY,
       JARS_DOWNLOAD_PATH,
       FILES_DOWNLOAD_PATH,
       DOWNLOAD_TIMEOUT_MINUTES,
@@ -150,6 +152,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf
   private def bootstrapPodWithSubmittedDependencies(): Pod = {
     val bootstrapUnderTest = new SparkPodInitContainerBootstrapImpl(
       INIT_CONTAINER_IMAGE,
+      DOCKER_IMAGE_PULL_POLICY,
       JARS_DOWNLOAD_PATH,
       FILES_DOWNLOAD_PATH,
       DOWNLOAD_TIMEOUT_MINUTES,
