From 20b593811dc02c96c71978851e051d32bf8c3496 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 27 Jan 2024 20:24:15 -0800
Subject: [PATCH] [SPARK-46888][CORE] Fix `Master` to reject `/workers/kill/` requests if decommission is disabled
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?

This PR aims to fix `Master` to reject `/workers/kill/` requests when `spark.decommission.enabled` is `false`, in order to fix the dangling-worker issue.

### Why are the changes needed?

Currently, `spark.decommission.enabled` is `false` by default. So, when a user sends a decommission request, only the Master marks the worker `DECOMMISSIONED` while the worker itself stays alive.

```
$ curl -XPOST http://localhost:8080/workers/kill/\?host\=127.0.0.1
```

**Master UI**

![Screenshot 2024-01-27 at 6 19 18 PM](https://github.com/apache/spark/assets/9700541/443bfc32-b924-438a-8bf6-c64b9afbc4be)

**Worker Log**

```
24/01/27 18:18:06 WARN Worker: Receive decommission request, but decommission feature is disabled.
```

This makes the Master consistent with the existing `Worker` behavior, which ignores the request:

https://github.com/apache/spark/blob/1787a5261e87e0214a3f803f6534c5e52a0138e6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L859-L868

### Does this PR introduce _any_ user-facing change?

No, this is a bug fix.

### How was this patch tested?

Pass the CI with the newly added test case.
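For quick manual verification outside the test suite, here is a minimal sketch along the lines of the new test and the `curl` command above. It assumes a standalone Master web UI reachable at `localhost:8080` that was started with `spark.decommission.enabled=false`; the object name is illustrative only and not part of this patch.

```
import java.net.{HttpURLConnection, URL}

// Illustrative standalone check (not part of this patch): with decommissioning
// disabled, a POST to /workers/kill/ should now be rejected with 405 instead of
// marking the worker DECOMMISSIONED while it keeps running.
object DecommissionEndpointCheck {
  def main(args: Array[String]): Unit = {
    // Assumes a standalone Master web UI at localhost:8080.
    val url = new URL("http://localhost:8080/workers/kill/?host=127.0.0.1")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    // getResponseCode sends the request; 405 is expected when decommission is disabled.
    println(s"HTTP ${conn.getResponseCode}")
    conn.disconnect()
  }
}
```

With decommissioning enabled on the Master, the same request is expected to be accepted instead, subject to `spark.master.ui.decommission.allowMode`.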
### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44915 from dongjoon-hyun/SPARK-46888.

Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../spark/deploy/master/ui/MasterWebUI.scala  |  4 +++-
 .../spark/deploy/master/MasterSuite.scala     | 21 +++++++++++++++++++
 .../deploy/master/ui/MasterWebUISuite.scala   |  3 ++-
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index d71ef8b9e36e7..3025c0bf468b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
 import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
 import org.apache.spark.ui.{SparkUI, WebUI}
@@ -41,6 +42,7 @@ class MasterWebUI(
 
   val masterEndpointRef = master.self
   val killEnabled = master.conf.get(UI_KILL_ENABLED)
+  val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED)
   val decommissionAllowMode = master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)
 
   initialize()
@@ -60,7 +62,7 @@ class MasterWebUI(
       override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
         val hostnames: Seq[String] = Option(req.getParameterValues("host"))
           .getOrElse(Array[String]()).toImmutableArraySeq
-        if (!isDecommissioningRequestAllowed(req)) {
+        if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) {
           resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
         } else {
           val removedWorkers = masterEndpointRef.askSync[Integer](
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 6966a7f660b2f..0db58ae0c834e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.master
 
+import java.net.{HttpURLConnection, URL}
 import java.util.Date
 import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
@@ -444,6 +445,26 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-46888: master should reject worker kill request if decommision is disabled") {
+    implicit val formats = org.json4s.DefaultFormats
+    val conf = new SparkConf()
+      .set(DECOMMISSION_ENABLED, false)
+      .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
+    val localCluster = LocalSparkCluster(1, 1, 512, conf)
+    localCluster.start()
+    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+        val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
+        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+        conn.setRequestMethod("POST")
+        assert(conn.getResponseCode === 405)
+      }
+    } finally {
+      localCluster.stop()
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
index 024511189accc..40265a12af93b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
@@ -30,12 +30,13 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, KillDriverResponse, RequestKillDriver}
 import org.apache.spark.deploy.DeployTestUtils._
 import org.apache.spark.deploy.master._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
 import org.apache.spark.util.Utils
 
 class MasterWebUISuite extends SparkFunSuite {
 
-  val conf = new SparkConf()
+  val conf = new SparkConf().set(DECOMMISSION_ENABLED, true)
   val securityMgr = new SecurityManager(conf)
   val rpcEnv = mock(classOf[RpcEnv])
   val master = mock(classOf[Master])