From c3428f7ed78a2443e708341c1299ecf333e84c17 Mon Sep 17 00:00:00 2001 From: Anirudh Ramanathan Date: Fri, 27 Jan 2017 11:15:05 -0800 Subject: [PATCH] Added GC to components (#56) --- resource-managers/kubernetes/core/pom.xml | 2 +- .../spark/deploy/kubernetes/Client.scala | 19 +++++++++++++++ .../KubernetesClusterSchedulerBackend.scala | 24 ++++++++++++++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 388defd93465d..86d7dec2c076f 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -29,7 +29,7 @@ Spark Project Kubernetes kubernetes - 1.4.17 + 1.4.34 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 537f6b6a115e9..77b7c793dc37e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -123,6 +123,8 @@ private[spark] class Client( .endSpec() .done() sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName) + sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId) + sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) val submitRequest = buildSubmissionRequest() @@ -131,6 +133,23 @@ private[spark] class Client( val podWatcher = new Watcher[Pod] { override def eventReceived(action: Action, t: Pod): Unit = { + if (action == Action.ADDED) { + val ownerRefs = new ArrayBuffer[OwnerReference] + ownerRefs += new OwnerReferenceBuilder() + .withApiVersion(t.getApiVersion) + .withController(true) + .withKind(t.getKind) + .withName(t.getMetadata.getName) + .withUid(t.getMetadata.getUid) + .build() + + secret.getMetadata().setOwnerReferences(ownerRefs.asJava) + kubernetesClient.secrets().createOrReplace(secret) + + service.getMetadata().setOwnerReferences(ownerRefs.asJava) + kubernetesClient.services().createOrReplace(service) + } + if ((action == Action.ADDED || action == Action.MODIFIED) && t.getStatus.getPhase == "Running" && !submitCompletedFuture.isDone) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index b7110ba901842..f512c50a9a934 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -60,6 +60,11 @@ private[spark] class KubernetesClusterSchedulerBackend( .getOrElse( throw new SparkException("Must specify the service name the driver is running with")) + private val kubernetesDriverPodName = conf + .getOption("spark.kubernetes.driver.pod.name") + .getOrElse( + throw new SparkException("Must specify the driver pod name")) + private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g") private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory) @@ -82,6 +87,15 @@ private[spark] class KubernetesClusterSchedulerBackend( private val kubernetesClient = KubernetesClientBuilder .buildFromWithinPod(kubernetesMaster, kubernetesNamespace) + val driverPod = try { + kubernetesClient.pods().inNamespace(kubernetesNamespace). + withName(kubernetesDriverPodName).get() + } catch { + case throwable: Throwable => + logError(s"Executor cannot find driver pod.", throwable) + throw new SparkException(s"Executor cannot find driver pod", throwable) + } + override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 @@ -202,7 +216,15 @@ private[spark] class KubernetesClusterSchedulerBackend( .withNewMetadata() .withName(name) .withLabels(selectors) - .endMetadata() + .withOwnerReferences() + .addNewOwnerReference() + .withController(true) + .withApiVersion(driverPod.getApiVersion) + .withKind(driverPod.getKind) + .withName(driverPod.getMetadata.getName) + .withUid(driverPod.getMetadata.getUid) + .endOwnerReference() + .endMetadata() .withNewSpec() .addNewContainer() .withName(s"exec-${applicationId()}-container")