Skip to content

Commit

Permalink
[SPARK-31486][CORE] spark.submit.waitAppCompletion flag to control sp…
Browse files Browse the repository at this point in the history
…ark-submit exit in Standalone Cluster Mode

### What changes were proposed in this pull request?
These changes implement an application wait mechanism which will allow spark-submit to wait until the application finishes in Standalone Spark Mode. This will delay the exit of spark-submit JVM until the job is completed. This implementation will keep monitoring the application until it is either finished, failed or killed. This will be controlled via a flag (spark.submit.waitForCompletion) which will be set to false by default.

### Why are the changes needed?
Currently, Livy API for Standalone Cluster Mode doesn't know when the job has finished. If this flag is enabled, this can be used by Livy API (/batches/{batchId}/state) to find out when the application has finished/failed. This flag is Similar to spark.yarn.submit.waitAppCompletion.

### Does this PR introduce any user-facing change?

Yes, this PR introduces a new flag but it will be disabled by default.

### How was this patch tested?
Couldn't implement unit tests since the pollAndReportStatus method has System.exit() calls. Please provide any suggestions.
Tested spark-submit locally for the following scenarios:
1. With the flag enabled, spark-submit exits once the job is finished.
2. With the flag enabled and job failed, spark-submit exits when the job fails.
3. With the flag disabled, spark-submit exists post submitting the job (existing behavior).
4. Existing behavior is unchanged when the flag is not added explicitly.

Closes #28258 from akshatb1/master.

Lead-authored-by: Akshat Bordia <akshat.bordia31@gmail.com>
Co-authored-by: Akshat Bordia <akshat.bordia@citrix.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
2 people authored and srowen committed Jun 9, 2020
1 parent ca2cfd4 commit 6befb2d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 26 deletions.
95 changes: 69 additions & 26 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.deploy

import java.util.concurrent.TimeUnit

import scala.collection.mutable.HashSet
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
Expand All @@ -27,6 +29,7 @@ import org.apache.log4j.Logger
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
Expand Down Expand Up @@ -61,6 +64,12 @@ private class ClientEndpoint(

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion",
false)
private val REPORT_DRIVER_STATUS_INTERVAL = 10000
private var submittedDriverID = ""
private var driverStatusReported = false


private def getProperty(key: String, conf: SparkConf): Option[String] = {
sys.props.get(key).orElse(conf.getOption(key))
Expand Down Expand Up @@ -107,8 +116,13 @@ private class ClientEndpoint(

case "kill" =>
val driverId = driverArgs.driverId
submittedDriverID = driverId
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
}
logInfo("... waiting before polling master for driver state")
forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
monitorDriverStatus()
}, 5000, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)
}

/**
Expand All @@ -124,58 +138,87 @@ private class ClientEndpoint(
}
}

/* Find out driver status then exit the JVM */
def pollAndReportStatus(driverId: String): Unit = {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
val statusResponse =
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
private def monitorDriverStatus(): Unit = {
if (submittedDriverID != "") {
asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID))
}
}

/**
* Processes and reports the driver status then exit the JVM if the
* waitAppCompletion is set to false, else reports the driver status
* if debug logs are enabled.
*/

def reportDriverStatus(
found: Boolean,
state: Option[DriverState],
workerId: Option[String],
workerHostPort: Option[String],
exception: Option[Exception]): Unit = {
if (found) {
// Using driverStatusReported to avoid writing following
// logs again when waitAppCompletion is set to true
if (!driverStatusReported) {
driverStatusReported = true
logInfo(s"State of $submittedDriverID is ${state.get}")
// Worker node, if present
(workerId, workerHostPort, state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
}
// Exception, if present
statusResponse.exception match {
exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
System.exit(0)
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
if (!waitAppCompletion) {
logInfo(s"spark-submit not configured to wait for completion, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else {
logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
s"continue monitoring driver status.")
}
}
}
} else {
logError(s"ERROR: Cluster master did not recognize $submittedDriverID")
System.exit(-1)
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
}
}

override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId.get)
submittedDriverID = driverId.get
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}


case KillDriverResponse(master, driverId, success, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
pollAndReportStatus(driverId)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}

case DriverStatusResponse(found, state, workerId, workerHostPort, exception) =>
reportDriverStatus(found, state, workerId, workerHostPort, exception)
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
Expand Down
19 changes: 19 additions & 0 deletions docs/spark-standalone.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,25 @@ To run an interactive Spark shell against the cluster, run the following command

You can also pass an option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.

# Client Properties

Spark applications supports the following configuration properties specific to standalone mode:

<table class="table">
<tr><th style="width:21%">Property Name</th><th>Default Value</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.standalone.submit.waitAppCompletion</code></td>
<td><code>false</code></td>
<td>
In standalone cluster mode, controls whether the client waits to exit until the application completes.
If set to <code>true</code>, the client process will stay alive polling the driver's status.
Otherwise, the client process will exit after submission.
</td>
<td>3.1.0</td>
</tr>
</table>


# Launching Spark Applications

The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to
Expand Down

0 comments on commit 6befb2d

Please sign in to comment.