Skip to content

Commit

Permalink
Report REST server response back to the user
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Feb 5, 2015
1 parent 40e6095 commit 6fc7670
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 33 deletions.
29 changes: 4 additions & 25 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,15 @@ object SparkSubmit {
* Standalone cluster mode only.
*/
private def kill(args: SparkSubmitArguments): Unit = {
val client = new StandaloneRestClient
val response = client.killSubmission(args.master, args.driverToKill)
response match {
case k: KillSubmissionResponse => handleRestResponse(k)
case r => handleUnexpectedRestResponse(r)
}
new StandaloneRestClient().killSubmission(args.master, args.driverToKill)
}

/**
* Request the status of an existing driver using the REST application submission protocol.
* Standalone cluster mode only.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
val client = new StandaloneRestClient
val response = client.requestSubmissionStatus(args.master, args.driverToRequestStatusFor)
response match {
case s: SubmissionStatusResponse => handleRestResponse(s)
case r => handleUnexpectedRestResponse(r)
}
new StandaloneRestClient().requestSubmissionStatus(args.master, args.driverToRequestStatusFor)
}

/**
Expand All @@ -150,8 +140,8 @@ object SparkSubmit {
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printStream.println(s"Master endpoint ${args.master} was not a " +
s"REST server. Falling back to legacy submission gateway instead.")
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
Expand Down Expand Up @@ -540,17 +530,6 @@ object SparkSubmit {
}
}

/** Log the response sent by the server in the REST application submission protocol. */
private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = {
printStream.println(s"Server responded with ${response.messageType}:\n${response.toJson}")
}

/** Log an appropriate error if the response sent by the server is not of the expected type. */
private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
printStream.println(
s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
}

/**
* Return whether the given primary resource represents a user jar.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ private[spark] class StandaloneRestClient extends Logging {
val request = constructSubmitRequest(appArgs, sparkProperties, environmentVariables)
val response = postJson(url, request.toJson)
response match {
case s: CreateSubmissionResponse => reportSubmissionStatus(master, s)
case _ => // unexpected type, let upstream caller handle it
case s: CreateSubmissionResponse =>
reportSubmissionStatus(master, s)
handleRestResponse(s)
case unexpected =>
handleUnexpectedRestResponse(unexpected)
}
response
}
Expand All @@ -75,14 +78,27 @@ private[spark] class StandaloneRestClient extends Logging {
def killSubmission(master: String, submissionId: String): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to kill submission $submissionId in $master.")
validateMaster(master)
post(getKillUrl(master, submissionId))
val response = post(getKillUrl(master, submissionId))
response match {
case k: KillSubmissionResponse => handleRestResponse(k)
case unexpected => handleUnexpectedRestResponse(unexpected)
}
response
}

/** Request the status of a submission from the server. */
def requestSubmissionStatus(master: String, submissionId: String): SubmitRestProtocolResponse = {
def requestSubmissionStatus(
master: String,
submissionId: String,
quiet: Boolean = false): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request for the status of submission $submissionId in $master.")
validateMaster(master)
get(getStatusUrl(master, submissionId))
val response = get(getStatusUrl(master, submissionId))
response match {
case s: SubmissionStatusResponse => if (!quiet) { handleRestResponse(s) }
case unexpected => handleUnexpectedRestResponse(unexpected)
}
response
}

/** Send a GET request to the specified URL. */
Expand Down Expand Up @@ -224,7 +240,7 @@ private[spark] class StandaloneRestClient extends Logging {
*/
private def pollSubmissionStatus(master: String, submissionId: String): Unit = {
(1 to REPORT_DRIVER_STATUS_MAX_TRIES).foreach { _ =>
val response = requestSubmissionStatus(master, submissionId)
val response = requestSubmissionStatus(master, submissionId, quiet = true)
val statusResponse = response match {
case s: SubmissionStatusResponse => s
case _ => return // unexpected type, let upstream caller handle it
Expand Down Expand Up @@ -253,6 +269,16 @@ private[spark] class StandaloneRestClient extends Logging {
}
logError(s"Error: Master did not recognize driver $submissionId.")
}

/** Log the response sent by the server in the REST application submission protocol. */
private def handleRestResponse(response: SubmitRestProtocolResponse): Unit = {
logInfo(s"Server responded with ${response.messageType}:\n${response.toJson}")
}

/** Log an appropriate error if the response sent by the server is not of the expected type. */
private def handleUnexpectedRestResponse(unexpected: SubmitRestProtocolResponse): Unit = {
logError(s"Error: Server responded with message of unexpected type ${unexpected.messageType}.")
}
}

private[spark] object StandaloneRestClient {
Expand All @@ -266,12 +292,11 @@ private[spark] object StandaloneRestClient {
*/
def main(args: Array[String]): Unit = {
val client = new StandaloneRestClient
val appArgs = args.slice(1, args.size)
val master = sys.props.get("spark.master").getOrElse {
throw new IllegalArgumentException("'spark.master' must be set.")
}
val sparkProperties = new SparkConf().getAll.toMap
val environmentVariables = sys.env.filter { case (k, _) => k.startsWith("SPARK_") }
client.createSubmission(master, appArgs, sparkProperties, environmentVariables)
client.createSubmission(master, args, sparkProperties, environmentVariables)
}
}

0 comments on commit 6fc7670

Please sign in to comment.