Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch #23322

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.ExecutorService

import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
Expand All @@ -39,10 +40,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
private implicit val requestExecutorContext =
ExecutionContext.fromExecutorService(requestExecutorsService)

protected override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
Expand All @@ -60,6 +61,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
removeExecutor(executorId, reason)
}

/**
* Get an application ID associated with the job.
* This returns the string value of spark.app.id if set, otherwise
* the locally-generated ID from the superclass.
*
* @return The application ID
*/
override def applicationId(): String = {
conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
}

override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {
Expand Down Expand Up @@ -88,7 +100,8 @@ private[spark] class KubernetesClusterSchedulerBackend(

if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient.pods()
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
Expand Down Expand Up @@ -120,7 +133,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
kubernetesClient.pods()
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
Expand All @@ -133,7 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {
extends DriverEndpoint(rpcEnv, sparkProperties) {

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
// Don't do anything besides disabling the executor - allow the Kubernetes API events to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private val requestExecutorsService = new DeterministicScheduler()
private val sparkConf = new SparkConf(false)
.set("spark.executor.instances", "3")
.set("spark.app.id", TEST_SPARK_APP_ID)

@Mock
private var sc: SparkContext = _
Expand Down Expand Up @@ -87,8 +88,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
when(sc.env).thenReturn(env)
when(env.rpcEnv).thenReturn(rpcEnv)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
when(rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
when(
rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
when(kubernetesClient.pods()).thenReturn(podOperations)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
Expand All @@ -100,9 +103,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
podAllocator,
lifecycleEventHandler,
watchEvents,
pollEvents) {
override def applicationId(): String = TEST_SPARK_APP_ID
}
pollEvents)
}

test("Start all components") {
Expand All @@ -127,8 +128,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn

test("Remove executor") {
schedulerBackendUnderTest.start()
schedulerBackendUnderTest.doRemoveExecutor(
"1", ExecutorKilled)
schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
}

Expand Down