[SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode #28258

Status: Closed. Wants to merge 15 commits (changes shown from 10 commits).
58 changes: 50 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,6 +17,8 @@

package org.apache.spark.deploy

import java.util.concurrent.TimeUnit

import scala.collection.mutable.HashSet
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
@@ -61,6 +63,11 @@ private class ClientEndpoint(

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion",
  false)
private val REPORT_DRIVER_STATUS_INTERVAL = 10000
private var submittedDriverID = ""


private def getProperty(key: String, conf: SparkConf): Option[String] = {
sys.props.get(key).orElse(conf.getOption(key))
@@ -105,6 +112,10 @@ private class ClientEndpoint(
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))

forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
MonitorDriverStatus()
}, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)

case "kill" =>
val driverId = driverArgs.driverId
asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
@@ -123,16 +134,24 @@
}(forwardMessageExecutionContext)
}
}
private def MonitorDriverStatus(): Unit = {
> **Member:** nit: need empty line above.

> **Member:** nit: method should start with lowercase.
if (submittedDriverID != "") {
asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID))
}
}
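The patch drives `MonitorDriverStatus()` from a fixed-rate scheduler (fire once immediately, then every `REPORT_DRIVER_STATUS_INTERVAL` milliseconds on a single-threaded executor). That pattern can be sketched in isolation; everything below is an illustrative stand-alone sketch with made-up names and intervals, not code from this PR:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FixedRatePollingSketch {
  /** Runs a fixed-rate "status poll" for roughly `waitMs` ms and returns
    * how many times it fired. The single-threaded scheduler plays the role
    * of forwardMessageThread in ClientEndpoint. */
  def countPolls(periodMs: Long, waitMs: Long): Int = {
    val polls = new AtomicInteger(0)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Initial delay 0: the first poll happens immediately, as in the patch.
    scheduler.scheduleAtFixedRate(
      () => polls.incrementAndGet(), 0, periodMs, TimeUnit.MILLISECONDS)
    Thread.sleep(waitMs)
    scheduler.shutdownNow()
    polls.get()
  }

  def main(args: Array[String]): Unit =
    println(s"fired ${countPolls(100, 350)} times")
}
```

Because the initial delay is zero, the callback runs at least once even if the period is longer than the client's remaining lifetime, which is what lets the same scheduler serve the short-lived `waitAppCompletion=false` path.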

/**
 * Find out driver status then exit the JVM. If waitAppCompletion is set to true, monitors
 * the application until it finishes, fails, or is killed.
 */
def pollAndReportStatus(driverId: String): Unit = {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
val statusResponse =
  activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
@@ -148,20 +167,27 @@
e.printStackTrace()
System.exit(-1)
          case _ =>
            if (!waitAppCompletion) {
              logInfo(s"spark-submit not configured to wait for completion, " +
                s"exiting spark-submit JVM.")
              System.exit(0)
            } else {
              logInfo(s"spark-submit is configured to wait for completion, " +
                s"continue monitoring driver status.")
            }
        }
      } else {
        logError(s"ERROR: Cluster master did not recognize $driverId")
        System.exit(-1)
      }
    }
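The new behavior hinges on reading the flag via `conf.getBoolean(key, default)`, falling back to `false` when the property is unset. The lookup-with-default pattern can be sketched as follows; `SimpleConf` is an illustrative stand-in, not Spark's `SparkConf`:

```scala
// Minimal sketch of the conf.getBoolean(key, default) pattern used above.
// SimpleConf is a hypothetical stand-in for SparkConf.
final class SimpleConf(settings: Map[String, String]) {
  def getBoolean(key: String, defaultValue: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(defaultValue)
}
```

With this shape, omitting `spark.standalone.submit.waitAppCompletion` preserves the pre-patch behavior (exit right after submission), so the feature is strictly opt-in.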

override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
submittedDriverID = driverId.get
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
@@ -176,6 +202,22 @@
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}

case DriverStatusResponse(found, state, _, _, _) =>
> **Ngone51 (Member):** It would be better if we could do some refactoring on pollAndReportStatus in order to reduce some duplicate logic. For example, we could call pollAndReportStatus only here and remove the other invocations in SubmitDriverResponse / KillDriverResponse. And, of course, pollAndReportStatus (it also needs a new name) would not poll the status anymore.

> **akshatb1 (Contributor, Author):** @Ngone51 Thanks for your feedback. pollAndReportStatus is only used the first time after submitting or killing drivers. I am not sure which duplicate logic you are referring to. Also, pollAndReportStatus only polls the driver status and handles the response. If we remove polling from it, what logic should be handled there?

> **Ngone51 (Member):** > If we remove polling from that, what logic should be handled there?
>
> We use this (but the initial delay needs to change):
>
>     forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
>       MonitorDriverStatus()
>     }, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)
>
> This way, submitting or killing drivers will still use it only once when waitAppCompletion=false.

> **akshatb1 (Contributor, Author):** Scheduling to monitor driver status is done only in the submit case, not in kill, as of now. So we may need to explicitly send a message to monitor driver status after a 5-second delay in the kill case.

> **Ngone51 (Member, May 21, 2020):** That's why I said we need to change the delay (e.g. 5s) instead of 0 for both submitting and killing.

> **akshatb1 (Contributor, Author, May 21, 2020):** I agree that we can change the delay to 5 seconds to keep it consistent with the current logic. My question is whether we should add the forwardMessageThread.scheduleAtFixedRate(...) block in `case "kill" =>` as well, or just monitor with a single message instead of scheduled messages.

> **Ngone51 (Member):** We don't need to add forwardMessageThread.scheduleAtFixedRate(...) into any case branches; just make it a global one (as you do now). I think it still works for `case "kill"`.

> **akshatb1 (Contributor, Author):** It's in `case "launch"` as of now. I will move it to a global place and refactor the code. Thanks for your suggestions.

> **Ngone51 (Member):** Oh, sorry, I missed that. Yeah, thank you!

> **akshatb1 (Contributor, Author):** @Ngone51 I have refactored the code as suggested. Kindly review it again. Thanks.
if (found) {
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
logInfo(s"State of driver $submittedDriverID is ${state.get}, " +
s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
logDebug(s"State of driver $submittedDriverID is ${state.get}, " +
s"continue monitoring driver status.")
}
} else {
System.exit(-1)
}
}
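The `DriverStatusResponse` handler above exits only when the driver reaches one of the four terminal states, and keeps monitoring otherwise. That classification, plus the interaction with the wait flag, can be modeled as two small pure functions; `DriverStateSketch` is an illustrative stand-in for `org.apache.spark.deploy.master.DriverState`, not the real enumeration:

```scala
object DriverStateSketch {
  // Illustrative stand-ins for the DriverState values matched above.
  sealed trait DriverState
  case object SUBMITTED extends DriverState
  case object RUNNING extends DriverState
  case object FINISHED extends DriverState
  case object FAILED extends DriverState
  case object ERROR extends DriverState
  case object KILLED extends DriverState

  /** Terminal states after which the client JVM can safely exit. */
  def isTerminal(state: DriverState): Boolean = state match {
    case FINISHED | FAILED | ERROR | KILLED => true
    case _ => false
  }

  /** Mirrors the patch's behavior: exit immediately on a terminal state,
    * and otherwise only stay alive when waitAppCompletion is true. */
  def shouldExit(state: DriverState, waitAppCompletion: Boolean): Boolean =
    isTerminal(state) || !waitAppCompletion
}
```

Factoring the decision out like this is also what makes the reviewer's suggested refactor easy to test without an RPC round trip.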

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
19 changes: 19 additions & 0 deletions docs/spark-standalone.md
@@ -374,6 +374,25 @@ To run an interactive Spark shell against the cluster, run the following command

You can also pass an option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.

# Client Properties

Spark applications support the following configuration properties specific to standalone mode:

<table class="table">
<tr><th style="width:21%">Property Name</th><th>Default Value</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.standalone.submit.waitAppCompletion</code></td>
<td><code>false</code></td>
<td>
In standalone cluster mode, controls whether the client waits to exit until the application completes.
If set to <code>true</code>, the client process will stay alive polling the driver's status.
Otherwise, the client process will exit after submission.
</td>
<td>3.1.0</td>
</tr>
</table>
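A submission using the new property might look like the following; the master URL, class name, and jar are illustrative placeholders, not values from this PR:

```sh
# Keep the spark-submit client alive until the driver reaches a terminal state.
spark-submit \
  --master spark://master:7077 \
  --deploy-mode cluster \
  --conf spark.standalone.submit.waitAppCompletion=true \
  --class org.example.MyApp \
  my-app.jar
```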


# Launching Spark Applications

The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to
Expand Down