diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 01c94670b807a..5a790f85964b8 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -29,7 +29,7 @@ Spark Project Kubernetes kubernetes - 2.2.13 + 3.0.0 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala index 1dad5393ca301..80c52a93dfc31 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala @@ -74,7 +74,6 @@ package object constants { private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR" // Bootstrapping dependencies with the init-container - private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH = "/mnt/secrets/spark-init" private[spark] val INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/InitContainerUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/InitContainerUtil.scala index 55ed06d7ca415..1101fcaac39b6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/InitContainerUtil.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/InitContainerUtil.scala @@ -16,34 +16,15 @@ */ package org.apache.spark.deploy.k8s.submit -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder} -import scala.collection.JavaConverters._ - -import org.apache.spark.deploy.k8s.constants._ private[spark] object InitContainerUtil { - private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule) - def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = { - val resolvedInitContainers = originalPodSpec - .getMetadata - .getAnnotations - .asScala - .get(INIT_CONTAINER_ANNOTATION) - .map { existingInitContainerAnnotation => - val existingInitContainers = OBJECT_MAPPER.readValue( - existingInitContainerAnnotation, classOf[List[Container]]) - existingInitContainers ++ Seq(initContainer) - }.getOrElse(Seq(initContainer)) - val resolvedSerializedInitContainers = OBJECT_MAPPER.writeValueAsString(resolvedInitContainers) new PodBuilder(originalPodSpec) - .editMetadata() - .removeFromAnnotations(INIT_CONTAINER_ANNOTATION) - .addToAnnotations(INIT_CONTAINER_ANNOTATION, resolvedSerializedInitContainers) - .endMetadata() + .editOrNewSpec() + .addToInitContainers(initContainer) + .endSpec() .build() } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index c645b008d736d..d25003a65bb04 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit import java.util.concurrent.{CountDownLatch, TimeUnit} -import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod} +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ @@ -109,7 +109,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( ("namespace", pod.getMetadata.getNamespace()), ("labels", pod.getMetadata.getLabels().asScala.mkString(", ")), ("pod uid", pod.getMetadata.getUid), - ("creation time", pod.getMetadata.getCreationTimestamp()), + ("creation time", formatTime(pod.getMetadata.getCreationTimestamp)), // spec details ("service account name", pod.getSpec.getServiceAccountName()), @@ -117,7 +117,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( ("node name", pod.getSpec.getNodeName()), // status - ("start time", pod.getStatus.getStartTime), + ("start time", formatTime(pod.getStatus.getStartTime)), ("container images", pod.getStatus.getContainerStatuses() .asScala @@ -162,7 +162,7 @@ private[k8s] class LoggingPodStatusWatcherImpl( case running: ContainerStateRunning => Seq( ("Container state", "Running"), - ("Container started at", running.getStartedAt)) + ("Container started at", formatTime(running.getStartedAt))) case waiting: ContainerStateWaiting => Seq( ("Container state", "Waiting"), @@ -175,4 +175,8 @@ private[k8s] class LoggingPodStatusWatcherImpl( throw new SparkException(s"Unexpected container status type ${unknown.getClass}.") }.getOrElse(Seq(("Container state", "N/A"))) } + + private def formatTime(time: Time): String = { + if (time != null) time.getTime else "N/A" + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala index ee3b4229b16c1..1b011671c3f4a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/InitContainerBootstrapStepSuite.scala @@ -31,9 +31,8 @@ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec} import org.apache.spark.util.Utils -private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite { +private[spark] class InitContainerBootstrapStepSuite extends SparkFunSuite { - private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule) private val CONFIG_MAP_NAME = "spark-init-config-map" private val CONFIG_MAP_KEY = "spark-init-config-map-key" @@ -59,12 +58,9 @@ private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite { FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvKey) assert(additionalDriverEnv.head.getValue === FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvValue) - val driverAnnotations = preparedDriverSpec.driverPod.getMetadata.getAnnotations.asScala - assert(driverAnnotations.size === 1) - val initContainers = OBJECT_MAPPER.readValue( - driverAnnotations(INIT_CONTAINER_ANNOTATION), classOf[Array[Container]]) - assert(initContainers.length === 1) - val initContainerEnv = initContainers.head.getEnv.asScala + val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers + assert(initContainers.size() === 1) + val initContainerEnv = initContainers.get(0).getEnv.asScala assert(initContainerEnv.size === 1) assert(initContainerEnv.head.getName === SecondTestInitContainerConfigurationStep$.additionalInitContainerEnvKey) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index bb09cb801b5a9..71204a5aa1deb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -179,8 +179,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef verify(nodeAffinityExecutorPodModifier, times(1)) .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) - assert(executor.getMetadata.getAnnotations.size() === 1) - assert(executor.getMetadata.getAnnotations.containsKey(INIT_CONTAINER_ANNOTATION)) + assert(executor.getSpec.getInitContainers.size() === 1) checkOwnerReferences(executor, driverPodUid) } diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index dda4743d607b2..c882090200132 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -351,7 +351,7 @@ wget - https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-linux-amd64 + https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64 ${project.build.directory}/minikube-bin/linux-amd64 minikube @@ -363,7 +363,7 @@ wget - https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-darwin-amd64 + https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64 ${project.build.directory}/minikube-bin/darwin-amd64 minikube diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index bd31bde6dabf4..c9b3e0495896f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -60,9 +60,9 @@ private[spark] object Minikube extends Logging { def getMinikubeStatus: MinikubeStatus.Value = synchronized { assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) val statusString = executeMinikube("status") - .filter(_.contains("minikubeVM: ")) + .filter(_.contains("minikube: ")) .head - .replaceFirst("minikubeVM: ", "") + .replaceFirst("minikube: ", "") MinikubeStatus.unapply(statusString) .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) } @@ -78,7 +78,7 @@ private[spark] object Minikube extends Logging { def deleteMinikube(): Unit = synchronized { assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) - if (getMinikubeStatus != MinikubeStatus.DOES_NOT_EXIST) { + if (getMinikubeStatus != MinikubeStatus.NONE) { executeMinikube("delete") } else { logInfo("Minikube was already not running.") @@ -115,10 +115,17 @@ private[spark] object Minikube extends Logging { private[spark] object MinikubeStatus extends Enumeration { + // The following states are listed according to + // https://github.com/docker/machine/blob/master/libmachine/state/state.go. + val STARTING = status("Starting") val RUNNING = status("Running") + val PAUSED = status("Paused") + val STOPPING = status("Stopping") val STOPPED = status("Stopped") - val DOES_NOT_EXIST = status("Does Not Exist") + val ERROR = status("Error") + val TIMEOUT = status("Timeout") val SAVED = status("Saved") + val NONE = status("") def status(value: String): Value = new Val(nextId, value) def unapply(s: String): Option[Value] = values.find(s == _.toString)