Skip to content

Commit

Permalink
Do not include appResource and mainClass as properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Feb 5, 2015
1 parent 6fc7670 commit c9a8ad7
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 27 deletions.
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,6 @@ object SparkSubmit {
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, sysProp = "spark.driver.cores"),
OptionAssigner(args.primaryResource, STANDALONE, CLUSTER, sysProp = "spark.app.resource"),
OptionAssigner(args.mainClass, STANDALONE, CLUSTER, sysProp = "spark.app.mainClass"),
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
sysProp = "spark.driver.supervise"),

Expand Down Expand Up @@ -375,6 +373,7 @@ object SparkSubmit {
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = "org.apache.spark.deploy.rest.StandaloneRestClient"
childArgs += (args.primaryResource, args.mainClass)
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = "org.apache.spark.deploy.Client"
Expand Down Expand Up @@ -773,7 +772,7 @@ private[spark] object SparkSubmitUtils {
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
*/
private[spark] case class OptionAssigner(
private case class OptionAssigner(
value: String,
clusterManager: Int,
deployMode: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,10 @@ private[spark] class StandaloneRestClient extends Logging {
*/
def createSubmission(
master: String,
appArgs: Array[String],
sparkProperties: Map[String, String],
environmentVariables: Map[String, String]): SubmitRestProtocolResponse = {
request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to launch a driver in $master.")
validateMaster(master)
val url = getSubmitUrl(master)
val request = constructSubmitRequest(appArgs, sparkProperties, environmentVariables)
val response = postJson(url, request.toJson)
response match {
case s: CreateSubmissionResponse =>
Expand Down Expand Up @@ -202,11 +199,15 @@ private[spark] class StandaloneRestClient extends Logging {

/** Construct a message that captures the specified parameters for submitting an application. */
def constructSubmitRequest(
appResource: String,
mainClass: String,
appArgs: Array[String],
sparkProperties: Map[String, String],
environmentVariables: Map[String, String]): CreateSubmissionRequest = {
val message = new CreateSubmissionRequest
message.clientSparkVersion = sparkVersion
message.appResource = appResource
message.mainClass = mainClass
message.appArgs = appArgs
message.sparkProperties = sparkProperties
message.environmentVariables = environmentVariables
Expand Down Expand Up @@ -286,17 +287,23 @@ private[spark] object StandaloneRestClient {
val REPORT_DRIVER_STATUS_MAX_TRIES = 10
val PROTOCOL_VERSION = "v1"

/**
* Submit an application, assuming parameters are specified through system properties.
* Usage: StandaloneRestClient [app args*]
*/
/** Submit an application, assuming parameters are specified through system properties. */
def main(args: Array[String]): Unit = {
if (args.size < 2) {
sys.error("Usage: StandaloneRestClient [app resource] [main class] [app args*]")
sys.exit(1)
}
val appResource = args(0)
val mainClass = args(1)
val appArgs = args.slice(2, args.size)
val client = new StandaloneRestClient
val master = sys.props.get("spark.master").getOrElse {
throw new IllegalArgumentException("'spark.master' must be set.")
}
val sparkProperties = new SparkConf().getAll.toMap
val environmentVariables = sys.env.filter { case (k, _) => k.startsWith("SPARK_") }
client.createSubmission(master, args, sparkProperties, environmentVariables)
val submitRequest = client.constructSubmitRequest(
appResource, mainClass, appArgs, sparkProperties, environmentVariables)
client.createSubmission(master, submitRequest)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -337,17 +337,15 @@ private[spark] class SubmitRequestServlet(master: Master) extends StandaloneRest
* cluster mode yet.
*/
private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = {
val sparkProperties = request.sparkProperties

// Required fields, including the main class because python is not yet supported
val appResource = sparkProperties.get("spark.app.resource").getOrElse {
throw new SubmitRestMissingFieldException("Main application resource is missing.")
}
val mainClass = sparkProperties.get("spark.app.mainClass").getOrElse {
val appResource = request.appResource
val mainClass = Option(request.mainClass).getOrElse {
throw new SubmitRestMissingFieldException("Main class is missing.")
}

// Optional fields
val sparkProperties = request.sparkProperties
val driverMemory = sparkProperties.get("spark.driver.memory")
val driverCores = sparkProperties.get("spark.driver.cores")
val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ private[spark] abstract class SubmitRestProtocolRequest extends SubmitRestProtoc
* A request to submit a driver in the REST application submission protocol.
*/
private[spark] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
var appResource: String = null
var mainClass: String = null
var appArgs: Array[String] = null
var sparkProperties: Map[String, String] = null
var environmentVariables: Map[String, String] = null

protected override def doValidate(): Unit = {
super.doValidate()
assert(sparkProperties != null, "No Spark properties set!")
assertFieldIsSet(appResource, "appResource")
assertPropertyIsSet("spark.app.name")
assertPropertyIsSet("spark.app.resource")
assertPropertyIsBoolean("spark.driver.supervise")
assertPropertyIsNumeric("spark.driver.cores")
assertPropertyIsNumeric("spark.cores.max")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterAll with Bef
mainJar) ++ appArgs
val args = new SparkSubmitArguments(commandLineArgs)
val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args)
val response = client.createSubmission(
args.master, appArgs.toArray, sparkProperties.toMap, Map.empty)
val request = client.constructSubmitRequest(
mainJar, mainClass, appArgs.toArray, sparkProperties.toMap, Map.empty)
val response = client.createSubmission(masterRestUrl, request)
val submitResponse = getSubmitResponse(response)
val submissionId = submitResponse.submissionId
assert(submissionId != null, "Application submission was unsuccessful!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ class SubmitRestProtocolSuite extends FunSuite {
val message = new CreateSubmissionRequest
intercept[SubmitRestProtocolException] { message.validate() }
message.clientSparkVersion = "1.2.3"
message.appResource = "honey-walnut-cherry.jar"
message.mainClass = "org.apache.spark.examples.SparkPie"
val conf = new SparkConf(false)
conf.set("spark.app.name", "SparkPie")
conf.set("spark.app.resource", "honey-walnut-cherry.jar")
message.sparkProperties = conf.getAll.toMap
message.validate()
// optional fields
conf.set("spark.app.mainClass", "org.apache.spark.examples.SparkPie")
conf.set("spark.jars", "mayonnaise.jar,ketchup.jar")
conf.set("spark.files", "fireball.png")
conf.set("spark.driver.memory", "512m")
Expand Down Expand Up @@ -137,9 +137,9 @@ class SubmitRestProtocolSuite extends FunSuite {
assertJsonEquals(json, submitDriverRequestJson)
val newMessage = SubmitRestProtocolMessage.fromJson(json, classOf[CreateSubmissionRequest])
assert(newMessage.clientSparkVersion === "1.2.3")
assert(newMessage.appResource === "honey-walnut-cherry.jar")
assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie")
assert(newMessage.sparkProperties("spark.app.name") === "SparkPie")
assert(newMessage.sparkProperties("spark.app.resource") === "honey-walnut-cherry.jar")
assert(newMessage.sparkProperties("spark.app.mainClass") === "org.apache.spark.examples.SparkPie")
assert(newMessage.sparkProperties("spark.jars") === "mayonnaise.jar,ketchup.jar")
assert(newMessage.sparkProperties("spark.files") === "fireball.png")
assert(newMessage.sparkProperties("spark.driver.memory") === "512m")
Expand Down Expand Up @@ -261,10 +261,12 @@ class SubmitRestProtocolSuite extends FunSuite {
|{
| "action" : "CreateSubmissionRequest",
| "appArgs" : [ "two slices", "a hint of cinnamon" ],
| "appResource" : "honey-walnut-cherry.jar",
| "clientSparkVersion" : "1.2.3",
| "environmentVariables" : {
| "PATH" : "/dev/null"
| },
| "mainClass" : "org.apache.spark.examples.SparkPie",
| "sparkProperties" : {
| "spark.driver.extraLibraryPath" : "pickle.jar",
| "spark.jars" : "mayonnaise.jar,ketchup.jar",
Expand All @@ -273,12 +275,10 @@ class SubmitRestProtocolSuite extends FunSuite {
| "spark.cores.max" : "10000",
| "spark.driver.memory" : "512m",
| "spark.files" : "fireball.png",
| "spark.app.resource" : "honey-walnut-cherry.jar",
| "spark.driver.cores" : "180",
| "spark.driver.extraJavaOptions" : " -Dslices=5 -Dcolor=mostly_red",
| "spark.executor.memory" : "256m",
| "spark.driver.extraClassPath" : "food-coloring.jar",
| "spark.app.mainClass" : "org.apache.spark.examples.SparkPie"
| "spark.driver.extraClassPath" : "food-coloring.jar"
| }
|}
""".stripMargin
Expand Down

0 comments on commit c9a8ad7

Please sign in to comment.