Skip to content

Commit

Permalink
[SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mi…
Browse files Browse the repository at this point in the history
…smatch

## What changes were proposed in this pull request?

In K8S Cluster mode, the algorithm to generate spark-app-selector/spark.app.id of spark driver is different with spark executor.
This patch makes sure spark driver and executor to use the same spark-app-selector/spark.app.id if spark.app.id is set, otherwise it will use superclass applicationId.

In K8S Client mode, spark-app-selector/spark.app.id for executors will use superclass applicationId.

## How was this patch tested?

Manually run."

Closes apache#23322 from suxingfate/SPARK-25922.

Lead-authored-by: suxingfate <suxingfate@163.com>
Co-authored-by: xinglwang <xinglwang@ebay.com>
Signed-off-by: Yinan Li <ynli@google.com>
  • Loading branch information
2 people authored and jackylee-ch committed Feb 18, 2019
1 parent eac8f26 commit 03c2a87
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.ExecutorService

import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
Expand All @@ -39,10 +40,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
private implicit val requestExecutorContext =
ExecutionContext.fromExecutorService(requestExecutorsService)

protected override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
Expand All @@ -60,6 +61,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
removeExecutor(executorId, reason)
}

/**
* Get an application ID associated with the job.
* This returns the string value of spark.app.id if set, otherwise
* the locally-generated ID from the superclass.
*
* @return The application ID
*/
override def applicationId(): String = {
conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
}

override def start(): Unit = {
super.start()
if (!Utils.isDynamicAllocationEnabled(conf)) {
Expand Down Expand Up @@ -88,7 +100,8 @@ private[spark] class KubernetesClusterSchedulerBackend(

if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient.pods()
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
Expand Down Expand Up @@ -120,7 +133,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
kubernetesClient.pods()
kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId())
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
Expand All @@ -133,7 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
}

private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends DriverEndpoint(rpcEnv, sparkProperties) {
extends DriverEndpoint(rpcEnv, sparkProperties) {

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
// Don't do anything besides disabling the executor - allow the Kubernetes API events to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private val requestExecutorsService = new DeterministicScheduler()
private val sparkConf = new SparkConf(false)
.set("spark.executor.instances", "3")
.set("spark.app.id", TEST_SPARK_APP_ID)

@Mock
private var sc: SparkContext = _
Expand Down Expand Up @@ -87,8 +88,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
when(sc.env).thenReturn(env)
when(env.rpcEnv).thenReturn(rpcEnv)
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
when(rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
when(
rpcEnv.setupEndpoint(
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
driverEndpoint.capture()))
.thenReturn(driverEndpointRef)
when(kubernetesClient.pods()).thenReturn(podOperations)
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
Expand All @@ -100,9 +103,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
podAllocator,
lifecycleEventHandler,
watchEvents,
pollEvents) {
override def applicationId(): String = TEST_SPARK_APP_ID
}
pollEvents)
}

test("Start all components") {
Expand All @@ -127,8 +128,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn

test("Remove executor") {
schedulerBackendUnderTest.start()
schedulerBackendUnderTest.doRemoveExecutor(
"1", ExecutorKilled)
schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
}

Expand Down

0 comments on commit 03c2a87

Please sign in to comment.