Skip to content

Commit

Permalink
Use the new initContainers field instead of the deprecated annotation (
Browse files Browse the repository at this point in the history
…apache#528)

* Use the new initContainers field in Kubernetes 1.8

* Fixed the integration tests
  • Loading branch information
liyinan926 authored and foxish committed Oct 20, 2017
1 parent f94499b commit 0abf0b9
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 44 deletions.
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>2.2.13</kubernetes.client.version>
<kubernetes.client.version>3.0.0</kubernetes.client.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ package object constants {
private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

// Bootstrapping dependencies with the init-container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH =
"/mnt/secrets/spark-init"
private[spark] val INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,15 @@
*/
package org.apache.spark.deploy.k8s.submit

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.deploy.k8s.constants._

private[spark] object InitContainerUtil {

private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)

def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
val resolvedInitContainers = originalPodSpec
.getMetadata
.getAnnotations
.asScala
.get(INIT_CONTAINER_ANNOTATION)
.map { existingInitContainerAnnotation =>
val existingInitContainers = OBJECT_MAPPER.readValue(
existingInitContainerAnnotation, classOf[List[Container]])
existingInitContainers ++ Seq(initContainer)
}.getOrElse(Seq(initContainer))
val resolvedSerializedInitContainers = OBJECT_MAPPER.writeValueAsString(resolvedInitContainers)
new PodBuilder(originalPodSpec)
.editMetadata()
.removeFromAnnotations(INIT_CONTAINER_ANNOTATION)
.addToAnnotations(INIT_CONTAINER_ANNOTATION, resolvedSerializedInitContainers)
.endMetadata()
.editOrNewSpec()
.addToInitContainers(initContainer)
.endSpec()
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit

import java.util.concurrent.{CountDownLatch, TimeUnit}

import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -109,15 +109,15 @@ private[k8s] class LoggingPodStatusWatcherImpl(
("namespace", pod.getMetadata.getNamespace()),
("labels", pod.getMetadata.getLabels().asScala.mkString(", ")),
("pod uid", pod.getMetadata.getUid),
("creation time", pod.getMetadata.getCreationTimestamp()),
("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),

// spec details
("service account name", pod.getSpec.getServiceAccountName()),
("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")),
("node name", pod.getSpec.getNodeName()),

// status
("start time", pod.getStatus.getStartTime),
("start time", formatTime(pod.getStatus.getStartTime)),
("container images",
pod.getStatus.getContainerStatuses()
.asScala
Expand Down Expand Up @@ -162,7 +162,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
case running: ContainerStateRunning =>
Seq(
("Container state", "Running"),
("Container started at", running.getStartedAt))
("Container started at", formatTime(running.getStartedAt)))
case waiting: ContainerStateWaiting =>
Seq(
("Container state", "Waiting"),
Expand All @@ -175,4 +175,8 @@ private[k8s] class LoggingPodStatusWatcherImpl(
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
}.getOrElse(Seq(("Container state", "N/A")))
}

private def formatTime(time: Time): String = {
if (time != null) time.getTime else "N/A"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
import org.apache.spark.util.Utils

private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
private[spark] class InitContainerBootstrapStepSuite extends SparkFunSuite {

private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)
private val CONFIG_MAP_NAME = "spark-init-config-map"
private val CONFIG_MAP_KEY = "spark-init-config-map-key"

Expand All @@ -59,12 +58,9 @@ private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvKey)
assert(additionalDriverEnv.head.getValue ===
FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvValue)
val driverAnnotations = preparedDriverSpec.driverPod.getMetadata.getAnnotations.asScala
assert(driverAnnotations.size === 1)
val initContainers = OBJECT_MAPPER.readValue(
driverAnnotations(INIT_CONTAINER_ANNOTATION), classOf[Array[Container]])
assert(initContainers.length === 1)
val initContainerEnv = initContainers.head.getEnv.asScala
val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
assert(initContainers.size() === 1)
val initContainerEnv = initContainers.get(0).getEnv.asScala
assert(initContainerEnv.size === 1)
assert(initContainerEnv.head.getName ===
SecondTestInitContainerConfigurationStep$.additionalInitContainerEnvKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
verify(nodeAffinityExecutorPodModifier, times(1))
.addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

assert(executor.getMetadata.getAnnotations.size() === 1)
assert(executor.getMetadata.getAnnotations.containsKey(INIT_CONTAINER_ANNOTATION))
assert(executor.getSpec.getInitContainers.size() === 1)
checkOwnerReferences(executor, driverPodUid)
}

Expand Down
4 changes: 2 additions & 2 deletions resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-linux-amd64</url>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/linux-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
Expand All @@ -363,7 +363,7 @@
<goal>wget</goal>
</goals>
<configuration>
<url>https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-darwin-amd64</url>
<url>https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64</url>
<outputDirectory>${project.build.directory}/minikube-bin/darwin-amd64</outputDirectory>
<outputFileName>minikube</outputFileName>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ private[spark] object Minikube extends Logging {
def getMinikubeStatus: MinikubeStatus.Value = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
val statusString = executeMinikube("status")
.filter(_.contains("minikubeVM: "))
.filter(_.contains("minikube: "))
.head
.replaceFirst("minikubeVM: ", "")
.replaceFirst("minikube: ", "")
MinikubeStatus.unapply(statusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
}
Expand All @@ -78,7 +78,7 @@ private[spark] object Minikube extends Logging {

def deleteMinikube(): Unit = synchronized {
assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
if (getMinikubeStatus != MinikubeStatus.DOES_NOT_EXIST) {
if (getMinikubeStatus != MinikubeStatus.NONE) {
executeMinikube("delete")
} else {
logInfo("Minikube was already not running.")
Expand Down Expand Up @@ -115,10 +115,17 @@ private[spark] object Minikube extends Logging {

private[spark] object MinikubeStatus extends Enumeration {

// The following states are listed according to
// https://github.com/docker/machine/blob/master/libmachine/state/state.go.
val STARTING = status("Starting")
val RUNNING = status("Running")
val PAUSED = status("Paused")
val STOPPING = status("Stopping")
val STOPPED = status("Stopped")
val DOES_NOT_EXIST = status("Does Not Exist")
val ERROR = status("Error")
val TIMEOUT = status("Timeout")
val SAVED = status("Saved")
val NONE = status("")

def status(value: String): Value = new Val(nextId, value)
def unapply(s: String): Option[Value] = values.find(s == _.toString)
Expand Down

0 comments on commit 0abf0b9

Please sign in to comment.