diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index d71ef8b9e36e7..3025c0bf468b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.internal.config.UI.MASTER_UI_DECOMMISSION_ALLOW_MODE
 import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
 import org.apache.spark.ui.{SparkUI, WebUI}
@@ -41,6 +42,7 @@ class MasterWebUI(
 
   val masterEndpointRef = master.self
   val killEnabled = master.conf.get(UI_KILL_ENABLED)
+  val decommissionDisabled = !master.conf.get(DECOMMISSION_ENABLED)
   val decommissionAllowMode = master.conf.get(MASTER_UI_DECOMMISSION_ALLOW_MODE)
 
   initialize()
@@ -60,7 +62,7 @@ class MasterWebUI(
       override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
         val hostnames: Seq[String] = Option(req.getParameterValues("host"))
           .getOrElse(Array[String]()).toImmutableArraySeq
-        if (!isDecommissioningRequestAllowed(req)) {
+        if (decommissionDisabled || !isDecommissioningRequestAllowed(req)) {
           resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
         } else {
           val removedWorkers = masterEndpointRef.askSync[Integer](
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 6966a7f660b2f..0db58ae0c834e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.master
 
+import java.net.{HttpURLConnection, URL}
 import java.util.Date
 import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
@@ -444,6 +445,26 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-46888: master should reject worker kill request if decommission is disabled") {
+    implicit val formats = org.json4s.DefaultFormats
+    val conf = new SparkConf()
+      .set(DECOMMISSION_ENABLED, false)
+      .set(MASTER_UI_DECOMMISSION_ALLOW_MODE, "ALLOW")
+    val localCluster = LocalSparkCluster(1, 1, 512, conf)
+    localCluster.start()
+    val masterUrl = s"http://${Utils.localHostNameForURI()}:${localCluster.masterWebUIPort}"
+    try {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
+        val url = new URL(s"$masterUrl/workers/kill/?host=${Utils.localHostNameForURI()}")
+        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
+        conn.setRequestMethod("POST")
+        assert(conn.getResponseCode === 405)
+      }
+    } finally {
+      localCluster.stop()
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
index 024511189accc..40265a12af93b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/ui/MasterWebUISuite.scala
@@ -30,12 +30,13 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.DeployMessages.{DecommissionWorkersOnHosts, KillDriverResponse, RequestKillDriver}
 import org.apache.spark.deploy.DeployTestUtils._
 import org.apache.spark.deploy.master._
+import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
 import org.apache.spark.util.Utils
 
 class MasterWebUISuite extends SparkFunSuite {
 
-  val conf = new SparkConf()
+  val conf = new SparkConf().set(DECOMMISSION_ENABLED, true)
   val securityMgr = new SecurityManager(conf)
   val rpcEnv = mock(classOf[RpcEnv])
   val master = mock(classOf[Master])
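
Behavior summary: with `spark.decommission.enabled` (the key behind `DECOMMISSION_ENABLED`) set to `false`, the master web UI now short-circuits any POST to `/workers/kill/` with 405 before the `MASTER_UI_DECOMMISSION_ALLOW_MODE` check runs. A minimal client-side sketch of what that looks like outside the test suite; the object name, host, and port are illustrative assumptions (a standalone master web UI on its default `localhost:8080`), not part of the patch:

```scala
import java.net.{HttpURLConnection, URL}

// Hypothetical standalone check, not from the patch: probes the master web
// UI's /workers/kill/ endpoint and prints the HTTP status it answers with.
object DecommissionEndpointCheck {
  def main(args: Array[String]): Unit = {
    // Assumes a standalone master web UI on the default localhost:8080.
    val url = new URL("http://localhost:8080/workers/kill/?host=localhost")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    // Expect 405 (SC_METHOD_NOT_ALLOWED) when decommissioning is disabled,
    // or when the allow-mode rejects the request; the two are intentionally
    // indistinguishable since the patch reuses the same sendError branch.
    println(s"HTTP ${conn.getResponseCode}")
  }
}
```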