[SPARK-45474][CORE][WEBUI] Support top-level filtering in MasterPage JSON API

### What changes were proposed in this pull request?

This PR aims to support `top-level` filtering in `MasterPage` JSON API.

### Why are the changes needed?

**BEFORE**

Since Apache Spark's `MasterPage` JSON API always returns the full information, post-processing like the following via `jq` is needed.
```
$ curl -s -k http://localhost:8080/json/ | jq .completedapps
[
  {
    "id": "app-20231009193946-0000",
    "starttime": 1696905586694,
    "name": "Spark shell",
    "cores": 10,
    "user": "dongjoon",
    "memoryperexecutor": 1024,
    "memoryperslave": 1024,
    "resourcesperexecutor": [],
    "resourcesperslave": [],
    "submitdate": "Mon Oct 09 19:39:46 PDT 2023",
    "state": "FINISHED",
    "duration": 115686
  }
]
```

**AFTER**

Apache Spark `MasterPage` now provides a filtered result, REST-API style, for the top-level fields.
```
$ curl -s -k http://localhost:8080/json/completedapps
{
  "completedapps" : [ {
    "id" : "app-20231009193946-0000",
    "starttime" : 1696905586694,
    "name" : "Spark shell",
    "cores" : 10,
    "user" : "dongjoon",
    "memoryperexecutor" : 1024,
    "memoryperslave" : 1024,
    "resourcesperexecutor" : [ ],
    "resourcesperslave" : [ ],
    "submitdate" : "Mon Oct 09 19:39:46 PDT 2023",
    "state" : "FINISHED",
    "duration" : 115686
  } ]
}
```
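Note that an unrecognized field name falls through to the `(field -> "")` case in `writeMasterState` below, so a request for a nonexistent field (a hypothetical example; the response shape is inferred from that fallback, not taken from the PR description) yields an empty value rather than an error:
```
$ curl -s -k http://localhost:8080/json/nonexistent
{
  "nonexistent" : ""
}
```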

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with a new test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43303 from dongjoon-hyun/SPARK-45474.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun committed Oct 10, 2023
1 parent d073f2d commit 3a51a58
Showing 3 changed files with 77 additions and 18 deletions.
73 changes: 56 additions & 17 deletions core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -188,7 +188,7 @@ private[deploy] object JsonProtocol {
* Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists of the
* information of a master node.
*
* @return a Json object containing the following fields:
* @return a Json object containing the following fields if `field` is None:
* `url` the url of the master node
* `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the
* master
@@ -208,24 +208,63 @@
* `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers
* of the master
* `status` status of the master,
* see [[org.apache.spark.deploy.master.RecoveryState.MasterState]]
* see [[org.apache.spark.deploy.master.RecoveryState.MasterState]].
* If `field` is defined, the Json object contains only the matched top-level field.
* If `field` matches none of the fields above, the Json object `(field -> "")` is returned.
*/
def writeMasterState(obj: MasterStateResponse): JObject = {
def writeMasterState(obj: MasterStateResponse, field: Option[String] = None): JObject = {
val aliveWorkers = obj.workers.filter(_.isAlive())
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("aliveworkers" -> aliveWorkers.length) ~
("cores" -> aliveWorkers.map(_.cores).sum) ~
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
("memory" -> aliveWorkers.map(_.memory).sum) ~
("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~
("resources" -> aliveWorkers.map(_.resourcesInfo).toList.map(writeResourcesInfo)) ~
("resourcesused" -> aliveWorkers.map(_.resourcesInfoUsed).toList.map(writeResourcesInfo)) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString)
    field match {
      case None =>
        ("url" -> obj.uri) ~
        ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
        ("aliveworkers" -> aliveWorkers.length) ~
        ("cores" -> aliveWorkers.map(_.cores).sum) ~
        ("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
        ("memory" -> aliveWorkers.map(_.memory).sum) ~
        ("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~
        ("resources" -> aliveWorkers.map(_.resourcesInfo).toList.map(writeResourcesInfo)) ~
        ("resourcesused" ->
          aliveWorkers.map(_.resourcesInfoUsed).toList.map(writeResourcesInfo)) ~
        ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
        ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
        ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
        ("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~
        ("status" -> obj.status.toString)
      case Some(field) =>
        field match {
          case "url" =>
            ("url" -> obj.uri)
          case "workers" =>
            ("workers" -> obj.workers.toList.map(writeWorkerInfo))
          case "aliveworkers" =>
            ("aliveworkers" -> aliveWorkers.length)
          case "cores" =>
            ("cores" -> aliveWorkers.map(_.cores).sum)
          case "coresused" =>
            ("coresused" -> aliveWorkers.map(_.coresUsed).sum)
          case "memory" =>
            ("memory" -> aliveWorkers.map(_.memory).sum)
          case "memoryused" =>
            ("memoryused" -> aliveWorkers.map(_.memoryUsed).sum)
          case "resources" =>
            ("resources" -> aliveWorkers.map(_.resourcesInfo).toList.map(writeResourcesInfo))
          case "resourcesused" =>
            ("resourcesused" ->
              aliveWorkers.map(_.resourcesInfoUsed).toList.map(writeResourcesInfo))
          case "activeapps" =>
            ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo))
          case "completedapps" =>
            ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
          case "activedrivers" =>
            ("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo))
          case "completeddrivers" =>
            ("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo))
          case "status" =>
            ("status" -> obj.status.toString)
          case field => (field -> "")
        }
    }
  }

/**
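Since the new `field` parameter defaults to `None`, existing callers keep the full output. A minimal usage sketch (assuming a `MasterStateResponse` named `state` is already in scope; the json4s rendering calls are illustrative, not part of this patch):
```
import org.json4s.jackson.JsonMethods

// Full master state, identical to the pre-patch behavior.
val full = JsonProtocol.writeMasterState(state)

// A single top-level field, selected via the new optional parameter.
val cores = JsonProtocol.writeMasterState(state, Some("cores"))
JsonMethods.compact(JsonMethods.render(cores)) // e.g. {"cores":10}
```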
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -32,13 +32,17 @@ import org.apache.spark.util.Utils

private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private val master = parent.masterEndpointRef
private val jsonFieldPattern = "/json/([a-zA-Z]+).*".r

def getMasterState: MasterStateResponse = {
master.askSync[MasterStateResponse](RequestMasterState)
}

override def renderJson(request: HttpServletRequest): JValue = {
JsonProtocol.writeMasterState(getMasterState)
jsonFieldPattern.findFirstMatchIn(request.getRequestURI()) match {
case None => JsonProtocol.writeMasterState(getMasterState)
case Some(m) => JsonProtocol.writeMasterState(getMasterState, Some(m.group(1)))
}
}

def handleAppKillRequest(request: HttpServletRequest): Unit = {
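As an illustration of how `jsonFieldPattern` routes requests (the URIs below are hypothetical examples; `getRequestURI` returns the path without the query string):
```
val jsonFieldPattern = "/json/([a-zA-Z]+).*".r

jsonFieldPattern.findFirstMatchIn("/json/")              // None => full master state
jsonFieldPattern.findFirstMatchIn("/json/completedapps") // m.group(1) == "completedapps"
jsonFieldPattern.findFirstMatchIn("/json/workers/extra") // trailing ".*" tolerates a suffix
```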
16 changes: 16 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -76,6 +76,22 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr))
}

test("SPARK-45474: filtered writeMasterState") {
val workers = Array(createWorkerInfo(), createWorkerInfo())
val activeApps = Array(createAppInfo())
val completedApps = Array.empty[ApplicationInfo]
val activeDrivers = Array(createDriverInfo())
val completedDrivers = Array(createDriverInfo())
val stateResponse = new MasterStateResponse(
"host", 8080, None, workers, activeApps, completedApps,
activeDrivers, completedDrivers, RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse, Some("activedrivers"))
assertValidJson(output)

val expected = """{"activedrivers":[%s]}""".format(JsonConstants.driverInfoJsonStr).stripMargin
assertValidDataInJson(output, JsonMethods.parse(expected))
}

test("writeWorkerState") {
val executors = List[ExecutorRunner]()
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123, true),
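A similar hypothetical check (not part of this patch) could pin down the unknown-field fallback, reusing the `stateResponse` built in the new test above:
```
val unknown = JsonProtocol.writeMasterState(stateResponse, Some("nonexistent"))
assert(JsonMethods.compact(JsonMethods.render(unknown)) === """{"nonexistent":""}""")
```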
