Skip to content

Commit

Permalink
Use Jackson's DefaultScalaModule to simplify messages
Browse files Browse the repository at this point in the history
Instead of explicitly defining getters and setters in the messages,
we let Jackson's scala module do the work. This simplifies the code
for each message significantly, though at the expense of reducing
the level of type safety for users who implement their own clients
and servers.
  • Loading branch information
Andrew Or committed Feb 2, 2015
1 parent 9229433 commit 1f1c03f
Show file tree
Hide file tree
Showing 17 changed files with 360 additions and 495 deletions.
8 changes: 8 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest
* A request to query the status of a driver in the REST application submission protocol.
*/
class DriverStatusRequest extends SubmitRestProtocolRequest {
private val driverId = new SubmitRestProtocolField[String]
def getDriverId: String = driverId.toString
def setDriverId(s: String): this.type = setField(driverId, s)
var driverId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(driverId, "driverId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,12 @@ package org.apache.spark.deploy.rest
* A response to the [[DriverStatusRequest]] in the REST application submission protocol.
*/
class DriverStatusResponse extends SubmitRestProtocolResponse {
private val driverId = new SubmitRestProtocolField[String]
// standalone cluster mode only
private val driverState = new SubmitRestProtocolField[String]
private val workerId = new SubmitRestProtocolField[String]
private val workerHostPort = new SubmitRestProtocolField[String]

def getDriverId: String = driverId.toString
def getDriverState: String = driverState.toString
def getWorkerId: String = workerId.toString
def getWorkerHostPort: String = workerHostPort.toString
var driverId: String = null

def setDriverId(s: String): this.type = setField(driverId, s)
def setDriverState(s: String): this.type = setField(driverState, s)
def setWorkerId(s: String): this.type = setField(workerId, s)
def setWorkerHostPort(s: String): this.type = setField(workerHostPort, s)
// standalone cluster mode only
var driverState: String = null
var workerId: String = null
var workerHostPort: String = null

protected override def doValidate(): Unit = {
super.doValidate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@

package org.apache.spark.deploy.rest

import com.fasterxml.jackson.annotation.JsonIgnore

/**
* An error response message used in the REST application submission protocol.
*/
class ErrorResponse extends SubmitRestProtocolResponse {
setSuccess("false")

// Don't bother logging success = false in the JSON
@JsonIgnore
override def getSuccess: String = super.getSuccess
// request was unsuccessful
success = "false"

protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(message, "message")
assert(!success.toBoolean, s"The 'success' field must be false in $messageType.")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest
* A request to kill a driver in the REST application submission protocol.
*/
class KillDriverRequest extends SubmitRestProtocolRequest {
private val driverId = new SubmitRestProtocolField[String]
def getDriverId: String = driverId.toString
def setDriverId(s: String): this.type = setField(driverId, s)
var driverId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(driverId, "driverId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ package org.apache.spark.deploy.rest
* A response to the [[KillDriverRequest]] in the REST application submission protocol.
*/
class KillDriverResponse extends SubmitRestProtocolResponse {
private val driverId = new SubmitRestProtocolField[String]
def getDriverId: String = driverId.toString
def setDriverId(s: String): this.type = setField(driverId, s)
var driverId: String = null
protected override def doValidate(): Unit = {
super.doValidate()
assertFieldIsSet(driverId, "driverId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,19 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
case s: SubmitDriverResponse => s
case _ => return response
}
val submitSuccess = submitResponse.getSuccess.toBoolean
// Report status of submitted driver to user
val submitSuccess = submitResponse.success.toBoolean
if (submitSuccess) {
val driverId = submitResponse.getDriverId
logInfo(s"Driver successfully submitted as $driverId. Polling driver state...")
pollSubmittedDriverStatus(args.master, driverId)
val driverId = submitResponse.driverId
if (driverId != null) {
logInfo(s"Driver successfully submitted as $driverId. Polling driver state...")
pollSubmittedDriverStatus(args.master, driverId)
} else {
logError("Application successfully submitted, but driver ID was not provided!")
}
} else {
val submitMessage = submitResponse.getMessage
logError(s"Application submission failed: $submitMessage")
val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
logError("Application submission failed" + failMessage)
}
submitResponse
}
Expand Down Expand Up @@ -78,12 +83,12 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
case s: DriverStatusResponse => s
case _ => return
}
val statusSuccess = statusResponse.getSuccess.toBoolean
val statusSuccess = statusResponse.success.toBoolean
if (statusSuccess) {
val driverState = Option(statusResponse.getDriverState)
val workerId = Option(statusResponse.getWorkerId)
val workerHostPort = Option(statusResponse.getWorkerHostPort)
val exception = Option(statusResponse.getMessage)
val driverState = Option(statusResponse.driverState)
val workerId = Option(statusResponse.workerId)
val workerHostPort = Option(statusResponse.workerHostPort)
val exception = Option(statusResponse.message)
// Log driver state, if present
driverState match {
case Some(state) => logInfo(s"State of driver $driverId is now $state.")
Expand All @@ -105,21 +110,21 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {

/** Construct a submit driver request message. */
protected override def constructSubmitRequest(args: SparkSubmitArguments): SubmitDriverRequest = {
val message = new SubmitDriverRequest()
.setSparkVersion(sparkVersion)
.setAppName(args.name)
.setAppResource(args.primaryResource)
.setMainClass(args.mainClass)
.setJars(args.jars)
.setFiles(args.files)
.setDriverMemory(args.driverMemory)
.setDriverCores(args.driverCores)
.setDriverExtraJavaOptions(args.driverExtraJavaOptions)
.setDriverExtraClassPath(args.driverExtraClassPath)
.setDriverExtraLibraryPath(args.driverExtraLibraryPath)
.setSuperviseDriver(args.supervise.toString)
.setExecutorMemory(args.executorMemory)
.setTotalExecutorCores(args.totalExecutorCores)
val message = new SubmitDriverRequest
message.clientSparkVersion = sparkVersion
message.appName = args.name
message.appResource = args.primaryResource
message.mainClass = args.mainClass
message.jars = args.jars
message.files = args.files
message.driverMemory = args.driverMemory
message.driverCores = args.driverCores
message.driverExtraJavaOptions = args.driverExtraJavaOptions
message.driverExtraClassPath = args.driverExtraClassPath
message.driverExtraLibraryPath = args.driverExtraLibraryPath
message.superviseDriver = args.supervise.toString
message.executorMemory = args.executorMemory
message.totalExecutorCores = args.totalExecutorCores
args.childArgs.foreach(message.addAppArg)
args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
sys.env.foreach { case (k, v) =>
Expand All @@ -132,18 +137,20 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
protected override def constructKillRequest(
master: String,
driverId: String): KillDriverRequest = {
new KillDriverRequest()
.setSparkVersion(sparkVersion)
.setDriverId(driverId)
val k = new KillDriverRequest
k.clientSparkVersion = sparkVersion
k.driverId = driverId
k
}

/** Construct a driver status request message. */
protected override def constructStatusRequest(
master: String,
driverId: String): DriverStatusRequest = {
new DriverStatusRequest()
.setSparkVersion(sparkVersion)
.setDriverId(driverId)
val d = new DriverStatusRequest
d.clientSparkVersion = sparkVersion
d.driverId = driverId
d
}

/** Extract the URL portion of the master address. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,42 @@ private[spark] class StandaloneRestServerHandler(
val driverDescription = buildDriverDescription(request)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
new SubmitDriverResponse()
.setSparkVersion(sparkVersion)
.setMessage(response.message)
.setSuccess(response.success.toString)
.setDriverId(response.driverId.orNull)
val s = new SubmitDriverResponse
s.serverSparkVersion = sparkVersion
s.message = response.message
s.success = response.success.toString
s.driverId = response.driverId.orNull
s
}

/** Handle a request to kill a driver. */
protected override def handleKill(request: KillDriverRequest): KillDriverResponse = {
val driverId = request.getDriverId
val driverId = request.driverId
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(driverId), masterActor, askTimeout)
new KillDriverResponse()
.setSparkVersion(sparkVersion)
.setMessage(response.message)
.setDriverId(driverId)
.setSuccess(response.success.toString)
val k = new KillDriverResponse
k.serverSparkVersion = sparkVersion
k.message = response.message
k.driverId = driverId
k.success = response.success.toString
k
}

/** Handle a request for a driver's status. */
protected override def handleStatus(request: DriverStatusRequest): DriverStatusResponse = {
val driverId = request.getDriverId
val driverId = request.driverId
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(driverId), masterActor, askTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
new DriverStatusResponse()
.setSparkVersion(sparkVersion)
.setDriverId(driverId)
.setSuccess(response.found.toString)
.setDriverState(response.state.map(_.toString).orNull)
.setWorkerId(response.workerId.orNull)
.setWorkerHostPort(response.workerHostPort.orNull)
.setMessage(message.orNull)
val d = new DriverStatusResponse
d.serverSparkVersion = sparkVersion
d.driverId = driverId
d.success = response.found.toString
d.driverState = response.state.map(_.toString).orNull
d.workerId = response.workerId.orNull
d.workerHostPort = response.workerHostPort.orNull
d.message = message.orNull
d
}

/**
Expand All @@ -101,27 +104,27 @@ private[spark] class StandaloneRestServerHandler(
*/
private def buildDriverDescription(request: SubmitDriverRequest): DriverDescription = {
// Required fields, including the main class because python is not yet supported
val appName = request.getAppName
val appResource = request.getAppResource
val mainClass = request.getMainClass
val appName = request.appName
val appResource = request.appResource
val mainClass = request.mainClass
if (mainClass == null) {
throw new SubmitRestMissingFieldException("Main class must be set in submit request.")
}

// Optional fields
val jars = Option(request.getJars)
val files = Option(request.getFiles)
val driverMemory = Option(request.getDriverMemory)
val driverCores = Option(request.getDriverCores)
val driverExtraJavaOptions = Option(request.getDriverExtraJavaOptions)
val driverExtraClassPath = Option(request.getDriverExtraClassPath)
val driverExtraLibraryPath = Option(request.getDriverExtraLibraryPath)
val superviseDriver = Option(request.getSuperviseDriver)
val executorMemory = Option(request.getExecutorMemory)
val totalExecutorCores = Option(request.getTotalExecutorCores)
val appArgs = request.getAppArgs
val sparkProperties = request.getSparkProperties
val environmentVariables = request.getEnvironmentVariables
val jars = Option(request.jars)
val files = Option(request.files)
val driverMemory = Option(request.driverMemory)
val driverCores = Option(request.driverCores)
val driverExtraJavaOptions = Option(request.driverExtraJavaOptions)
val driverExtraClassPath = Option(request.driverExtraClassPath)
val driverExtraLibraryPath = Option(request.driverExtraLibraryPath)
val superviseDriver = Option(request.superviseDriver)
val executorMemory = Option(request.executorMemory)
val totalExecutorCores = Option(request.totalExecutorCores)
val appArgs = request.appArgs
val sparkProperties = request.sparkProperties
val environmentVariables = request.environmentVariables

// Translate all fields to the relevant Spark properties
val conf = new SparkConf(false)
Expand Down
Loading

0 comments on commit 1f1c03f

Please sign in to comment.