Skip to content

Commit

Permalink
Support kill and request driver status through SparkSubmit
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Jan 21, 2015
1 parent 544de1d commit 120ab9d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 7 deletions.
34 changes: 31 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils
import org.apache.spark.deploy.rest.StandaloneRestClient

/**
* Whether to submit, kill, or request the status of an application.
* The latter two operations are currently supported only for standalone cluster mode.
*/
private[spark] object Action extends Enumeration {
type Action = Value
val SUBMIT, KILL, REQUEST_STATUS = Value
}

/**
* Main gateway of launching a Spark application.
*
Expand Down Expand Up @@ -73,11 +82,30 @@ object SparkSubmit {
if (appArgs.verbose) {
printStream.println(appArgs)
}
launch(appArgs)
appArgs.action match {
case Action.SUBMIT => submit(appArgs)
case Action.KILL => kill(appArgs)
case Action.REQUEST_STATUS => requestStatus(appArgs)
}
}

/**
* Kill an existing driver using the stable REST protocol. Standalone cluster mode only.
*/
private[spark] def kill(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient().killDriver(args.master, args.driverToKill)
}

/**
* Request the status of an existing driver using the stable REST protocol.
* Standalone cluster mode only.
*/
private[spark] def requestStatus(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient().requestDriverStatus(args.master, args.driverToRequestStatusFor)
}

/**
* Launch the application using the provided parameters.
* Submit the application using the provided parameters.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
Expand All @@ -89,7 +117,7 @@ object SparkSubmit {
* main method of a child class. Instead, we pass the submit parameters directly to
* a REST client, which will submit the application using the stable REST protocol.
*/
private[spark] def launch(args: SparkSubmitArguments): Unit = {
private[spark] def submit(args: SparkSubmitArguments): Unit = {
// Environment needed to launch the child main class
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.jar.JarFile
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.util.Utils
import org.apache.spark.deploy.Action.Action

/**
* Parses and encapsulates arguments from the spark-submit script.
Expand All @@ -39,8 +40,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var driverExtraClassPath: String = null
var driverExtraLibraryPath: String = null
var driverExtraJavaOptions: String = null
var driverCores: String = null
var supervise: Boolean = false
var queue: String = null
var numExecutors: String = null
var files: String = null
Expand All @@ -55,6 +54,23 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var pyFiles: String = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

// Standalone cluster mode only
var supervise: Boolean = false
var driverCores: String = null
var driverToKill: String = null
var driverToRequestStatusFor: String = null

def action: Action = {
(driverToKill, driverToRequestStatusFor) match {
case (null, null) => Action.SUBMIT
case (_, null) => Action.KILL
case (null, _) => Action.REQUEST_STATUS
case _ => SparkSubmit.printErrorAndExit(
"Requested to both kill and request status for a driver. Choose only one.")
null // never reached
}
}

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
Expand All @@ -79,7 +95,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()

checkRequiredArguments()
validateArguments()

/**
* Merge values from the default properties file with those specified through --conf.
Expand Down Expand Up @@ -171,7 +187,15 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}

/** Ensure that required fields exists. Call this only once all defaults are loaded. */
private def checkRequiredArguments(): Unit = {
private def validateArguments(): Unit = {
action match {
case Action.SUBMIT => validateSubmitArguments()
case Action.KILL => validateKillArguments()
case Action.REQUEST_STATUS => validateStatusRequestArguments()
}
}

private def validateSubmitArguments(): Unit = {
if (args.length == 0) {
printUsageAndExit(-1)
}
Expand Down Expand Up @@ -206,6 +230,25 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
}

private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://") || deployMode != "cluster") {
SparkSubmit.printErrorAndExit("Killing drivers is only supported in standalone cluster mode")
}
if (driverToKill == null) {
SparkSubmit.printErrorAndExit("Please specify a driver to kill")
}
}

private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://") || deployMode != "cluster") {
SparkSubmit.printErrorAndExit(
"Requesting driver statuses is only supported in standalone cluster mode")
}
if (driverToRequestStatusFor == null) {
SparkSubmit.printErrorAndExit("Please specify a driver to request status for")
}
}

override def toString = {
s"""Parsed arguments:
| master $master
Expand Down Expand Up @@ -312,6 +355,14 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
propertiesFile = value
parse(tail)

case ("--kill") :: value :: tail =>
driverToKill = value
parse(tail)

case ("--status") :: value :: tail =>
driverToRequestStatusFor = value
parse(tail)

case ("--supervise") :: tail =>
supervise = true
parse(tail)
Expand Down Expand Up @@ -410,6 +461,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| Spark standalone with cluster deploy mode only:
| --driver-cores NUM Cores for driver (Default: 1).
| --supervise If given, restarts the driver on failure.
| --kill DRIVER_ID If given, kills the driver specified.
| --status DRIVER_ID If given, requests the status of the driver specified.
|
| Spark standalone and Mesos only:
| --total-executor-cores NUM Total cores for all executors.
Expand Down

0 comments on commit 120ab9d

Please sign in to comment.