[SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode #28258
Conversation
…control whether the spark-submit process waits to exit until the application finishes/fails/errors out. The flag is set to false by default, in which case spark-submit exits right after submission.
@Ngone51 I see, thank you for your explanation.
Could someone kindly review these changes? CC: @cloud-fan @srowen @Ngone51 @HeartSaVioR
I'm not so sure if there's some reason this wasn't implemented before. It exists for YARN, K8S, so sounds reasonable. My main concern is whether this is app or cluster scoped?
logInfo(s"No exception found and waitAppCompletion is false, " +
  s"exiting spark-submit JVM.")
System.exit(0)
} else if (statusResponse.state.get == DriverState.FINISHED ||
Just use a match statement to simplify the next 10 lines or so
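Such a cascade of state checks collapses naturally into one match. A minimal sketch of the idea, using a stub `Enumeration` in place of `org.apache.spark.deploy.master.DriverState` and returned strings in place of the real log/exit calls (all names here are illustrative, not the PR's actual code):

```scala
// Stub standing in for org.apache.spark.deploy.master.DriverState.
object DriverState extends Enumeration {
  val SUBMITTED, RUNNING, FINISHED, FAILED, ERROR, KILLED = Value
}

// One match with alternative patterns replaces the if/else-if chain:
// the four terminal states share a branch, everything else keeps polling.
def describe(state: DriverState.Value): String = state match {
  case DriverState.FINISHED | DriverState.FAILED |
       DriverState.ERROR | DriverState.KILLED =>
    s"State is $state, exiting spark-submit JVM."
  case _ =>
    s"State is $state, continue monitoring driver status."
}
```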
Thanks, updated in the latest commit.
docs/spark-standalone.md
Outdated
@@ -240,6 +240,16 @@ SPARK_MASTER_OPTS supports the following system properties:
</td>
<td>1.6.3</td>
</tr>
<tr>
<td><code>spark.submit.waitAppCompletion</code></td>
I think this is the wrong place; this table shows properties you can set on the master, but this is an app setting. Right? Or should it be?
That's right, it's an app setting. I couldn't find a section for application settings in spark-standalone.md. Could you please suggest where we can add it? https://spark.apache.org/docs/latest/spark-standalone.html
Or should it be added in the generic configuration application-properties section here: https://spark.apache.org/docs/latest/configuration.html#application-properties?
Hm, maybe configuration.md, but it's specific to standalone.
I suppose you could mark the existing configs as 'cluster configs' and make a new table of 'client configs'?
How about updating the Application Properties section and naming the config spark.submit.standalone.waitAppCompletion, as suggested by @HeartSaVioR? I'm asking because relabeling the existing configs as cluster configs could be confusing relative to the existing documentation, and the general configuration page already has mode-specific properties. Please let me know what you think. Thanks.
Hm, there's not a great solution. I think I would add a new section in the standalone docs page, after "Connecting an Application to the Cluster", called "Spark Properties", and make a table with this single property. I'd call it spark.standalone.submit.waitAppCompletion for full consistency with YARN et al.
Sounds good, will do that.
Updated the documentation file, please review.
I'm not that familiar with standalone mode, so I assume we would like to make it behave similarly to yarn-cluster. How does it behave if the supervise option is specified? In yarn-cluster mode it waits until the application has been killed; the submit process waits even across relaunches of the AM. Does it work like that here? Also, given that the flag only affects standalone mode and YARN's equivalent flag carries a prefix, it would be better to add a prefix to this flag name as well.
System.exit(-1)
case _ =>
  System.exit(0)
while (true) {
This could block `ClientEndpoint` when `waitAppCompletion=true`?
Hey, please pay attention to my comment here. I believe the current implementation could block `ClientEndpoint` because it's a `ThreadSafeRpcEndpoint`. When enabling `waitAppCompletion`, `ClientEndpoint` would actually keep handling the message `SubmitDriverResponse` until the application finished. So `ClientEndpoint` is unable to handle other messages, e.g. `RemoteProcessDisconnected` or `RemoteProcessConnectionError`, at the same time, which breaks the current behaviour. Furthermore, it could also block messages from backup masters, though that's not fatal in this case.
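The blocking concern can be illustrated with a toy model: a `ThreadSafeRpcEndpoint` handles one message at a time, so a handler that loops or sleeps inside `receive()` starves every later message. Below, a queue plus a single dispatcher thread stand in for Spark's RPC dispatcher; all names and timings are illustrative, not Spark's internals:

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

val inbox = new LinkedBlockingQueue[String]()     // messages waiting for the endpoint
val handled = new LinkedBlockingQueue[String]()   // messages the endpoint has processed
val dispatcher = Executors.newSingleThreadExecutor()

// Single-threaded message loop: one handler must finish before the next starts.
dispatcher.execute(() => while (true) {
  val msg = inbox.take()
  if (msg == "SubmitDriverResponse") Thread.sleep(2000) // stands in for the polling loop
  handled.put(msg)
})

inbox.put("SubmitDriverResponse")
inbox.put("RemoteProcessDisconnected") // stuck behind the "polling" handler

// Nothing has been handled after 500 ms: the endpoint is blocked.
val seen = handled.poll(500, TimeUnit.MILLISECONDS)
println(seen) // prints null
dispatcher.shutdownNow()
```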
@Ngone51 Apologies, somehow I missed this comment. I am looking into this. How can I quickly verify it? And could you suggest any pointers on how it can be fixed?
We can periodically send a message (e.g. we can send it after `Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)`) to `ClientEndpoint` itself to check the driver's status.
A possible way to verify this is to launch a long-running application, shut down the Master in the middle, and see whether `onDisconnected` is called immediately.
@Ngone51 I launched a long-running application with the flag enabled and then disabled, and stopped the Spark Master in the middle. In both cases, I see the following in the driver logs; I couldn't find any difference between them.
20/05/09 13:42:59 WARN StandaloneAppClient$ClientEndpoint: Connection to Akshats-MacBook-Pro.local:7077 failed; waiting for master to reconnect...
20/05/09 13:42:59 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
Also, the `onDisconnected` method from `StandaloneAppClient.scala` is getting called:
> We can periodically send a message (e.g. we can send it after `Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)`) to `ClientEndpoint` itself to check the driver's status.
@Ngone51 Thanks for this suggestion. Just to confirm, are you suggesting to do this in line # 180 in pollAndReportStatus method? Or should we handle this outside?
> In both cases, I see the following in driver logs. I couldn't find any difference in logs.

Hi @akshatb1 , these logs are from `StandaloneAppClient$ClientEndpoint` and `StandaloneSchedulerBackend` rather than `org.apache.spark.deploy.ClientEndpoint`. Can you check again?
> Just to confirm, are you suggesting to do this at line # 180 in the pollAndReportStatus method? Or should we handle this outside?

I think just after line 180 should be ok.
@Ngone51 Yes, not sure about the logs from `StandaloneAppClient$ClientEndpoint`. I will check again. This is the command I am using to submit jobs: `./bin/spark-submit --master spark://127.0.0.1:7077 --conf spark.standalone.submit.waitAppCompletion=true --deploy-mode cluster --class org.apache.spark.examples.SparkPi examples/target/original-spark-examples_2.12-3.1.0-SNAPSHOT.jar`
Hi @Ngone51 , I tried putting periodic messages in the loop in `pollAndReportStatus`, but it doesn't seem to receive messages until the sending loop completes (checked with a `for` loop; it would be stuck in an infinite loop with the current `while(true)` loop). Hence, I have implemented it by sending an async message from the `pollAndReportStatus` method and, on receiving it, either sending the message again or exiting. Please let me know what you think of this approach. I have tested the common scenarios, and I can see the `onNetworkError` method getting called on shutting down the Spark master while an application is running.
@srowen This will be app scoped.
@srowen Thanks for reviewing. I have addressed your comment. Kindly review it and let me know your comments.
@HeartSaVioR Yes, that's correct. The behavior is the same as yarn-cluster mode. If the supervise option is specified, it will wait for the driver to be re-launched and continue monitoring. Updated the flag name to spark.submit.standalone.waitAppCompletion.
docs/spark-standalone.md
Outdated
<td><code>spark.standalone.submit.waitAppCompletion</code></td>
<td><code>false</code></td>
<td>
In Standalone cluster mode, controls whether the client waits to exit until the application completes.
Standalone -> standalone
reporting -> polling
Updated in the latest commit.
Looking reasonable to me
Jenkins test this please
Oh, is there any simple test that can be added to verify it waits?
state.get match {
  case DriverState.FINISHED | DriverState.FAILED |
       DriverState.ERROR | DriverState.KILLED =>
    logInfo(s"State of $submittedDriverID is ${state.get}, " +
nit: s"State of driver $submittedDriverID ..."
Thanks, updated in the latest commit.
System.exit(0)
case _ =>
  Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
  logInfo(s"State of $submittedDriverID is ${state.get}, " +
nit: s"State of driver $submittedDriverID ..."
Since status polling will happen every second, I'm afraid the logs can get too verbose. We could log only after a constant number of polls, e.g. every 60th time.
This would produce too many logs; please consider using `logDebug`.
Thanks, I have changed it to use `logDebug`.
  s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
  Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
`Thread.sleep` could still have the same issue; imagine the network drop happening during the sleep. We should move the periodic sending logic out of `receive`. We could mimic `CoarseGrainedSchedulerBackend` to do the same work here:
spark/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
Lines 137 to 139 in 9faad07
reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
  Option(self).foreach(_.send(ReviveOffers))
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
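The same pattern, reduced to plain `java.util.concurrent` as a self-contained sketch: a scheduler thread periodically triggers the status check, so `receive()` never sleeps and stays free for other messages. Here the RPC self-send is stubbed with a function call and the interval is shortened; in the real code the tick would be `self.send(...)` at `REPORT_DRIVER_STATUS_INTERVAL`:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Latch lets us observe that three periodic ticks fired.
val polls = new CountDownLatch(3)

// Stand-in for sending a "check driver status" message to the endpoint itself.
def monitorDriverStatus(): Unit = polls.countDown()

val forwardMessageThread = Executors.newSingleThreadScheduledExecutor()
forwardMessageThread.scheduleAtFixedRate(
  () => monitorDriverStatus(),
  0, 100, TimeUnit.MILLISECONDS) // shortened; real interval is REPORT_DRIVER_STATUS_INTERVAL

polls.await() // three ticks observed without any handler ever blocking
forwardMessageThread.shutdown()
```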
@Ngone51 Thanks for reviewing. I have updated to use the task scheduler to do the same. Could you kindly review it again and please let me know your comments?
docs/spark-standalone.md
Outdated
<td><code>false</code></td>
<td>
In standalone cluster mode, controls whether the client waits to exit until the application completes.
If set to <code>true</code>, the client process will stay alive polling the application's status.
nit: `application's` or `driver's`?
Thanks, updated to `driver's`.
System.exit(-1)
case _ =>
  System.exit(0)
val statusResponse =
nit: indents.
Thanks, updated the indentation in the latest commit.
Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
logInfo(s"State of $submittedDriverID is ${state.get}, " +
  s"continue monitoring driver status.")
asyncSendToMasterAndForwardReply[DriverStatusResponse](
When you submit too many applications at the same time, this could place a heavy communication burden on the driver. I would suggest checking less frequently (e.g. increasing the interval to 10s).
@jiangxb1987 Thanks for reviewing. I have changed it to 10 seconds and took care of your other comments. Kindly review the PR again.
@Ngone51 @jiangxb1987 Gentle ping, I have addressed your comments. Please review when you get a chance. Thanks.
@@ -123,16 +134,24 @@ private class ClientEndpoint(
  }(forwardMessageExecutionContext)
}
}
private def MonitorDriverStatus(): Unit = {
nit: need empty line above.
nit: method should start with lowercase.
@@ -176,6 +202,22 @@ private class ClientEndpoint(
} else if (!Utils.responseFromBackup(message)) {
  System.exit(-1)
}

case DriverStatusResponse(found, state, _, _, _) =>
It would be better if we could do some refactoring on `pollAndReportStatus` in order to reduce duplicated logic. For example, we could call `pollAndReportStatus` only here, and remove the other invocations in `SubmitDriverResponse` / `KillDriverResponse`. And, of course, `pollAndReportStatus` (it also needs a new name) would not poll the status anymore.
@Ngone51 Thanks for your feedback. `pollAndReportStatus` is only used the first time after submitting or killing drivers; I am not sure which duplicated logic you are referring to. Also, `pollAndReportStatus` only polls the driver status and handles the response. If we remove polling from it, what logic should be handled there?
> If we remove polling from that, what logic should be handled there?

we can use this:

forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
  MonitorDriverStatus()
}, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)

(but the initial delay needs to change) In this way, submitting or killing drivers will still use it only once when `waitAppCompletion=false`.
Scheduling to monitor driver status is done only in the case of submit, not kill, as of now. So we may need to explicitly send a message to monitor driver status after a 5-second delay in the case of kill.
That's why I said we need to change the delay (e.g. to 5s) instead of 0, for both submitting and killing.
I agree that we can change the delay to 5 seconds to keep it consistent with the current logic. My question is: should we add the following block in `case "kill" =>` as well, or should we just monitor with a single message instead of scheduled messages?

forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
  MonitorDriverStatus()
}, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)
We don't need to add `forwardMessageThread.scheduleAtFixedRate(...)` into any `case` branch; just keep it as a global one (just do what you do now). I think it still works for `case "kill"`.
It's in `case "launch"` as of now. I will move it to a global place and refactor the code. Thanks for your suggestions.
Oh, sorry, I missed that. Yea, thank you!
@Ngone51 I have refactored the code as suggested. Kindly review it again. Thanks.
@Ngone51 Gentle reminder, I have refactored as suggested; please review when you get a chance. Thanks!
def reportDriverStatus(found: Boolean, state: Option[DriverState],
    workerId: Option[String],
    workerHostPort: Option[String],
    exception: Option[Exception]): Unit = {
nit:

def reportDriverStatus(found: Boolean, state: Option[DriverState],
    workerId: Option[String],
    workerHostPort: Option[String],
    exception: Option[Exception]): Unit = {

should become:

def reportDriverStatus(
    found: Boolean,
    state: Option[DriverState],
    workerId: Option[String],
    workerHostPort: Option[String],
    exception: Option[Exception]): Unit = {
basically, looks ok to me, cc @prakharjain09 @jiangxb1987 @srowen
Thanks, @Ngone51 for reviewing. I have taken care of the last comment (indentation) as well. CC: @srowen @jiangxb1987 @prakharjain09
Looks good to me.
@srowen : Gentle reminder, kindly take a look when you get a chance. Thanks.
@Ngone51 any final thoughts?
LGTM. I also tested manually with
LGTM
Jenkins retest this please
Test build #123637 has finished for PR 28258 at commit
Merged to master
What changes were proposed in this pull request?
These changes implement an application wait mechanism which will allow spark-submit to wait until the application finishes in Standalone Spark Mode. This will delay the exit of spark-submit JVM until the job is completed. This implementation will keep monitoring the application until it is either finished, failed or killed. This will be controlled via a flag (spark.submit.waitForCompletion) which will be set to false by default.
Why are the changes needed?
Currently, the Livy API for Standalone Cluster Mode doesn't know when the job has finished. If this flag is enabled, it can be used by the Livy API (/batches/{batchId}/state) to find out when the application has finished/failed. This flag is similar to spark.yarn.submit.waitAppCompletion.
Does this PR introduce any user-facing change?
Yes, this PR introduces a new flag but it will be disabled by default.
How was this patch tested?
Couldn't implement unit tests, since the pollAndReportStatus method has System.exit() calls. Please share any suggestions.
Tested spark-submit locally for the following scenarios: