[SPARK-36462][K8S] Add the ability to selectively disable watching or polling #36433

Closed
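
The change introduces two internal boolean configs that let a job selectively turn off either the watch-based or the polling-based executor pod snapshot source. As a quick, illustrative sketch of how they could be set (the config keys come from the Config.scala hunk below; the SparkSession setup around them is generic example code, not part of this PR):

import org.apache.spark.sql.SparkSession

// Illustrative only: keep the watcher but disable polling of the Kubernetes API server.
val spark = SparkSession.builder()
  .appName("k8s-watch-only-example")                              // hypothetical app name
  .config("spark.kubernetes.executor.enableApiPolling", "false")  // new in this PR
  .config("spark.kubernetes.executor.enableApiWatcher", "true")   // new in this PR (default shown)
  .getOrCreate()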

Config.scala
@@ -483,6 +483,25 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val KUBERNETES_EXECUTOR_ENABLE_API_POLLING =
ConfigBuilder("spark.kubernetes.executor.enableApiPolling")
.doc("If Spark should poll Kubernetes for executor pod status. " +
"You should leave this enabled unless you're encountering issues with your etcd.")
.version("3.4.0")
.internal()
.booleanConf
.createWithDefault(true)

val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER =
ConfigBuilder("spark.kubernetes.executor.enableApiWatcher")
.doc("If Spark should create watchers for executor pod status. " +
"You should leave this enabled unless you're encountering issues with your etcd.")
.version("3.4.0")
.internal()
.booleanConf
.createWithDefault(true)


val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
.doc("Interval between polls against the Kubernetes API server to inspect the " +

ExecutorPodsPollingSnapshotSource.scala
@@ -45,15 +45,18 @@ class ExecutorPodsPollingSnapshotSource(
pollingExecutor: ScheduledExecutorService) extends Logging {

private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING)

private var pollingFuture: Future[_] = _

@Since("3.1.3")
def start(applicationId: String): Unit = {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
if (pollingEnabled) {
require(pollingFuture == null, "Cannot start polling more than once.")
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
}
}

@Since("3.1.3")

ExecutorPodsWatchSnapshotSource.scala
@@ -22,7 +22,9 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -38,19 +38,28 @@ import org.apache.spark.util.Utils
@DeveloperApi
class ExecutorPodsWatchSnapshotSource(
snapshotsStore: ExecutorPodsSnapshotsStore,
kubernetesClient: KubernetesClient) extends Logging {
kubernetesClient: KubernetesClient,
conf: SparkConf) extends Logging {
Review comment (Member):
Oh, this is a Developer API at the Stable level. Could you make a separate constructor for this new feature?
I believe we should keep the existing 2-parameter constructor for backward compatibility.

Reply (Contributor Author):
Sure :)


private var watchConnection: Closeable = _
private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)

// If we're constructed with the old API get the SparkConf from the running SparkContext.
def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = {
this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf)
}

@Since("3.1.3")
def start(applicationId: String): Unit = {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
if (enableWatching) {
require(watchConnection == null, "Cannot start the watcher twice.")
logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
}
}

@Since("3.1.3")

KubernetesClusterManager.scala
@@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {

val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
kubernetesClient)
kubernetesClient,
sc.conf)

val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"kubernetes-executor-pod-polling-sync")

ExecutorPodsPollingSnapshotSourceSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {

private val sparkConf = new SparkConf
private val defaultConf = new SparkConf()

private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)

@Mock
private var kubernetesClient: KubernetesClient = _
@@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
before {
MockitoAnnotations.openMocks(this).close()
pollingExecutor = new DeterministicScheduler()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(appIdLabeledPods)
@@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
}

test("Items returned by the API should be pushed to the event queue") {
val sparkConf = new SparkConf()
pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource(
sparkConf,
kubernetesClient,
eventQueue,
pollingExecutor)
pollingSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
when(activeExecutorPods.list())
@@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
}

test("SPARK-36462: If polling is disabled we don't call pods() on the client") {
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(kubernetesClient, never()).pods()
}

test("SPARK-36334: Support pod listing with resource version") {
Seq(true, false).foreach { value =>
val sparkConf = new SparkConf()
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value),
kubernetesClient,
eventQueue,
pollingExecutor)
source.start(TEST_SPARK_APP_ID)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
if (value) {
verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build())

ExecutorPodsWatchSnapshotSourceSuite.scala
@@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{verify, when}
import org.mockito.Mockito.{never, verify, when}
import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkConf
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
@@ -61,17 +63,27 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter {
when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(executorRoleLabeledPods)
when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
}

test("Watch events should be pushed to the snapshots store as snapshot updates.") {
val conf = new SparkConf()
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
val exec1 = runningExecutor(1)
val exec2 = runningExecutor(2)
watch.getValue.eventReceived(Action.ADDED, exec1)
watch.getValue.eventReceived(Action.MODIFIED, exec2)
verify(eventQueue).updatePod(exec1)
verify(eventQueue).updatePod(exec2)
}

test("SPARK-36462: Verify if watchers are disabled we don't call pods() on the client") {
val conf = new SparkConf()
conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false)
watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource(
eventQueue, kubernetesClient, conf)
watchSourceUnderTest.start(TEST_SPARK_APP_ID)
verify(kubernetesClient, never()).pods()
}
}