Skip to content

Commit

Permalink
[SPARK-46888][CORE] Fix Master to reject /workers/kill/ requests …
Browse files Browse the repository at this point in the history
…if decommission is disabled

This PR aims to fix `Master` to reject `/workers/kill/` request if `spark.decommission.enabled` is `false` in order to fix the dangling worker issue.

Currently, `spark.decommission.enabled` is `false` by default. So, when a user asks to decommission, only Master marked it `DECOMMISSIONED` while the worker is alive.
```
$ curl -XPOST http://localhost:8080/workers/kill/\?host\=127.0.0.1
```

**Master UI**
![Screenshot 2024-01-27 at 6 19 18 PM](https://github.com/apache/spark/assets/9700541/443bfc32-b924-438a-8bf6-c64b9afbc4be)

**Worker Log**
```
24/01/27 18:18:06 WARN Worker: Receive decommission request, but decommission feature is disabled.
```

To be consistent with the existing `Worker` behavior which ignores the request.

https://github.com/apache/spark/blob/1787a5261e87e0214a3f803f6534c5e52a0138e6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala#L859-L868

No, this is a bug fix.

Pass the CI with the newly added test case.

No.

Closes #44915 from dongjoon-hyun/SPARK-46888.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 20b5938)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
dongjoon-hyun committed Jan 28, 2024
1 parent a2854ba commit accfb39
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
import org.apache.spark.ui.{SparkUI, WebUI}
Expand All @@ -40,6 +41,7 @@ class MasterWebUI(

val masterEndpointRef = master.self
val killEnabled = master.conf.get(UI_KILL_ENABLED)
val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED)
val decommissionAllowMode = master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)

initialize()
Expand All @@ -58,7 +60,7 @@ class MasterWebUI(
override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
val hostnames: Seq[String] = Option(req.getParameterValues("host"))
.getOrElse(Array[String]()).toSeq
if (!isDecommissioningRequestAllowed(req)) {
if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) {
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
} else {
val removedWorkers = masterEndpointRef.askSync[Integer](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.deploy.master

import java.net.{HttpURLConnection, URL}
import java.util.Date
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
Expand Down Expand Up @@ -325,6 +326,26 @@ class MasterSuite extends SparkFunSuite
}
}

test("SPARK-46888: master should reject worker kill request if decommision is disabled") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
.set(DECOMMISSION_ENABLED, false)
.set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
val localCluster = LocalSparkCluster(1, 1, 512, conf)
localCluster.start()
val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
try {
eventually(timeout(30.seconds), interval(100.milliseconds)) {
val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
assert(conn.getResponseCode === 405)
}
} finally {
localCluster.stop()
}
}

test("master/worker web ui available") {
implicit val formats = org.json4s.DefaultFormats
val conf = new SparkConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, KillDriverResponse, RequestKillDriver}
import org.apache.spark.deploy.DeployTestUtils._
import org.apache.spark.deploy.master._
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.util.Utils

class MasterWebUISuite extends SparkFunSuite {

val conf = new SparkConf()
val conf = new SparkConf().set(DECOMMISSION_ENABLED, true)
val securityMgr = new SecurityManager(conf)
val rpcEnv = mock(classOf[RpcEnv])
val master = mock(classOf[Master])
Expand Down

0 comments on commit accfb39

Please sign in to comment.