
Commit

Fix comments and name fields for better error messages
This commit also includes comprehensive cleanups across the board
and simplifies the serialization process by eliminating the naming
constraints on the JSON fields.
Andrew Or committed Jan 29, 2015
1 parent 8d43486 commit 3db7379
Showing 14 changed files with 301 additions and 269 deletions.
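
The file diffs below all apply the same pattern: each protocol field is now constructed with its own name, so assertFieldIsSet can report which field is missing without the caller passing a second, separately maintained name string. A minimal sketch of the idea, using hypothetical ProtocolField and FieldValidation stand-ins rather than the actual Spark classes:

```scala
// Hypothetical, simplified stand-ins for SubmitRestProtocolField and
// assertFieldIsSet; the real Spark classes differ in detail.
class ProtocolField[T](val name: String) {
  private var value: Option[T] = None
  def set(v: T): Unit = { value = Some(v) }
  def isSet: Boolean = value.isDefined
  override def toString: String = value.map(_.toString).orNull
}

object FieldValidation {
  // Because the field carries its own name, the error message can report it
  // without the caller repeating (or mistyping) a separate name string.
  def assertFieldIsSet(field: ProtocolField[_]): Unit = {
    if (!field.isSet) {
      throw new IllegalArgumentException(s"Field '${field.name}' is not set")
    }
  }

  def main(args: Array[String]): Unit = {
    val driverId = new ProtocolField[String]("driverId")
    assertFieldIsSet(driverId) // throws: Field 'driverId' is not set
  }
}
```

With the name attached to the field itself, the serialized field name and the error message can no longer drift apart, which is what allows the commit to drop the separately passed snake_case strings such as "driver_id".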
@@ -17,12 +17,17 @@

package org.apache.spark.deploy.rest

/**
* A request to query the status of a driver in the REST application submission protocol.
*/
class DriverStatusRequest extends SubmitRestProtocolRequest {
private val driverId = new SubmitRestProtocolField[String]
private val driverId = new SubmitRestProtocolField[String]("driverId")

def getDriverId: String = driverId.toString
def setDriverId(s: String): this.type = setField(driverId, s)

override def validate(): Unit = {
super.validate()
assertFieldIsSet(driverId, "driver_id")
assertFieldIsSet(driverId)
}
}
@@ -17,12 +17,16 @@

package org.apache.spark.deploy.rest

/**
* A response to the [[DriverStatusRequest]] in the REST application submission protocol.
*/
class DriverStatusResponse extends SubmitRestProtocolResponse {
private val driverId = new SubmitRestProtocolField[String]
private val success = new SubmitRestProtocolField[Boolean]
private val driverState = new SubmitRestProtocolField[String]
private val workerId = new SubmitRestProtocolField[String]
private val workerHostPort = new SubmitRestProtocolField[String]
private val driverId = new SubmitRestProtocolField[String]("driverId")
private val success = new SubmitRestProtocolField[Boolean]("success")
// standalone cluster mode only
private val driverState = new SubmitRestProtocolField[String]("driverState")
private val workerId = new SubmitRestProtocolField[String]("workerId")
private val workerHostPort = new SubmitRestProtocolField[String]("workerHostPort")

def getDriverId: String = driverId.toString
def getSuccess: String = success.toString
@@ -38,7 +42,7 @@ class DriverStatusResponse extends SubmitRestProtocolResponse {

override def validate(): Unit = {
super.validate()
assertFieldIsSet(driverId, "driver_id")
assertFieldIsSet(success, "success")
assertFieldIsSet(driverId)
assertFieldIsSet(success)
}
}
@@ -17,9 +17,12 @@

package org.apache.spark.deploy.rest

/**
* An error response message used in the REST application submission protocol.
*/
class ErrorResponse extends SubmitRestProtocolResponse {
override def validate(): Unit = {
super.validate()
assertFieldIsSet(message, "message")
assertFieldIsSet(message)
}
}
@@ -17,12 +17,17 @@

package org.apache.spark.deploy.rest

/**
* A request to kill a driver in the REST application submission protocol.
*/
class KillDriverRequest extends SubmitRestProtocolRequest {
private val driverId = new SubmitRestProtocolField[String]
private val driverId = new SubmitRestProtocolField[String]("driverId")

def getDriverId: String = driverId.toString
def setDriverId(s: String): this.type = setField(driverId, s)

override def validate(): Unit = {
super.validate()
assertFieldIsSet(driverId, "driver_id")
assertFieldIsSet(driverId)
}
}
@@ -17,9 +17,12 @@

package org.apache.spark.deploy.rest

/**
* A response to the [[KillDriverRequest]] in the REST application submission protocol.
*/
class KillDriverResponse extends SubmitRestProtocolResponse {
private val driverId = new SubmitRestProtocolField[String]
private val success = new SubmitRestProtocolField[Boolean]
private val driverId = new SubmitRestProtocolField[String]("driverId")
private val success = new SubmitRestProtocolField[Boolean]("success")

def getDriverId: String = driverId.toString
def getSuccess: String = success.toString
@@ -29,7 +32,7 @@ class KillDriverResponse extends SubmitRestProtocolResponse {

override def validate(): Unit = {
super.validate()
assertFieldIsSet(driverId, "driver_id")
assertFieldIsSet(success, "success")
assertFieldIsSet(driverId)
assertFieldIsSet(success)
}
}
@@ -21,11 +21,10 @@ import java.net.URL

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.SparkSubmitArguments
import org.apache.spark.util.Utils

/**
* A client that submits applications to the standalone Master using the stable REST protocol.
* This client is intended to communicate with the StandaloneRestServer. Cluster mode only.
* A client that submits applications to the standalone Master using the REST protocol
* This client is intended to communicate with the [[StandaloneRestServer]]. Cluster mode only.
*/
private[spark] class StandaloneRestClient extends SubmitRestClient {
import StandaloneRestClient._
@@ -38,7 +37,8 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
* this reports failure and logs an error message provided by the REST server.
*/
override def submitDriver(args: SparkSubmitArguments): SubmitDriverResponse = {
val submitResponse = super.submitDriver(args).asInstanceOf[SubmitDriverResponse]
validateSubmitArgs(args)
val submitResponse = super.submitDriver(args)
val submitSuccess = submitResponse.getSuccess.toBoolean
if (submitSuccess) {
val driverId = submitResponse.getDriverId
@@ -51,14 +51,25 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
submitResponse
}

/** Request that the REST server kill the specified driver. */
override def killDriver(master: String, driverId: String): KillDriverResponse = {
validateMaster(master)
super.killDriver(master, driverId)
}

/** Request the status of the specified driver from the REST server. */
override def requestDriverStatus(master: String, driverId: String): DriverStatusResponse = {
validateMaster(master)
super.requestDriverStatus(master, driverId)
}

/**
* Poll the status of the driver that was just submitted and report it.
* This retries up to a fixed number of times until giving up.
* Poll the status of the driver that was just submitted and log it.
* This retries up to a fixed number of times before giving up.
*/
private def pollSubmittedDriverStatus(master: String, driverId: String): Unit = {
(1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
val statusResponse = requestDriverStatus(master, driverId)
.asInstanceOf[DriverStatusResponse]
val statusSuccess = statusResponse.getSuccess.toBoolean
if (statusSuccess) {
val driverState = statusResponse.getDriverState
@@ -75,13 +86,13 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
exception.foreach { e => logError(e) }
return
}
Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
}
logError(s"Error: Master did not recognize driver $driverId.")
}

/** Construct a submit driver request message. */
override protected def constructSubmitRequest(
args: SparkSubmitArguments): SubmitDriverRequest = {
protected override def constructSubmitRequest(args: SparkSubmitArguments): SubmitDriverRequest = {
val message = new SubmitDriverRequest()
.setSparkVersion(sparkVersion)
.setAppName(args.name)
@@ -99,12 +110,14 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
.setTotalExecutorCores(args.totalExecutorCores)
args.childArgs.foreach(message.addAppArg)
args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
// TODO: send special environment variables?
sys.env.foreach { case (k, v) =>
if (k.startsWith("SPARK_")) { message.setEnvironmentVariable(k, v) }
}
message
}

/** Construct a kill driver request message. */
override protected def constructKillRequest(
protected override def constructKillRequest(
master: String,
driverId: String): KillDriverRequest = {
new KillDriverRequest()
@@ -113,33 +126,34 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
}

/** Construct a driver status request message. */
override protected def constructStatusRequest(
protected override def constructStatusRequest(
master: String,
driverId: String): DriverStatusRequest = {
new DriverStatusRequest()
.setSparkVersion(sparkVersion)
.setDriverId(driverId)
}

/** Extract the URL portion of the master address. */
protected override def getHttpUrl(master: String): URL = {
validateMaster(master)
new URL("http://" + master.stripPrefix("spark://"))
}

/** Throw an exception if this is not standalone mode. */
override protected def validateMaster(master: String): Unit = {
private def validateMaster(master: String): Unit = {
if (!master.startsWith("spark://")) {
throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
}
}

/** Throw an exception if this is not cluster deploy mode. */
override protected def validateDeployMode(deployMode: String): Unit = {
if (deployMode != "cluster") {
throw new IllegalArgumentException("This REST client is only supported in cluster mode.")
/** Throw an exception if this is not standalone cluster mode. */
private def validateSubmitArgs(args: SparkSubmitArguments): Unit = {
if (!args.isStandaloneCluster) {
throw new IllegalArgumentException(
"This REST client is only supported in standalone cluster mode.")
}
}

/** Extract the URL portion of the master address. */
override protected def getHttpUrl(master: String): URL = {
validateMaster(master)
new URL("http://" + master.stripPrefix("spark://"))
}
}

private object StandaloneRestClient {
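
The client's master-URL handling above (validateMaster plus getHttpUrl) reduces to a scheme check and a prefix rewrite. A self-contained sketch under an assumed object name (RestUrlSketch is not part of Spark):

```scala
import java.net.URL

// Illustrative only; mirrors the validateMaster/getHttpUrl logic shown above.
object RestUrlSketch {
  /** Reject anything that is not a standalone master URL. */
  def validateMaster(master: String): Unit = {
    if (!master.startsWith("spark://")) {
      throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
    }
  }

  /** Convert spark://host:port into the HTTP endpoint the REST server listens on. */
  def getHttpUrl(master: String): URL = {
    validateMaster(master)
    new URL("http://" + master.stripPrefix("spark://"))
  }

  def main(args: Array[String]): Unit = {
    // e.g. spark://localhost:6066 becomes http://localhost:6066
    println(getHttpUrl("spark://localhost:6066"))
  }
}
```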
@@ -24,23 +24,22 @@ import akka.actor.ActorRef
import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.SparkConf
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.deploy.DeployMessages
import org.apache.spark.deploy.master.Master

/**
* A server that responds to requests submitted by the StandaloneRestClient.
* This is intended to be embedded in the standalone Master. Cluster mode only.
* A server that responds to requests submitted by the [[StandaloneRestClient]].
* This is intended to be embedded in the standalone Master. Cluster mode only
*/
private[spark] class StandaloneRestServer(master: Master, host: String, requestedPort: Int)
extends SubmitRestServer(host, requestedPort, master.conf) {
override protected val handler = new StandaloneRestServerHandler(master)
protected override val handler = new StandaloneRestServerHandler(master)
}

/**
* A handler for requests submitted to the standalone Master
* via the stable application submission REST protocol.
* A handler for requests submitted to the standalone
* Master via the REST application submission protocol.
*/
private[spark] class StandaloneRestServerHandler(
conf: SparkConf,
@@ -55,8 +54,7 @@ private[spark] class StandaloneRestServerHandler(
}

/** Handle a request to submit a driver. */
override protected def handleSubmit(
request: SubmitDriverRequest): SubmitDriverResponse = {
protected override def handleSubmit(request: SubmitDriverRequest): SubmitDriverResponse = {
val driverDescription = buildDriverDescription(request)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
@@ -68,8 +66,7 @@ private[spark] class StandaloneRestServerHandler(
}

/** Handle a request to kill a driver. */
override protected def handleKill(
request: KillDriverRequest): KillDriverResponse = {
protected override def handleKill(request: KillDriverRequest): KillDriverResponse = {
val driverId = request.getDriverId
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(driverId), masterActor, askTimeout)
@@ -81,16 +78,11 @@ private[spark] class StandaloneRestServerHandler(
}

/** Handle a request for a driver's status. */
override protected def handleStatus(
request: DriverStatusRequest): DriverStatusResponse = {
protected override def handleStatus(request: DriverStatusRequest): DriverStatusResponse = {
val driverId = request.getDriverId
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(driverId), masterActor, askTimeout)
// Format exception nicely, if it exists
val message = response.exception.map { e =>
val stackTraceString = e.getStackTrace.map { "\t" + _ }.mkString("\n")
s"Exception from the cluster:\n$e\n$stackTraceString"
}
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
new DriverStatusResponse()
.setSparkVersion(sparkVersion)
.setDriverId(driverId)
@@ -103,6 +95,7 @@

/**
* Build a driver description from the fields specified in the submit request.
*
* This does not currently consider fields used by python applications since
* python is not supported in standalone cluster mode yet.
*/
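
The handler change above replaces handleStatus's inline stack-trace formatting with a call to a formatException helper. A sketch of what such a helper plausibly looks like, reconstructed from the removed inline code (the actual Spark helper may differ):

```scala
// Assumed helper shape; the removed inline code indents each stack frame
// with a tab and prepends the exception's own toString.
object ExceptionFormatting {
  def formatException(e: Throwable): String = {
    val stackTraceString = e.getStackTrace.map("\t" + _).mkString("\n")
    s"$e\n$stackTraceString"
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the handler's optional-exception case: only build a message if
    // the cluster actually reported an exception.
    val exception: Option[Throwable] = Some(new RuntimeException("driver failed"))
    val message = exception.map(e => "Exception from the cluster:\n" + formatException(e))
    message.foreach(println)
  }
}
```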
