Major cleanups in code and comments
This involves refactoring SparkSubmit a little to put the code
that launches the REST client in the right place. This commit also
adds port retry logic in the REST server, which was previously
missing.
Andrew Or committed Jan 21, 2015
1 parent e958cae commit 544de1d
Showing 14 changed files with 151 additions and 143 deletions.
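
The commit message mentions port retry logic added to the REST server; that change sits in a portion of the diff not shown below, though the new master.conf argument passed to SubmitRestServer later on this page presumably supplies the retry budget (e.g. spark.port.maxRetries). As a rough sketch of what such retry logic conventionally looks like (illustrative only, not the code from this commit):

import java.net.BindException

// Try startPort, startPort + 1, ... until a bind succeeds or retries run out.
// `bind` stands in for whatever actually starts the server (e.g. a Jetty start).
def startWithRetries[T](startPort: Int, maxRetries: Int)(bind: Int => T): (T, Int) = {
  var offset = 0
  while (offset <= maxRetries) {
    val port = startPort + offset
    try {
      return (bind(port), port) // bound successfully on this port
    } catch {
      case _: BindException => offset += 1 // port taken, try the next one
    }
  }
  throw new BindException(s"Failed to bind on any port in [$startPort, ${startPort + maxRetries}]")
}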
78 changes: 38 additions & 40 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -73,30 +73,24 @@ object SparkSubmit {
if (appArgs.verbose) {
printStream.println(appArgs)
}

// In standalone cluster mode, use the brand new REST client to submit the application
val isStandaloneCluster =
appArgs.master.startsWith("spark://") && appArgs.deployMode == "cluster"
if (isStandaloneCluster) {
new StandaloneRestClient().submitDriver(appArgs)
return
}

val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
launch(appArgs)
}

/**
* @return a tuple containing
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a list of system properties and env vars, and
* (4) the main class for the child
* Launch the application using the provided parameters.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*
* Note that standalone cluster mode is an exception in that we do not invoke the
* main method of a child class. Instead, we pass the submit parameters directly to
* a REST client, which will submit the application using the stable REST protocol.
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {

// Values to return
private[spark] def launch(args: SparkSubmitArguments): Unit = {
// Environment needed to launch the child main class
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sysProps = new HashMap[String, String]()
@@ -198,8 +192,6 @@ object SparkSubmit {

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

// Yarn client only
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
@@ -228,6 +220,9 @@ object SparkSubmit {
sysProp = "spark.files")
)

val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isStandaloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
@@ -239,7 +234,6 @@
if (args.childArgs != null) { childArgs ++= args.childArgs }
}


// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null &&
@@ -253,7 +247,6 @@
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
if (!isYarnCluster && !args.isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
@@ -262,19 +255,6 @@
sysProps.put("spark.jars", jars.mkString(","))
}

// In standalone-cluster mode, use Client as a wrapper around the user class
if (clusterManager == STANDALONE && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.Client"
if (args.supervise) {
childArgs += "--supervise"
}
childArgs += "launch"
childArgs += (args.master, args.primaryResource, args.mainClass)
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -294,7 +274,7 @@

// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sysProps -= ("spark.driver.host")
sysProps -= "spark.driver.host"
}

// Resolve paths in certain spark properties
@@ -320,10 +300,28 @@
sysProps("spark.submit.pyFiles") = formattedPyFiles
}

(childArgs, childClasspath, sysProps, childMainClass)
// In standalone cluster mode, use the stable application submission REST protocol.
// Otherwise, just call the main method of the child class.
if (isStandaloneCluster) {
// NOTE: since we mutate the values of some configs in this method, we must update the
// corresponding fields in the original SparkSubmitArguments to reflect these changes.
args.sparkProperties.clear()
args.sparkProperties ++= sysProps
sysProps.get("spark.jars").foreach { args.jars = _ }
sysProps.get("spark.files").foreach { args.files = _ }
new StandaloneRestClient().submitDriver(args)
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass)
}
}

private def launch(
/**
* Run the main method of the child class using the provided launch environment.
*
* Depending on the deploy mode, cluster manager, and the type of the application,
* this main class may not necessarily be the one provided by the user.
*/
private def runMain(
childArgs: ArrayBuffer[String],
childClasspath: ArrayBuffer[String],
sysProps: Map[String, String],
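
One subtlety in the standalone-cluster branch above: launch mutates sysProps after the values were first read out of SparkSubmitArguments, so the mutations are written back before args is handed to the REST client. A minimal runnable illustration of that write-back pattern, with toy values rather than real Spark configs:

import scala.collection.mutable

val sparkProperties = mutable.Map("spark.jars" -> "user.jar")   // stands in for args.sparkProperties
val sysProps = mutable.Map(
  "spark.jars" -> "user.jar,app.jar",                           // mutated during launch
  "spark.files" -> "data.txt")

sparkProperties.clear()
sparkProperties ++= sysProps                                    // REST client now sees the final values
assert(sparkProperties("spark.jars") == "user.jar,app.jar")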
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -180,6 +180,7 @@ private[spark] class Master(
recoveryCompletionTask.cancel()
}
webUi.stop()
restServer.stop()
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusRequestMessage.scala
@@ -33,15 +33,16 @@ private[spark] object DriverStatusRequestField
}

/**
* A request sent to the cluster manager to query the status of a driver.
* A request sent to the cluster manager to query the status of a driver
* in the stable application submission REST protocol.
*/
private[spark] class DriverStatusRequestMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.DRIVER_STATUS_REQUEST,
DriverStatusRequestField.ACTION,
DriverStatusRequestField.requiredFields)
SubmitRestProtocolAction.DRIVER_STATUS_REQUEST,
DriverStatusRequestField.ACTION,
DriverStatusRequestField.requiredFields)

private[spark] object DriverStatusRequestMessage
extends SubmitRestProtocolMessageCompanion[DriverStatusRequestMessage] {
protected override def newMessage() = new DriverStatusRequestMessage
protected override def fieldWithName(field: String) = DriverStatusRequestField.withName(field)
protected override def fieldFromString(field: String) = DriverStatusRequestField.fromString(field)
}
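
These protocol message classes share a builder-style API. As a usage illustration, a request might be built the way StandaloneRestClient builds its submit request later in this diff; note this is hypothetical, since only ACTION and requiredFields are visible in this hunk:

import DriverStatusRequestField._

// Hypothetical sketch; field names other than ACTION are assumptions.
val request = new DriverStatusRequestMessage()
  .setField(MASTER, "spark://master:7077")
  .setField(DRIVER_ID, "driver-20150121120000-0000")
  .validate() // presumably fails if any field in requiredFields is unset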
core/src/main/scala/org/apache/spark/deploy/rest/DriverStatusResponseMessage.scala
@@ -37,15 +37,16 @@ private[spark] object DriverStatusResponseField
}

/**
* A message sent from the cluster manager in response to a DriverStatusResponseMessage.
* A message sent from the cluster manager in response to a DriverStatusRequestMessage
* in the stable application submission REST protocol.
*/
private[spark] class DriverStatusResponseMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.DRIVER_STATUS_RESPONSE,
DriverStatusResponseField.ACTION,
DriverStatusResponseField.requiredFields)
SubmitRestProtocolAction.DRIVER_STATUS_RESPONSE,
DriverStatusResponseField.ACTION,
DriverStatusResponseField.requiredFields)

private[spark] object DriverStatusResponseMessage
extends SubmitRestProtocolMessageCompanion[DriverStatusResponseMessage] {
protected override def newMessage() = new DriverStatusResponseMessage
protected override def fieldWithName(field: String) = DriverStatusResponseField.withName(field)
protected override def fieldFromString(field: String) = DriverStatusResponseField.fromString(field)
}
core/src/main/scala/org/apache/spark/deploy/rest/ErrorMessage.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.rest

/**
* A field used in a ErrorMessage.
* A field used in an ErrorMessage.
*/
private[spark] abstract class ErrorField extends SubmitRestProtocolField
private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion[ErrorField] {
@@ -30,14 +30,14 @@ private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion[ErrorField]
}

/**
* An error message exchanged in the stable application submission protocol.
* An error message exchanged in the stable application submission REST protocol.
*/
private[spark] class ErrorMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.ERROR,
ErrorField.ACTION,
ErrorField.requiredFields)
SubmitRestProtocolAction.ERROR,
ErrorField.ACTION,
ErrorField.requiredFields)

private[spark] object ErrorMessage extends SubmitRestProtocolMessageCompanion[ErrorMessage] {
protected override def newMessage() = new ErrorMessage
protected override def fieldWithName(field: String) = ErrorField.withName(field)
protected override def fieldFromString(field: String) = ErrorField.fromString(field)
}
core/src/main/scala/org/apache/spark/deploy/rest/KillDriverRequestMessage.scala
@@ -33,15 +33,16 @@ private[spark] object KillDriverRequestField
}

/**
* A request sent to the cluster manager to kill a driver.
* A request sent to the cluster manager to kill a driver
* in the stable application submission REST protocol.
*/
private[spark] class KillDriverRequestMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.KILL_DRIVER_REQUEST,
KillDriverRequestField.ACTION,
KillDriverRequestField.requiredFields)
SubmitRestProtocolAction.KILL_DRIVER_REQUEST,
KillDriverRequestField.ACTION,
KillDriverRequestField.requiredFields)

private[spark] object KillDriverRequestMessage
extends SubmitRestProtocolMessageCompanion[KillDriverRequestMessage] {
protected override def newMessage() = new KillDriverRequestMessage
protected override def fieldWithName(field: String) = KillDriverRequestField.withName(field)
protected override def fieldFromString(field: String) = KillDriverRequestField.fromString(field)
}
core/src/main/scala/org/apache/spark/deploy/rest/KillDriverResponseMessage.scala
@@ -34,15 +34,16 @@ private[spark] object KillDriverResponseField
}

/**
* A message sent from the cluster manager in response to a KillDriverResponseMessage.
* A message sent from the cluster manager in response to a KillDriverRequestMessage
* in the stable application submission REST protocol.
*/
private[spark] class KillDriverResponseMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.KILL_DRIVER_RESPONSE,
KillDriverResponseField.ACTION,
KillDriverResponseField.requiredFields)
SubmitRestProtocolAction.KILL_DRIVER_RESPONSE,
KillDriverResponseField.ACTION,
KillDriverResponseField.requiredFields)

private[spark] object KillDriverResponseMessage
extends SubmitRestProtocolMessageCompanion[KillDriverResponseMessage] {
protected override def newMessage() = new KillDriverResponseMessage
protected override def fieldWithName(field: String) = KillDriverResponseField.withName(field)
protected override def fieldFromString(field: String) = KillDriverResponseField.fromString(field)
}
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -24,7 +24,7 @@ import org.apache.spark.deploy.SparkSubmitArguments
import org.apache.spark.util.Utils

/**
* A client that submits Spark applications to the standalone Master using a stable REST protocol.
* A client that submits applications to the standalone Master using the stable REST protocol.
* This client is intended to communicate with the StandaloneRestServer. Cluster mode only.
*/
private[spark] class StandaloneRestClient extends SubmitRestClient {
@@ -33,12 +33,8 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
override protected def constructSubmitRequest(
args: SparkSubmitArguments): SubmitDriverRequestMessage = {
import SubmitDriverRequestField._
val driverMemory = Option(args.driverMemory)
.map { m => Utils.memoryStringToMb(m).toString }
.orNull
val executorMemory = Option(args.executorMemory)
.map { m => Utils.memoryStringToMb(m).toString }
.orNull
val dm = Option(args.driverMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull
val em = Option(args.executorMemory).map { m => Utils.memoryStringToMb(m).toString }.orNull
val message = new SubmitDriverRequestMessage()
.setField(SPARK_VERSION, sparkVersion)
.setField(MASTER, args.master)
@@ -47,17 +43,17 @@ private[spark] class StandaloneRestClient extends SubmitRestClient {
.setFieldIfNotNull(MAIN_CLASS, args.mainClass)
.setFieldIfNotNull(JARS, args.jars)
.setFieldIfNotNull(FILES, args.files)
.setFieldIfNotNull(DRIVER_MEMORY, driverMemory)
.setFieldIfNotNull(DRIVER_MEMORY, dm)
.setFieldIfNotNull(DRIVER_CORES, args.driverCores)
.setFieldIfNotNull(DRIVER_EXTRA_JAVA_OPTIONS, args.driverExtraJavaOptions)
.setFieldIfNotNull(DRIVER_EXTRA_CLASS_PATH, args.driverExtraClassPath)
.setFieldIfNotNull(DRIVER_EXTRA_LIBRARY_PATH, args.driverExtraLibraryPath)
.setFieldIfNotNull(SUPERVISE_DRIVER, args.supervise.toString)
.setFieldIfNotNull(EXECUTOR_MEMORY, executorMemory)
.setFieldIfNotNull(EXECUTOR_MEMORY, em)
.setFieldIfNotNull(TOTAL_EXECUTOR_CORES, args.totalExecutorCores)
args.childArgs.foreach(message.appendAppArg)
args.sparkProperties.foreach { case (k, v) => message.setSparkProperty(k, v) }
// TODO: set environment variables?
// TODO: send special environment variables?
message.validate()
}

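
A note on the dm/em one-liners above: Utils.memoryStringToMb normalizes a JVM-style memory string ("512m", "1g", ...) to an integer count of megabytes, so DRIVER_MEMORY and EXECUTOR_MEMORY are transmitted as plain MB values, and setFieldIfNotNull skips them when unset. For example:

import org.apache.spark.util.Utils

val dm = Option("1g").map { m => Utils.memoryStringToMb(m).toString }.orNull
assert(dm == "1024")  // "1g" -> 1024 MB
val em = Option(null: String).map { m => Utils.memoryStringToMb(m).toString }.orNull
assert(em == null)    // left null, so setFieldIfNotNull skips the field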
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -19,8 +19,7 @@ package org.apache.spark.deploy.rest

import java.io.File

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.SparkConf
@@ -29,22 +28,19 @@ import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import akka.actor.ActorRef

/**
* A server that responds to requests submitted by the StandaloneRestClient.
* This is intended to be embedded in the standalone Master. Cluster mode only.
*/
private[spark] class StandaloneRestServer(
master: Master,
host: String,
requestedPort: Int)
extends SubmitRestServer(host, requestedPort) {
private[spark] class StandaloneRestServer(master: Master, host: String, requestedPort: Int)
extends SubmitRestServer(host, requestedPort, master.conf) {
override protected val handler = new StandaloneRestServerHandler(master)
}

/**
* A handler for requests submitted to the standalone Master through the REST protocol.
* A handler for requests submitted to the standalone Master
* via the stable application submission REST protocol.
*/
private[spark] class StandaloneRestServerHandler(
conf: SparkConf,
@@ -141,9 +137,7 @@ private[spark] class StandaloneRestServerHandler(
// Otherwise, once the driver is launched it will contact the wrong server
.set("spark.master", masterUrl)
.set("spark.app.name", appName)
// Include main app resource on the executor classpath
// The corresponding behavior in client mode is handled in SparkSubmit
.set("spark.jars", jars.map(_ + ",").getOrElse("") + appResource)
jars.foreach { j => conf.set("spark.jars", j) }
files.foreach { f => conf.set("spark.files", f) }
driverExtraJavaOptions.foreach { j => conf.set("spark.driver.extraJavaOptions", j) }
driverExtraClassPath.foreach { cp => conf.set("spark.driver.extraClassPath", cp) }
(Diffs for the remaining five changed files are not shown.)
