[SPARK-5388] Provide a stable application submission gateway for standalone cluster mode

The goal is to provide a stable, REST-based application submission gateway that is not inherently based on Akka, which is unstable across versions. This PR targets standalone cluster mode, but is implemented in a general enough manner that it can potentially be extended to other modes in the future. Client mode is not included in these changes because many more Akka messages are exchanged there.

As of these changes, the Master advertises two ports, 7077 and 6066. The old port (7077) must be kept around for client mode and for older versions of Spark submit, but all new versions of Spark submit will use the REST gateway (6066).
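For illustration, a cluster-mode submission could target either gateway from the command line (the host, class, and jar names below are placeholders, not taken from this commit):

    ./bin/spark-submit --deploy-mode cluster --master spark://master:6066 \
      --class org.example.MyApp my-app.jar    # new REST gateway
    ./bin/spark-submit --deploy-mode cluster --master spark://master:7077 \
      --class org.example.MyApp my-app.jar    # legacy Akka gateway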

As a side note, this includes ~700 lines of tests and ~200 lines of license headers.

Author: Andrew Or <andrew@databricks.com>

Closes #4216 from andrewor14/rest and squashes the following commits:

8d7ce07 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
6f0c597 [Andrew Or] Use nullable fields for integer and boolean values
dfe4bd7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
b9e2a08 [Andrew Or] Minor comments
02b5cea [Andrew Or] Fix tests
d2b1ef8 [Andrew Or] Comment changes + minor code refactoring across the board
9c82a36 [Andrew Or] Minor comment and wording updates
b4695e7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
c9a8ad7 [Andrew Or] Do not include appResource and mainClass as properties
6fc7670 [Andrew Or] Report REST server response back to the user
40e6095 [Andrew Or] Pass submit parameters through system properties
cbd670b [Andrew Or] Include unknown fields, if any, in server response
9fee16f [Andrew Or] Include server protocol version on mismatch
09f873a [Andrew Or] Fix style
8188e61 [Andrew Or] Upgrade Jackson from 2.3.0 to 2.4.4
37538e0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
9165ae8 [Andrew Or] Fall back to Akka if endpoint was not REST
252d53c [Andrew Or] Clean up server error handling behavior further
c643f64 [Andrew Or] Fix style
bbbd329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
792e112 [Andrew Or] Use specific HTTP response codes on error
f98660b [Andrew Or] Version the protocol and include it in REST URL
721819f [Andrew Or] Provide more REST-like interface for submit/kill/status
581f7bf [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
9e0d1af [Andrew Or] Move some classes around to reduce number of files (minor)
42e5de4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
1f1c03f [Andrew Or] Use Jackson's DefaultScalaModule to simplify messages
9229433 [Andrew Or] Reduce duplicate naming in REST field
ade28fd [Andrew Or] Clean up REST response output in Spark submit
b2fef8b [Andrew Or] Abstract the success field to the general response
6c57b4b [Andrew Or] Increase timeout in end-to-end tests
bf696ff [Andrew Or] Add checks for enabling REST when using kill/status
7ee6737 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e2f7f5f [Andrew Or] Provide more safeguard against missing fields
9581df7 [Andrew Or] Clean up uses of exceptions
914fdff [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e2104e6 [Andrew Or] stable -> rest
3db7379 [Andrew Or] Fix comments and name fields for better error messages
8d43486 [Andrew Or] Replace SubmitRestProtocolAction with class name
df90e8b [Andrew Or] Use Jackson for JSON de/serialization
d7a1f9f [Andrew Or] Fix local cluster tests
efa5e18 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
e42c131 [Andrew Or] Add end-to-end tests for standalone REST protocol
837475b [Andrew Or] Show the REST port on the Master UI
d8d3717 [Andrew Or] Use a daemon thread pool for REST server
6568ca5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into rest
77774ba [Andrew Or] Minor fixes
206cae4 [Andrew Or] Refactor and add tests for the REST protocol
63c05b3 [Andrew Or] Remove MASTER as a field (minor)
9e21b72 [Andrew Or] Action -> SparkSubmitAction (minor)
51c5ca6 [Andrew Or] Distinguish client and server side Spark versions
b44e103 [Andrew Or] Implement status requests + fix validation behavior
120ab9d [Andrew Or] Support kill and request driver status through SparkSubmit
544de1d [Andrew Or] Major clean ups in code and comments
e958cae [Andrew Or] Supported nested values in messages
484bd21 [Andrew Or] Specify an ordering for fields in SubmitDriverRequestMessage
6ff088d [Andrew Or] Rename classes to generalize REST protocol
af9d9cb [Andrew Or] Integrate REST protocol in standalone mode
53e7c0e [Andrew Or] Initial client, server, and all the messages
Andrew Or authored and pwendell committed Feb 6, 2015
1 parent e772b4e commit 1390e56
Showing 23 changed files with 2,027 additions and 94 deletions.
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -243,6 +243,14 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
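These new dependencies provide the JSON de/serialization for the REST protocol messages (commit df90e8b), with DefaultScalaModule handling Scala types (commit 1f1c03f). A minimal sketch of the pattern, using a hypothetical message shape rather than the real ones in o.a.s.deploy.rest:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Hypothetical message shape, for illustration only
    case class KillSubmissionResponse(action: String, success: Boolean)

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = mapper.writeValueAsString(
      KillSubmissionResponse("KillSubmissionResponse", success = true))
    // json: {"action":"KillSubmissionResponse","success":true}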
6 changes: 6 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -103,6 +103,12 @@ span.expand-details {
float: right;
}

span.rest-uri {
font-size: 10pt;
font-style: italic;
color: gray;
}

pre {
font-size: 0.8em;
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2110,7 +2110,7 @@ object SparkContext extends Logging {

val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
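For context, this code path is exercised by the local-cluster master URL, e.g.:

    ./bin/spark-submit --master local-cluster[2,1,512] ...  # 2 workers, 1 core and 512 MB each

With this change the user's SparkConf now reaches the embedded Master, which LocalSparkCluster uses below to disable the REST server by default.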
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -29,8 +29,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
* Command-line parser for the driver client.
*/
private[spark] class ClientArguments(args: Array[String]) {
val defaultCores = 1
val defaultMemory = 512
import ClientArguments._

var cmd: String = "" // 'launch' or 'kill'
var logLevel = Level.WARN
@@ -39,9 +38,9 @@ private[spark] class ClientArguments(args: Array[String]) {
var master: String = ""
var jarUrl: String = ""
var mainClass: String = ""
var supervise: Boolean = false
var memory: Int = defaultMemory
var cores: Int = defaultCores
var supervise: Boolean = DEFAULT_SUPERVISE
var memory: Int = DEFAULT_MEMORY
var cores: Int = DEFAULT_CORES
private var _driverOptions = ListBuffer[String]()
def driverOptions = _driverOptions.toSeq

@@ -50,7 +49,7 @@ private[spark] class ClientArguments(args: Array[String]) {

parse(args.toList)

def parse(args: List[String]): Unit = args match {
private def parse(args: List[String]): Unit = args match {
case ("--cores" | "-c") :: IntParam(value) :: tail =>
cores = value
parse(tail)
@@ -106,9 +105,10 @@ private[spark] class ClientArguments(args: Array[String]) {
|Usage: DriverClient kill <active-master> <driver-id>
|
|Options:
| -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
| -c CORES, --cores CORES Number of cores to request (default: $DEFAULT_CORES)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $DEFAULT_MEMORY)
| -s, --supervise Whether to restart the driver on failure
| (default: $DEFAULT_SUPERVISE)
| -v, --verbose Print more debugging output
""".stripMargin
System.err.println(usage)
@@ -117,6 +117,10 @@
}

object ClientArguments {
private[spark] val DEFAULT_CORES = 1
private[spark] val DEFAULT_MEMORY = 512 // MB
private[spark] val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
try {
val uri = new URI(s)
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -148,15 +148,22 @@ private[deploy] object DeployMessages {

// Master to MasterWebUI

case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
status: MasterState) {
case class MasterStateResponse(
host: String,
port: Int,
restPort: Option[Int],
workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo],
completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo],
completedDrivers: Array[DriverInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
assert (port > 0)

def uri = "spark://" + host + ":" + port
def restUri: Option[String] = restPort.map { p => "spark://" + host + ":" + p }
}

// WorkerWebUI to Worker
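A quick illustration of how the new restPort field surfaces (placeholder values):

    // MasterStateResponse(host = "master", port = 7077, restPort = Some(6066), ...)
    //   .uri     == "spark://master:7077"
    //   .restUri == Some("spark://master:6066")
    // A restPort of None means the Master is not running a REST server.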
core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -33,7 +33,11 @@ import org.apache.spark.util.Utils
* fault recovery without spinning up a lot of processes.
*/
private[spark]
class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int)
class LocalSparkCluster(
numWorkers: Int,
coresPerWorker: Int,
memoryPerWorker: Int,
conf: SparkConf)
extends Logging {

private val localHostname = Utils.localHostName()
@@ -43,9 +47,11 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")

// Disable REST server on Master in this mode unless otherwise specified
val _conf = conf.clone().setIfMissing("spark.master.rest.enabled", "false")

/* Start the Master */
val conf = new SparkConf(false)
val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0, conf)
val (masterSystem, masterPort, _, _) = Master.startSystemAndActor(localHostname, 0, 0, _conf)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
val masters = Array(masterUrl)
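Since the REST server is only disabled with setIfMissing, callers can still opt back in; a sketch against the new constructor (assuming code within the spark package, as the class is private[spark]):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.LocalSparkCluster

    val conf = new SparkConf(false).set("spark.master.rest.enabled", "true")
    val cluster = new LocalSparkCluster(2, 1, 512, conf)  // workers, cores, memory (MB), conf
    val masterUrls = cluster.start()  // the embedded Master keeps its REST server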
142 changes: 110 additions & 32 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -18,25 +18,35 @@
package org.apache.spark.deploy

import java.io.{File, PrintStream}
import java.lang.reflect.{Modifier, InvocationTargetException}
import java.lang.reflect.{InvocationTargetException, Modifier}
import java.net.URL

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.hadoop.fs.Path
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
import org.apache.ivy.core.module.descriptor._
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.spark.executor.ExecutorURLClassLoader

import org.apache.spark.deploy.rest._
import org.apache.spark.executor._
import org.apache.spark.util.Utils
import org.apache.spark.executor.ChildExecutorURLClassLoader
import org.apache.spark.executor.MutableURLClassLoader

/**
* Whether to submit, kill, or request the status of an application.
* The latter two operations are currently supported only for standalone cluster mode.
*/
private[spark] object SparkSubmitAction extends Enumeration {
type SparkSubmitAction = Value
val SUBMIT, KILL, REQUEST_STATUS = Value
}

/**
* Main gateway of launching a Spark application.
@@ -83,21 +93,74 @@ object SparkSubmit {
if (appArgs.verbose) {
printStream.println(appArgs)
}
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}

/** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
private def kill(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
.killSubmission(args.master, args.submissionToKill)
}

/**
* @return a tuple containing
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a list of system properties and env vars, and
* (4) the main class for the child
* Request the status of an existing submission using the REST protocol.
* Standalone cluster mode only.
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
private def requestStatus(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
}

// Values to return
/**
* Submit the application using the provided parameters.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*/
private[spark] def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
printStream.println("Running Spark using the REST application submission protocol.")
runMain(childArgs, childClasspath, sysProps, childMainClass)
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass)
}
}

/**
* Prepare the environment for submitting an application.
* This returns a 4-tuple:
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a map of system properties, and
* (4) the main class for the child
* Exposed for testing.
*/
private[spark] def prepareSubmitEnvironment(args: SparkSubmitArguments)
: (Seq[String], Seq[String], Map[String, String], String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
@@ -235,10 +298,13 @@ object SparkSubmit {
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
// Do not set CL arguments here because there are multiple possibilities for the main class
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
sysProp = "spark.driver.supervise"),

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
@@ -279,7 +345,6 @@ object SparkSubmit {
if (args.childArgs != null) { childArgs ++= args.childArgs }
}


// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null &&
@@ -301,14 +366,21 @@
sysProps.put("spark.jars", jars.mkString(","))
}

// In standalone-cluster mode, use Client as a wrapper around the user class
if (clusterManager == STANDALONE && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) {
childArgs += "--supervise"
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
}
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
@@ -345,7 +417,7 @@

// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sysProps -= ("spark.driver.host")
sysProps -= "spark.driver.host"
}

// Resolve paths in certain spark properties
@@ -374,9 +446,15 @@
(childArgs, childClasspath, sysProps, childMainClass)
}

private def launch(
childArgs: ArrayBuffer[String],
childClasspath: ArrayBuffer[String],
/**
* Run the main method of the child class using the provided launch environment.
*
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean = false) {
@@ -697,7 +775,7 @@ private[spark] object SparkSubmitUtils {
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
*/
private[spark] case class OptionAssigner(
private case class OptionAssigner(
value: String,
clusterManager: Int,
deployMode: Int,
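With the new KILL and REQUEST_STATUS actions in place, kill and status requests can be issued through Spark submit itself (commit 120ab9d); an illustrative invocation, assuming the --kill and --status flags this series adds and a placeholder submission ID:

    ./bin/spark-submit --master spark://master:6066 --kill driver-20150206123456-0000
    ./bin/spark-submit --master spark://master:6066 --status driver-20150206123456-0000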