Skip to content

Commit

Permalink
Rename classes to generalize REST protocol
Browse files Browse the repository at this point in the history
Previously the REST protocol was very explicitly tied to the
standalone mode. This commit frees the protocol from this
restriction.
  • Loading branch information
Andrew Or committed Jan 20, 2015
1 parent af9d9cb commit 6ff088d
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 430 deletions.
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ object SparkSubmit {
}

// In standalone cluster mode, use the brand new REST client to submit the application
val doingRest = appArgs.master.startsWith("spark://") && appArgs.deployMode == "cluster"
if (doingRest) {
println("Submitting driver through the REST interface.")
val isStandaloneCluster =
appArgs.master.startsWith("spark://") && appArgs.deployMode == "cluster"
if (isStandaloneCluster) {
new StandaloneRestClient().submitDriver(appArgs)
println("Done submitting driver.")
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ private[spark] class Master(
throw new SparkException("spark.deploy.defaultCores must be positive")
}

val restServer = new StandaloneRestServer(this, host, 6677)
// Alternative application submission gateway that is stable across Spark versions
private val restServerPort = conf.getInt("spark.master.rest.port", 17077)
private val restServer = new StandaloneRestServer(this, host, restServerPort)
restServer.start()

override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
/**
* A field used in a DriverStatusRequestMessage.
*/
private[spark] abstract class DriverStatusRequestField extends StandaloneRestProtocolField
private[spark] object DriverStatusRequestField extends StandaloneRestProtocolFieldCompanion {
private[spark] abstract class DriverStatusRequestField extends SubmitRestProtocolField
private[spark] object DriverStatusRequestField extends SubmitRestProtocolFieldCompanion {
case object ACTION extends DriverStatusRequestField
case object SPARK_VERSION extends DriverStatusRequestField
case object MESSAGE extends DriverStatusRequestField
Expand All @@ -32,16 +32,16 @@ private[spark] object DriverStatusRequestField extends StandaloneRestProtocolFie
}

/**
* A request sent to the standalone Master to query the status of a driver.
* A request sent to the cluster manager to query the status of a driver.
*/
private[spark] class DriverStatusRequestMessage extends StandaloneRestProtocolMessage(
StandaloneRestProtocolAction.DRIVER_STATUS_REQUEST,
private[spark] class DriverStatusRequestMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.DRIVER_STATUS_REQUEST,
DriverStatusRequestField.ACTION,
DriverStatusRequestField.requiredFields)

private[spark] object DriverStatusRequestMessage extends StandaloneRestProtocolMessageCompanion {
protected override def newMessage(): StandaloneRestProtocolMessage =
private[spark] object DriverStatusRequestMessage extends SubmitRestProtocolMessageCompanion {
protected override def newMessage(): SubmitRestProtocolMessage =
new DriverStatusRequestMessage
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
protected override def fieldWithName(field: String): SubmitRestProtocolField =
DriverStatusRequestField.withName(field)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
/**
* A field used in a DriverStatusResponseMessage.
*/
private[spark] abstract class DriverStatusResponseField extends StandaloneRestProtocolField
private[spark] object DriverStatusResponseField extends StandaloneRestProtocolFieldCompanion {
private[spark] abstract class DriverStatusResponseField extends SubmitRestProtocolField
private[spark] object DriverStatusResponseField extends SubmitRestProtocolFieldCompanion {
case object ACTION extends DriverStatusResponseField
case object SPARK_VERSION extends DriverStatusResponseField
case object MESSAGE extends DriverStatusResponseField
Expand All @@ -36,16 +36,16 @@ private[spark] object DriverStatusResponseField extends StandaloneRestProtocolFi
}

/**
* A message sent from the standalone Master in response to a DriverStatusResponseMessage.
* A message sent from the cluster manager in response to a DriverStatusResponseMessage.
*/
private[spark] class DriverStatusResponseMessage extends StandaloneRestProtocolMessage(
StandaloneRestProtocolAction.DRIVER_STATUS_RESPONSE,
private[spark] class DriverStatusResponseMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.DRIVER_STATUS_RESPONSE,
DriverStatusResponseField.ACTION,
DriverStatusResponseField.requiredFields)

private[spark] object DriverStatusResponseMessage extends StandaloneRestProtocolMessageCompanion {
protected override def newMessage(): StandaloneRestProtocolMessage =
private[spark] object DriverStatusResponseMessage extends SubmitRestProtocolMessageCompanion {
protected override def newMessage(): SubmitRestProtocolMessage =
new DriverStatusResponseMessage
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
protected override def fieldWithName(field: String): SubmitRestProtocolField =
DriverStatusResponseField.withName(field)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
/**
* A field used in a ErrorMessage.
*/
private[spark] abstract class ErrorField extends StandaloneRestProtocolField
private[spark] object ErrorField extends StandaloneRestProtocolFieldCompanion {
private[spark] abstract class ErrorField extends SubmitRestProtocolField
private[spark] object ErrorField extends SubmitRestProtocolFieldCompanion {
case object ACTION extends ErrorField
case object SPARK_VERSION extends ErrorField
case object MESSAGE extends ErrorField
Expand All @@ -30,15 +30,15 @@ private[spark] object ErrorField extends StandaloneRestProtocolFieldCompanion {
}

/**
* An error message exchanged in the standalone REST protocol.
* An error message exchanged in the stable application submission protocol.
*/
private[spark] class ErrorMessage extends StandaloneRestProtocolMessage(
StandaloneRestProtocolAction.ERROR,
private[spark] class ErrorMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.ERROR,
ErrorField.ACTION,
ErrorField.requiredFields)

private[spark] object ErrorMessage extends StandaloneRestProtocolMessageCompanion {
protected override def newMessage(): StandaloneRestProtocolMessage = new ErrorMessage
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
private[spark] object ErrorMessage extends SubmitRestProtocolMessageCompanion {
protected override def newMessage(): SubmitRestProtocolMessage = new ErrorMessage
protected override def fieldWithName(field: String): SubmitRestProtocolField =
ErrorField.withName(field)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
/**
* A field used in a KillDriverRequestMessage.
*/
private[spark] abstract class KillDriverRequestField extends StandaloneRestProtocolField
private[spark] object KillDriverRequestField extends StandaloneRestProtocolFieldCompanion {
private[spark] abstract class KillDriverRequestField extends SubmitRestProtocolField
private[spark] object KillDriverRequestField extends SubmitRestProtocolFieldCompanion {
case object ACTION extends KillDriverRequestField
case object SPARK_VERSION extends KillDriverRequestField
case object MESSAGE extends KillDriverRequestField
Expand All @@ -32,16 +32,16 @@ private[spark] object KillDriverRequestField extends StandaloneRestProtocolField
}

/**
* A request sent to the standalone Master to kill a driver.
* A request sent to the cluster manager to kill a driver.
*/
private[spark] class KillDriverRequestMessage extends StandaloneRestProtocolMessage(
StandaloneRestProtocolAction.KILL_DRIVER_REQUEST,
private[spark] class KillDriverRequestMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.KILL_DRIVER_REQUEST,
KillDriverRequestField.ACTION,
KillDriverRequestField.requiredFields)

private[spark] object KillDriverRequestMessage extends StandaloneRestProtocolMessageCompanion {
protected override def newMessage(): StandaloneRestProtocolMessage =
private[spark] object KillDriverRequestMessage extends SubmitRestProtocolMessageCompanion {
protected override def newMessage(): SubmitRestProtocolMessage =
new KillDriverRequestMessage
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
protected override def fieldWithName(field: String): SubmitRestProtocolField =
KillDriverRequestField.withName(field)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.deploy.rest
/**
* A field used in a KillDriverResponseMessage.
*/
private[spark] abstract class KillDriverResponseField extends StandaloneRestProtocolField
private[spark] object KillDriverResponseField extends StandaloneRestProtocolFieldCompanion {
private[spark] abstract class KillDriverResponseField extends SubmitRestProtocolField
private[spark] object KillDriverResponseField extends SubmitRestProtocolFieldCompanion {
case object ACTION extends KillDriverResponseField
case object SPARK_VERSION extends KillDriverResponseField
case object MESSAGE extends KillDriverResponseField
Expand All @@ -33,16 +33,16 @@ private[spark] object KillDriverResponseField extends StandaloneRestProtocolFiel
}

/**
* A message sent from the standalone Master in response to a KillDriverResponseMessage.
* A message sent from the cluster manager in response to a KillDriverResponseMessage.
*/
private[spark] class KillDriverResponseMessage extends StandaloneRestProtocolMessage(
StandaloneRestProtocolAction.KILL_DRIVER_RESPONSE,
private[spark] class KillDriverResponseMessage extends SubmitRestProtocolMessage(
SubmitRestProtocolAction.KILL_DRIVER_RESPONSE,
KillDriverResponseField.ACTION,
KillDriverResponseField.requiredFields)

private[spark] object KillDriverResponseMessage extends StandaloneRestProtocolMessageCompanion {
protected override def newMessage(): StandaloneRestProtocolMessage =
private[spark] object KillDriverResponseMessage extends SubmitRestProtocolMessageCompanion {
protected override def newMessage(): SubmitRestProtocolMessage =
new KillDriverResponseMessage
protected override def fieldWithName(field: String): StandaloneRestProtocolField =
protected override def fieldWithName(field: String): SubmitRestProtocolField =
KillDriverResponseField.withName(field)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,22 @@

package org.apache.spark.deploy.rest

import java.io.DataOutputStream
import java.net.URL
import java.net.HttpURLConnection

import scala.io.Source

import com.google.common.base.Charsets

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.SparkSubmitArguments
import org.apache.spark.util.Utils

/**
* A client that submits Spark applications using a stable REST protocol in standalone
* cluster mode. This client is intended to communicate with the StandaloneRestServer.
* A client that submits Spark applications to the standalone Master using a stable
* REST protocol. This client is intended to communicate with the StandaloneRestServer,
* and currently only used in cluster mode.
*/
private[spark] class StandaloneRestClient {

def submitDriver(args: SparkSubmitArguments): Unit = {
validateSubmitArguments(args)
val url = getHttpUrl(args.master)
val request = constructSubmitRequest(args)
val response = sendHttp(url, request)
println(response.toJson)
}

def killDriver(master: String, driverId: String): Unit = {
validateMaster(master)
val url = getHttpUrl(master)
val request = constructKillRequest(master, driverId)
val response = sendHttp(url, request)
println(response.toJson)
}

def requestDriverStatus(master: String, driverId: String): Unit = {
validateMaster(master)
val url = getHttpUrl(master)
val request = constructStatusRequest(master, driverId)
val response = sendHttp(url, request)
println(response.toJson)
}
private[spark] class StandaloneRestClient extends SubmitRestClient {

/**
* Construct a submit driver request message.
*/
private def constructSubmitRequest(args: SparkSubmitArguments): SubmitDriverRequestMessage = {
/** Construct a submit driver request message. */
override protected def constructSubmitRequest(
args: SparkSubmitArguments): SubmitDriverRequestMessage = {
import SubmitDriverRequestField._
val driverMemory = Option(args.driverMemory)
.map { m => Utils.memoryStringToMb(m).toString }
Expand All @@ -78,7 +48,6 @@ private[spark] class StandaloneRestClient {
.setFieldIfNotNull(MAIN_CLASS, args.mainClass)
.setFieldIfNotNull(JARS, args.jars)
.setFieldIfNotNull(FILES, args.files)
.setFieldIfNotNull(PY_FILES, args.pyFiles)
.setFieldIfNotNull(DRIVER_MEMORY, driverMemory)
.setFieldIfNotNull(DRIVER_CORES, args.driverCores)
.setFieldIfNotNull(DRIVER_EXTRA_JAVA_OPTIONS, args.driverExtraJavaOptions)
Expand All @@ -97,10 +66,8 @@ private[spark] class StandaloneRestClient {
message.validate()
}

/**
* Construct a kill driver request message.
*/
private def constructKillRequest(
/** Construct a kill driver request message. */
override protected def constructKillRequest(
master: String,
driverId: String): KillDriverRequestMessage = {
import KillDriverRequestField._
Expand All @@ -111,10 +78,8 @@ private[spark] class StandaloneRestClient {
.validate()
}

/**
* Construct a driver status request message.
*/
private def constructStatusRequest(
/** Construct a driver status request message. */
override protected def constructStatusRequest(
master: String,
driverId: String): DriverStatusRequestMessage = {
import DriverStatusRequestField._
Expand All @@ -125,67 +90,23 @@ private[spark] class StandaloneRestClient {
.validate()
}

/**
* Send the provided request in an HTTP message to the given URL.
* Return the response received from the REST server.
*/
private def sendHttp(
url: URL,
request: StandaloneRestProtocolMessage): StandaloneRestProtocolMessage = {
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("Content-Type", "application/json")
conn.setRequestProperty("charset", "utf-8")
conn.setDoOutput(true)
println("Sending this JSON blob to server:\n" + request.toJson)
val content = request.toJson.getBytes(Charsets.UTF_8)
val out = new DataOutputStream(conn.getOutputStream)
out.write(content)
out.close()
val response = Source.fromInputStream(conn.getInputStream).mkString
StandaloneRestProtocolMessage.fromJson(response)
}

/**
* Throw an exception if this is not standalone cluster mode.
*/
private def validateSubmitArguments(args: SparkSubmitArguments): Unit = {
validateMaster(args.master)
validateDeployMode(args.deployMode)
}

/**
* Throw an exception if this is not standalone mode.
*/
private def validateMaster(master: String): Unit = {
/** Throw an exception if this is not standalone mode. */
override protected def validateMaster(master: String): Unit = {
if (!master.startsWith("spark://")) {
throw new IllegalArgumentException("This REST client is only supported in standalone mode.")
}
}

/**
* Throw an exception if this is not cluster deploy mode.
*/
private def validateDeployMode(deployMode: String): Unit = {
/** Throw an exception if this is not cluster deploy mode. */
override protected def validateDeployMode(deployMode: String): Unit = {
if (deployMode != "cluster") {
throw new IllegalArgumentException("This REST client is only supported in cluster mode.")
}
}

/**
* Extract the URL portion of the master address.
*/
private def getHttpUrl(master: String): URL = {
/** Extract the URL portion of the master address. */
override protected def getHttpUrl(master: String): URL = {
validateMaster(master)
new URL("http://" + master.stripPrefix("spark://"))
}
}

object StandaloneRestClient {
def main(args: Array[String]): Unit = {
assert(args.length > 0)
//val client = new StandaloneRestClient
//client.submitDriver("spark://" + args(0))
println("Done.")
}
}
Loading

0 comments on commit 6ff088d

Please sign in to comment.