From 2ed6c6bd558875dede5fa909f817211192d87c08 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 12 Oct 2021 12:13:09 -0700 Subject: [PATCH 01/11] Add the ability to selectively disable watching or polling for pods on Kubernetes for environments where etcd may be under a high load or otherwise not support polling/watching. --- .../org/apache/spark/deploy/k8s/Config.scala | 17 ++++++++++ .../ExecutorPodsPollingSnapshotSource.scala | 11 ++++--- .../k8s/ExecutorPodsWatchSnapshotSource.scala | 22 ++++++++----- .../k8s/KubernetesClusterManager.scala | 3 +- ...ecutorPodsPollingSnapshotSourceSuite.scala | 33 ++++++++++++++----- ...ExecutorPodsWatchSnapshotSourceSuite.scala | 20 ++++++++--- 6 files changed, 80 insertions(+), 26 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 1f508b52729e7..4ae15cd46a074 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -483,6 +483,23 @@ private[spark] object Config extends Logging { .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.") .createWithDefaultString("1s") + val KUBERNETES_EXECUTOR_ENABLE_API_POLLING = + ConfigBuilder("spark.kubernetes.executor.enableApiPolling") + .doc("If Spark should poll Kubernetes for executor pod status. " + + "You should leave this enabled unless your encountering performance issues with your etcd.") + .version("3.3.0") + .booleanConf + .createWithDefault(true) + + val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER = + ConfigBuilder("spark.kubernetes.executor.enableApiWatcher") + .doc("If Spark should create watchers for executor pod status. 
" + + "You should leave this enabled unless your encountering performance issues with your etcd.") + .version("3.3.0") + .booleanConf + .createWithDefault(true) + + val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL = ConfigBuilder("spark.kubernetes.executor.apiPollingInterval") .doc("Interval between polls against the Kubernetes API server to inspect the " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 192b5993efe07..10f26bd441ead 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -45,15 +45,18 @@ class ExecutorPodsPollingSnapshotSource( pollingExecutor: ScheduledExecutorService) extends Logging { private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + private val pollingEnabled = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_POLLING) private var pollingFuture: Future[_] = _ @Since("3.1.3") def start(applicationId: String): Unit = { - require(pollingFuture == null, "Cannot start polling more than once.") - logDebug(s"Starting to check for executor pod state every $pollingInterval ms.") - pollingFuture = pollingExecutor.scheduleWithFixedDelay( - new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + if (pollingEnabled) { + require(pollingFuture == null, "Cannot start polling more than once.") + logDebug(s"Starting to check for executor pod state every $pollingInterval ms.") + pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + } } @Since("3.1.3") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 06d942eb5b36f..3508e4be419ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -22,7 +22,9 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException} import io.fabric8.kubernetes.client.Watcher.Action +import org.apache.spark.SparkConf import org.apache.spark.annotation.{DeveloperApi, Since, Stable} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -38,19 +40,23 @@ import org.apache.spark.util.Utils @DeveloperApi class ExecutorPodsWatchSnapshotSource( snapshotsStore: ExecutorPodsSnapshotsStore, - kubernetesClient: KubernetesClient) extends Logging { + kubernetesClient: KubernetesClient, + conf: SparkConf) extends Logging { private var watchConnection: Closeable = _ + private val enablePolling = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER) @Since("3.1.3") def start(applicationId: String): Unit = { - require(watchConnection == null, 
"Cannot start the watcher twice.") - logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + - s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.") - watchConnection = kubernetesClient.pods() - .withLabel(SPARK_APP_ID_LABEL, applicationId) - .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) - .watch(new ExecutorPodsWatcher()) + if (enablePolling) { + require(watchConnection == null, "Cannot start the watcher twice.") + logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + + s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.") + watchConnection = kubernetesClient.pods() + .withLabel(SPARK_APP_ID_LABEL, applicationId) + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .watch(new ExecutorPodsWatcher()) + } } @Since("3.1.3") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 10ea3a8cb0e46..507d2b310b7cb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -114,7 +114,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, - kubernetesClient) + kubernetesClient, + sc.conf) val eventsPollingExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( "kubernetes-executor-pod-polling-sync") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala index 11b604a4d8322..b1c51ef897e40 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder} import io.fabric8.kubernetes.client.KubernetesClient import org.jmock.lib.concurrent.DeterministicScheduler import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.Mockito.{verify, when} +import org.mockito.Mockito.{never, verify, when} import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} @@ -33,9 +33,9 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAndAfter { - private val sparkConf = new SparkConf + private val defaultConf = new SparkConf() - private val pollingInterval = sparkConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + private val pollingInterval = defaultConf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) @Mock private var kubernetesClient: KubernetesClient = _ @@ -61,12 +61,6 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn before { MockitoAnnotations.openMocks(this).close() pollingExecutor = new DeterministicScheduler() - pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource( - sparkConf, - kubernetesClient, - eventQueue, - pollingExecutor) 
- pollingSourceUnderTest.start(TEST_SPARK_APP_ID) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) .thenReturn(appIdLabeledPods) @@ -77,6 +71,13 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn } test("Items returned by the API should be pushed to the event queue") { + val sparkConf = new SparkConf() + pollingSourceUnderTest = new ExecutorPodsPollingSnapshotSource( + sparkConf, + kubernetesClient, + eventQueue, + pollingExecutor) + pollingSourceUnderTest.start(TEST_SPARK_APP_ID) val exec1 = runningExecutor(1) val exec2 = runningExecutor(2) when(activeExecutorPods.list()) @@ -89,13 +90,27 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn verify(eventQueue).replaceSnapshot(Seq(exec1, exec2)) } + test("If polling is disabled we don't call pods() on the client") { + val sparkConf = new SparkConf() + val source = new ExecutorPodsPollingSnapshotSource( + sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false), + kubernetesClient, + eventQueue, + pollingExecutor) + source.start(TEST_SPARK_APP_ID) + pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS) + verify(kubernetesClient, never()).pods() + } + test("SPARK-36334: Support pod listing with resource version") { Seq(true, false).foreach { value => + val sparkConf = new SparkConf() val source = new ExecutorPodsPollingSnapshotSource( sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value), kubernetesClient, eventQueue, pollingExecutor) + source.start(TEST_SPARK_APP_ID) pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS) if (value) { verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build()) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala index cddb5f6da44f4..5797dc659cd13 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala @@ -20,10 +20,12 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} -import org.mockito.Mockito.{verify, when} +import org.mockito.Mockito.{never, verify, when} import org.scalatest.BeforeAndAfter +import org.apache.spark.SparkConf import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ @@ -61,12 +63,13 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) .thenReturn(executorRoleLabeledPods) when(executorRoleLabeledPods.watch(watch.capture())).thenReturn(watchConnection) - watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource( - eventQueue, kubernetesClient) - watchSourceUnderTest.start(TEST_SPARK_APP_ID) } test("Watch events should be pushed to 
the snapshots store as snapshot updates.") { + val conf = new SparkConf() + watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource( + eventQueue, kubernetesClient, conf) + watchSourceUnderTest.start(TEST_SPARK_APP_ID) val exec1 = runningExecutor(1) val exec2 = runningExecutor(2) watch.getValue.eventReceived(Action.ADDED, exec1) @@ -74,4 +77,13 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA verify(eventQueue).updatePod(exec1) verify(eventQueue).updatePod(exec2) } + + test("Verify if watchers are disabled we don't call pods() on the client") { + val conf = new SparkConf() + conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false) + watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource( + eventQueue, kubernetesClient, conf) + watchSourceUnderTest.start(TEST_SPARK_APP_ID) + verify(kubernetesClient, never()).pods() + } } From efa0e5a5dc4a1a7bdf42a44b3fbe5d652fc7a316 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 2 May 2022 12:25:05 -0700 Subject: [PATCH 02/11] CR-feedback --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 ++ .../cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 4ae15cd46a074..3c4c501f3d997 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -489,6 +489,7 @@ private[spark] object Config extends Logging { "You should leave this enabled unless your encountering performance issues with your etcd.") .version("3.3.0") .booleanConf + .internal() .createWithDefault(true) val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER = @@ -497,6 +498,7 @@ private[spark] object Config extends Logging { "You should leave this enabled unless your encountering performance issues with your etcd.") .version("3.3.0") .booleanConf + .internal() .createWithDefault(true) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala index b1c51ef897e40..e0016a2ae0503 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSourceSuite.scala @@ -90,7 +90,7 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn verify(eventQueue).replaceSnapshot(Seq(exec1, exec2)) } - test("If polling is disabled we don't call pods() on the client") { + test("SPARK-36462: If polling is disabled we don't call pods() on the client") { val sparkConf = new SparkConf() val source = new ExecutorPodsPollingSnapshotSource( sparkConf.set(KUBERNETES_EXECUTOR_ENABLE_API_POLLING, false), From eac0698580aac2761ef9e15700d693ed0e0249b4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 3 May 2022 14:45:44 -0700 Subject: [PATCH 03/11] Update resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala Co-authored-by: Martin Grigorov --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala 
| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 3c4c501f3d997..aeff933805fee 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -486,7 +486,7 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_ENABLE_API_POLLING = ConfigBuilder("spark.kubernetes.executor.enableApiPolling") .doc("If Spark should poll Kubernetes for executor pod status. " + - "You should leave this enabled unless your encountering performance issues with your etcd.") + "You should leave this enabled unless you're encountering performance issues with your etcd.") .version("3.3.0") .booleanConf .internal() From a030c0bc62ea25d683441a8ff4c40f0e6bd68ffc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 3 May 2022 14:45:53 -0700 Subject: [PATCH 04/11] Update resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala Co-authored-by: Martin Grigorov --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index aeff933805fee..2415e9aece39b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -495,7 +495,7 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER = ConfigBuilder("spark.kubernetes.executor.enableApiWatcher") .doc("If Spark should create watchers for executor pod status. 
" + - "You should leave this enabled unless your encountering performance issues with your etcd.") + "You should leave this enabled unless you're encountering performance issues with your etcd.") .version("3.3.0") .booleanConf .internal() From 346f40f6908fdd8cbf46e134cd8cc045f1802a74 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 3 May 2022 14:46:03 -0700 Subject: [PATCH 05/11] Update resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala Co-authored-by: Martin Grigorov --- .../cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala index 5797dc659cd13..8209bee7a02b7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala @@ -78,7 +78,7 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA verify(eventQueue).updatePod(exec2) } - test("Verify if watchers are disabled we don't call pods() on the client") { + test("SPARK-36462: Verify if watchers are disabled we don't call pods() on the client") { val conf = new SparkConf() conf.set(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER, false) watchSourceUnderTest = new ExecutorPodsWatchSnapshotSource( From 6fef69d201a449114ef30244dd90985913d485c8 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 3 May 2022 14:46:11 -0700 Subject: [PATCH 06/11] Update resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala Co-authored-by: Martin Grigorov --- .../scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 3508e4be419ff..e2a04c217f613 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -50,7 +50,7 @@ class ExecutorPodsWatchSnapshotSource( def start(applicationId: String): Unit = { if (enablePolling) { require(watchConnection == null, "Cannot start the watcher twice.") - logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + + logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.") watchConnection = kubernetesClient.pods() .withLabel(SPARK_APP_ID_LABEL, applicationId) From c8523c113a6794f899ed7e95e42f749a0c43355b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 3 May 2022 14:51:25 -0700 Subject: [PATCH 07/11] CR feedback it's enableWatching not polling update var name to match. 
--- .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index e2a04c217f613..68643931a87d2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -44,11 +44,11 @@ class ExecutorPodsWatchSnapshotSource( conf: SparkConf) extends Logging { private var watchConnection: Closeable = _ - private val enablePolling = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER) + private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER) @Since("3.1.3") def start(applicationId: String): Unit = { - if (enablePolling) { + if (enableWatching) { require(watchConnection == null, "Cannot start the watcher twice.") logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," + s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.") From 9137c4d4e0328ef984116fa08864bb63d299dd0f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jun 2022 14:39:47 -0700 Subject: [PATCH 08/11] Update version since we missed the boat on 3.3.0 --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 2415e9aece39b..abb8317d5dcbc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -487,7 +487,7 @@ private[spark] object Config extends Logging { ConfigBuilder("spark.kubernetes.executor.enableApiPolling") .doc("If Spark should poll Kubernetes for executor pod status. " + "You should leave this enabled unless you're encountering performance issues with your etcd.") - .version("3.3.0") + .version("3.4.0") .booleanConf .internal() .createWithDefault(true) @@ -496,7 +496,7 @@ private[spark] object Config extends Logging { ConfigBuilder("spark.kubernetes.executor.enableApiWatcher") .doc("If Spark should create watchers for executor pod status. 
" + "You should leave this enabled unless you're encountering performance issues with your etcd.") - .version("3.3.0") + .version("3.4.0") .booleanConf .internal() .createWithDefault(true) From f0db5a776ffc226e7b6f87db226be20823984fbf Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jun 2022 14:42:22 -0700 Subject: [PATCH 09/11] Fix style --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index abb8317d5dcbc..815a31f81338f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -486,7 +486,7 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_ENABLE_API_POLLING = ConfigBuilder("spark.kubernetes.executor.enableApiPolling") .doc("If Spark should poll Kubernetes for executor pod status. " + - "You should leave this enabled unless you're encountering performance issues with your etcd.") + "You should leave this enabled unless you're encountering issues with your etcd.") .version("3.4.0") .booleanConf .internal() @@ -495,7 +495,7 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER = ConfigBuilder("spark.kubernetes.executor.enableApiWatcher") .doc("If Spark should create watchers for executor pod status. " + - "You should leave this enabled unless you're encountering performance issues with your etcd.") + "You should leave this enabled unless you're encountering issues with your etcd.") .version("3.4.0") .booleanConf .internal() From adf4e9aa57477a0cc2f0e6ecbc931576b94a9040 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 10 Aug 2022 16:09:01 -0700 Subject: [PATCH 10/11] Move internal up above booleanConf --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 815a31f81338f..b8e9b92159b65 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -488,8 +488,8 @@ private[spark] object Config extends Logging { .doc("If Spark should poll Kubernetes for executor pod status. " + "You should leave this enabled unless you're encountering issues with your etcd.") .version("3.4.0") - .booleanConf .internal() + .booleanConf .createWithDefault(true) val KUBERNETES_EXECUTOR_ENABLE_API_WATCHER = @@ -497,8 +497,8 @@ private[spark] object Config extends Logging { .doc("If Spark should create watchers for executor pod status. " + "You should leave this enabled unless you're encountering issues with your etcd.") .version("3.4.0") - .booleanConf .internal() + .booleanConf .createWithDefault(true) From 9f5d4605db8e065108207c311795a29f7d1ef938 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 12 Aug 2022 16:26:25 -0700 Subject: [PATCH 11/11] CR Feedback: add back legacy two parameter constructor, use default running sc to grab conf if no conf specified. 
--- .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala index 68643931a87d2..a334ece565377 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException} import io.fabric8.kubernetes.client.Watcher.Action -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Since, Stable} import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER import org.apache.spark.deploy.k8s.Constants._ @@ -46,6 +46,11 @@ class ExecutorPodsWatchSnapshotSource( private var watchConnection: Closeable = _ private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER) + // If we're constructed with the old API get the SparkConf from the running SparkContext. + def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = { + this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf) + } + @Since("3.1.3") def start(applicationId: String): Unit = { if (enableWatching) {
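
---

For anyone picking up this series, a minimal usage sketch of the two settings it introduces. This is not part of the patch itself: the config keys (spark.kubernetes.executor.enableApiPolling and spark.kubernetes.executor.enableApiWatcher, both defaulting to true and marked internal() as of patch 10/11) are taken from the Config.scala changes above, while the master URL and app name below are placeholder values.

  import org.apache.spark.SparkConf

  // Sketch only, assuming Spark 3.4.0+ with this series applied.
  val conf = new SparkConf()
    // Placeholder in-cluster master URL and app name, not part of the patch.
    .setMaster("k8s://https://kubernetes.default.svc")
    .setAppName("executor-snapshot-source-demo")
    // Keep periodic polling of executor pod state (the default behaviour).
    .set("spark.kubernetes.executor.enableApiPolling", "true")
    // Skip creating the pod watch connection, e.g. when etcd is under heavy load.
    .set("spark.kubernetes.executor.enableApiWatcher", "false")

  // Equivalent from the command line:
  //   spark-submit --conf spark.kubernetes.executor.enableApiWatcher=false ...

Because patch 11/11 restores the legacy two-argument ExecutorPodsWatchSnapshotSource constructor (falling back to SparkContext.getOrCreate().conf), existing callers of the developer API keep compiling while still honouring the enableApiWatcher setting.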