
[SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode #28258

Closed · wants to merge 15 commits
Changes from 3 commits
68 changes: 45 additions & 23 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -61,6 +61,9 @@ private class ClientEndpoint(

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
private val waitAppCompletion = conf.getBoolean("spark.submit.waitAppCompletion", false)
private val REPORT_DRIVER_STATUS_INTERVAL = 1000


private def getProperty(key: String, conf: SparkConf): Option[String] = {
sys.props.get(key).orElse(conf.getOption(key))
@@ -124,38 +127,57 @@ private class ClientEndpoint(
}
}

/* Find out driver status then exit the JVM */
/**
* Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors
* the application until it finishes, fails or is killed.
*/
def pollAndReportStatus(driverId: String): Unit = {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
val statusResponse =
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
// Exception, if present
statusResponse.exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
System.exit(0)
while (true) {

Member:
This could block ClientEndpoint when waitAppCompletion=true?

Member:
Hey, please pay attention to my comment here. I believe the current implementation could block ClientEndpoint because it's a ThreadSafeRpcEndpoint. When enabling waitAppCompletion, ClientEndpoint would actually keep handling the message SubmitDriverResponse until the application finished. So ClientEndpoint is unable to handle other messages, e.g. RemoteProcessDisconnected or RemoteProcessConnectionError, at the same time, which breaks the current behaviour. Furthermore, it could also block messages from backup masters, though that is not fatal in this case.

@akshatb1 (Contributor Author), May 4, 2020:
@Ngone51 Apologies, somehow missed this comment. How can I quickly verify this? I am looking into this. Could you kindly suggest if you have any pointers on how this can be fixed?

Member:
We can periodically send a message (e.g. we can send it after Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)) to ClientEndpoint itself to check the driver's status.

@Ngone51 (Member), May 4, 2020:
A possible way to verify this is to launch a long-running application, then shut down the Master in the middle and see whether onDisconnected is called immediately.

@akshatb1 (Contributor Author), May 9, 2020:
@Ngone51 I launched a long-running application with the flag enabled and disabled and stopped the Spark Master in the middle. In both cases, I see the following in the driver logs. I couldn't find any difference in the logs.

20/05/09 13:42:59 WARN StandaloneAppClient$ClientEndpoint: Connection to Akshats-MacBook-Pro.local:7077 failed; waiting for master to reconnect...
20/05/09 13:42:59 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...

The onDisconnected method from StandaloneAppClient.scala is getting called:
[screenshot]

@akshatb1 (Contributor Author), May 9, 2020:
> We can periodically send a message (e.g. we can send it after Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)) to ClientEndpoint itself to check the driver's status.

@Ngone51 Thanks for this suggestion. Just to confirm, are you suggesting to do this at line 180 in the pollAndReportStatus method? Or should we handle this outside?
[screenshot]

@Ngone51 (Member), May 9, 2020:
> In both cases, I see the following in the driver logs. I couldn't find any difference in the logs.

Hi @akshatb1, those logs are from StandaloneAppClient$ClientEndpoint and StandaloneSchedulerBackend rather than org.apache.spark.deploy.ClientEndpoint. Can you check again?

> Just to confirm, are you suggesting to do this at line 180 in the pollAndReportStatus method? Or should we handle this outside?

I think just after line 180 should be ok.

@akshatb1 (Contributor Author), May 9, 2020:
@Ngone51 Yes, not sure about the logs from StandaloneAppClient$ClientEndpoint. I will check again. This is the command I am using to submit jobs:

./bin/spark-submit --master spark://127.0.0.1:7077 --conf spark.standalone.submit.waitAppCompletion=true --deploy-mode cluster --class org.apache.spark.examples.SparkPi examples/target/original-spark-examples_2.12-3.1.0-SNAPSHOT.jar

@akshatb1 (Contributor Author), May 10, 2020:
Hi @Ngone51, I tried putting periodic messages inside the loop in pollAndReportStatus, but the endpoint doesn't seem to receive them until the loop that sends them completes (checked with a for loop; with the current while(true) loop it would be stuck indefinitely). Hence, I have implemented it by sending an async message from the pollAndReportStatus method and, when that message is received, either sending it again or exiting as needed. Please let me know what you think of this approach. I have tested the common scenarios, and I could see the onNetworkError method getting called on shutting down the Spark master while an application is running.
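
To make the pattern discussed in this thread concrete, below is a minimal, self-contained sketch of the "schedule a message to yourself" approach. It deliberately avoids Spark's package-private RpcEndpoint API and uses a plain queue-based event loop instead; the message name MonitorDriverStatus, the interval, and the termination condition are illustrative assumptions, not the change that was eventually committed.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object SelfMessageSketch {
  sealed trait Msg
  case object MonitorDriverStatus extends Msg                   // hypothetical self-message
  final case class OtherEvent(description: String) extends Msg  // e.g. a disconnect notification

  private val inbox = new LinkedBlockingQueue[Msg]()
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val reportIntervalMs = 1000L  // stand-in for REPORT_DRIVER_STATUS_INTERVAL
  private var remainingPolls = 3        // stand-in for "driver has not reached a terminal state yet"

  // Instead of Thread.sleep inside the handler, schedule the next status check and return,
  // so the single event-loop thread stays free to handle other messages in the meantime.
  private def scheduleNextCheck(): Unit =
    scheduler.schedule(new Runnable {
      override def run(): Unit = inbox.put(MonitorDriverStatus)
    }, reportIntervalMs, TimeUnit.MILLISECONDS)

  def main(args: Array[String]): Unit = {
    scheduleNextCheck()
    inbox.put(OtherEvent("master disconnected"))  // arrives while a status check is pending
    var done = false
    while (!done) {
      inbox.take() match {                        // single-threaded "event loop", analogous to receive
        case OtherEvent(description) =>
          println(s"handled other event: $description")  // never starved by the status polling
        case MonitorDriverStatus =>
          // In the real client this step would ask the master for DriverStatusResponse.
          remainingPolls -= 1
          if (remainingPolls <= 0) {
            println("driver reached a terminal state, exiting")  // System.exit(0) in the client
            done = true
          } else {
            println("driver still running, re-scheduling status check")
            scheduleNextCheck()
          }
      }
    }
    scheduler.shutdown()
  }
}
```

The key design point is that the status check is just another message in the inbox, so disconnect and error notifications interleave with it instead of waiting behind a blocking loop.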

val statusResponse =

Contributor:
nit: indents.

Contributor Author:
Thanks, updated the indentation in the latest commit.

activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
// Exception, if present
statusResponse.exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
if (!waitAppCompletion) {
logInfo(s"No exception found and waitAppCompletion is false, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else if (statusResponse.state.get == DriverState.FINISHED ||

Member:
Just use a match statement to simplify the next 10 lines or so.

@akshatb1 (Contributor Author), May 1, 2020:
Thanks, updated in the latest commit.

statusResponse.state.get == DriverState.FAILED ||
statusResponse.state.get == DriverState.ERROR ||
statusResponse.state.get == DriverState.KILLED) {
logInfo(s"waitAppCompletion is true, state is ${statusResponse.state.get}, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else {
logTrace(s"waitAppCompletion is true, state is ${statusResponse.state.get}," +
s"continue monitoring driver status.")
}
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
}
}

override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(master, success, driverId, message) =>
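As an aside on the "use a match statement" review suggestion above, here is a hedged, standalone sketch of what the simplified state handling could look like. It substitutes a local Enumeration for Spark's package-private DriverState, println for the client's logInfo/logTrace, and a made-up driver id, so the snippet compiles on its own; it illustrates the suggestion rather than the change that was committed.

```scala
object DriverStateMatchSketch {
  // Local stand-in for org.apache.spark.deploy.master.DriverState, which is package-private.
  object DriverState extends Enumeration {
    val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
  }
  import DriverState._

  // Sketch of the branch in pollAndReportStatus rewritten with a match statement;
  // the JVM exits are only indicated in comments so the example runs to completion.
  def reportState(driverId: String, state: DriverState.Value, waitAppCompletion: Boolean): Unit = {
    if (!waitAppCompletion) {
      println("No exception found and waitAppCompletion is false, exiting spark-submit JVM.")
      // System.exit(0) in the real client
    } else state match {
      case FINISHED | FAILED | ERROR | KILLED =>
        println(s"waitAppCompletion is true, state of $driverId is $state, exiting spark-submit JVM.")
        // System.exit(0) in the real client
      case _ =>
        println(s"waitAppCompletion is true, state of $driverId is $state, continue monitoring driver status.")
    }
  }

  def main(args: Array[String]): Unit = {
    reportState("driver-20200420123456-0000", RUNNING, waitAppCompletion = true)
    reportState("driver-20200420123456-0000", FINISHED, waitAppCompletion = true)
  }
}
```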
10 changes: 10 additions & 0 deletions docs/spark-standalone.md
@@ -240,6 +240,16 @@ SPARK_MASTER_OPTS supports the following system properties:
</td>
<td>1.6.3</td>
</tr>
<tr>
<td><code>spark.submit.waitAppCompletion</code></td>

Member:
I think this is the wrong place; this shows properties you can set on the master, but this is an app setting. Right? Or should it be?

Contributor Author:
That's right, it's an app setting. I couldn't find a section for application settings in spark-standalone.md. Could you please suggest where we can add it? https://spark.apache.org/docs/latest/spark-standalone.html

Contributor Author:
Or should it be added in the generic application-properties section of the configuration docs: https://spark.apache.org/docs/latest/configuration.html#application-properties?

Member:
Hm, maybe configuration.md, but it's specific to standalone. I suppose you could mark the existing configs as 'cluster configs' and make a new table of 'client configs'?

Contributor Author:
[screenshot]
How about updating this Application Properties section and naming the config spark.submit.standalone.waitAppCompletion, as suggested by @HeartSaVioR? Asking this since changing the existing configs to cluster configs may cause confusion w.r.t. the existing documentation, and the general configuration page has mode-specific properties as well. Please let me know what you think. Thanks.

Member:
Hm, there's not a great solution. I think I would add a new section in the standalone docs page, after "Connecting an Application to the Cluster", called "Spark Properties", and make a table with this single property. I'd call it spark.standalone.submit.waitAppCompletion for full consistency with YARN et al.

Contributor Author:
Sounds good, will do that.

Contributor Author:
Updated the documentation file, please review.

<td><code>true</code></td>
<td>
In Standalone cluster mode, controls whether the client waits to exit until the application completes.
If set to <code>true</code>, the client process will stay alive reporting the application's status.
Otherwise, the client process will exit after submission.
</td>
<td>3.1.0</td>
</tr>
<tr>
<td><code>spark.worker.timeout</code></td>
<td>60</td>
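For completeness, a hedged sketch of how the new flag documented above would be supplied and read. It uses the property name from this commit's diff (spark.submit.waitAppCompletion), while the review thread discusses renaming it to spark.standalone.submit.waitAppCompletion, so treat the name as in flux; the default shown follows Client.scala in this diff (false), whereas the docs table above lists true.

```scala
// Illustrative only: the flag would be passed as a --conf on spark-submit
// (standalone master, cluster deploy mode), for example:
//   ./bin/spark-submit --master spark://127.0.0.1:7077 --deploy-mode cluster \
//     --conf spark.submit.waitAppCompletion=true \
//     --class org.apache.spark.examples.SparkPi <application-jar>
//
// On the client side it is read from SparkConf, mirroring the Client.scala diff above.
import org.apache.spark.SparkConf

object WaitAppCompletionConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Default follows the code in this commit; the docs table in this PR lists true.
    val waitAppCompletion = conf.getBoolean("spark.submit.waitAppCompletion", false)
    println(s"waitAppCompletion = $waitAppCompletion")
  }
}
```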