diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index f697892aacc83..0a5399a5ba52a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -188,7 +188,7 @@ private[deploy] object JsonProtocol { * Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists the * information of a master node. * - * @return a Json object containing the following fields: + * @return a Json object containing the following fields if `field` is None: * `url` the url of the master node * `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the * master @@ -208,24 +208,63 @@ private[deploy] object JsonProtocol { * `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers * of the master * `status` status of the master, - * see [[org.apache.spark.deploy.master.RecoveryState.MasterState]] + * see [[org.apache.spark.deploy.master.RecoveryState.MasterState]]. + * If `field` is not None, the Json object will contain the matched field. + * If `field` doesn't match, the Json object `(field -> "")` is returned. */ - def writeMasterState(obj: MasterStateResponse): JObject = { + def writeMasterState(obj: MasterStateResponse, field: Option[String] = None): JObject = { val aliveWorkers = obj.workers.filter(_.isAlive()) - ("url" -> obj.uri) ~ - ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ - ("aliveworkers" -> aliveWorkers.length) ~ - ("cores" -> aliveWorkers.map(_.cores).sum) ~ - ("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~ - ("memory" -> aliveWorkers.map(_.memory).sum) ~ - ("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~ - ("resources" -> aliveWorkers.map(_.resourcesInfo).toList.map(writeResourcesInfo)) ~ - ("resourcesused" -> aliveWorkers.map(_.resourcesInfoUsed).toList.map(writeResourcesInfo)) ~ - ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ - ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~ - ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~ - ("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~ - ("status" -> obj.status.toString) + field match { + case None => + ("url" -> obj.uri) ~ + ("workers" -> obj.workers.toList.map (writeWorkerInfo) ) ~ + ("aliveworkers" -> aliveWorkers.length) ~ + ("cores" -> aliveWorkers.map (_.cores).sum) ~ + ("coresused" -> aliveWorkers.map (_.coresUsed).sum) ~ + ("memory" -> aliveWorkers.map (_.memory).sum) ~ + ("memoryused" -> aliveWorkers.map (_.memoryUsed).sum) ~ + ("resources" -> aliveWorkers.map (_.resourcesInfo).toList.map (writeResourcesInfo) ) ~ + ("resourcesused" -> + aliveWorkers.map (_.resourcesInfoUsed).toList.map (writeResourcesInfo) ) ~ + ("activeapps" -> obj.activeApps.toList.map (writeApplicationInfo) ) ~ + ("completedapps" -> obj.completedApps.toList.map (writeApplicationInfo) ) ~ + ("activedrivers" -> obj.activeDrivers.toList.map (writeDriverInfo) ) ~ + ("completeddrivers" -> obj.completedDrivers.toList.map (writeDriverInfo) ) ~ + ("status" -> obj.status.toString) + case Some(field) => + field match { + case "url" => + ("url" -> obj.uri) + case "workers" => + ("workers" -> obj.workers.toList.map (writeWorkerInfo) ) + case "aliveworkers" => + ("aliveworkers" -> aliveWorkers.length) + case "cores" => + ("cores" -> aliveWorkers.map (_.cores).sum) + case "coresused" => + ("coresused" -> aliveWorkers.map (_.coresUsed).sum) + case "memory" => + ("memory" -> aliveWorkers.map (_.memory).sum) + case "memoryused" => + ("memoryused" -> aliveWorkers.map (_.memoryUsed).sum) + case "resources" => + ("resources" -> aliveWorkers.map (_.resourcesInfo).toList.map (writeResourcesInfo) ) + case "resourcesused" => + ("resourcesused" -> + aliveWorkers.map (_.resourcesInfoUsed).toList.map (writeResourcesInfo) ) + case "activeapps" => + ("activeapps" -> obj.activeApps.toList.map (writeApplicationInfo) ) + case "completedapps" => + ("completedapps" -> obj.completedApps.toList.map (writeApplicationInfo) ) + case "activedrivers" => + ("activedrivers" -> obj.activeDrivers.toList.map (writeDriverInfo) ) + case "completeddrivers" => + ("completeddrivers" -> obj.completedDrivers.toList.map (writeDriverInfo) ) + case "status" => + ("status" -> obj.status.toString) + case field => (field -> "") + } + } } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 02c488656b848..cc4370ad02e06 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -32,13 +32,17 @@ import org.apache.spark.util.Utils private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { private val master = parent.masterEndpointRef + private val jsonFieldPattern = "/json/([a-zA-Z]+).*".r def getMasterState: MasterStateResponse = { master.askSync[MasterStateResponse](RequestMasterState) } override def renderJson(request: HttpServletRequest): JValue = { - JsonProtocol.writeMasterState(getMasterState) + jsonFieldPattern.findFirstMatchIn(request.getRequestURI()) match { + case None => JsonProtocol.writeMasterState(getMasterState) + case Some(m) => JsonProtocol.writeMasterState(getMasterState, Some(m.group(1))) + } } def handleAppKillRequest(request: HttpServletRequest): Unit = { diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 5e62323770e1b..4a6ace6facdea 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -76,6 +76,22 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils { assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr)) } + test("SPARK-45474: filtered writeMasterState") { + val workers = Array(createWorkerInfo(), createWorkerInfo()) + val activeApps = Array(createAppInfo()) + val completedApps = Array.empty[ApplicationInfo] + val activeDrivers = Array(createDriverInfo()) + val completedDrivers = Array(createDriverInfo()) + val stateResponse = new MasterStateResponse( + "host", 8080, None, workers, activeApps, completedApps, + activeDrivers, completedDrivers, RecoveryState.ALIVE) + val output = JsonProtocol.writeMasterState(stateResponse, Some("activedrivers")) + assertValidJson(output) + + val expected = """{"activedrivers":[%s]}""".format(JsonConstants.driverInfoJsonStr).stripMargin + assertValidDataInJson(output, JsonMethods.parse(expected)) + } + test("writeWorkerState") { val executors = List[ExecutorRunner]() val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123, true),