
SPARK-1686: keep schedule() calling in the main thread #639

Closed · wants to merge 5 commits

Conversation

CodingCat (Contributor)

https://issues.apache.org/jira/browse/SPARK-1686

moved from original JIRA (by @markhamstra):

In deploy.master.Master, the completeRecovery method is the last thing to be called when a standalone Master is recovering from failure. It is responsible for resetting some state, relaunching drivers, and eventually resuming its scheduling duties.

There are currently four places in Master.scala where completeRecovery is called. Three of them are from within the actor's receive method, and aren't problems. The last starts from within receive when the ElectedLeader message is received, but the actual completeRecovery() call is made from the Akka scheduler. That means that it will execute on a different scheduler thread, and Master itself will end up running (i.e., schedule() ) from that Akka scheduler thread.

In this PR, I added a new Master message, TriggerSchedule, so that the actual schedule() call is triggered "locally", on the actor's own thread rather than on the Akka scheduler thread.
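The idea can be sketched as follows (a minimal, hypothetical Scala sketch, not the actual Master.scala code; the message name TriggerSchedule comes from this PR, everything else is illustrative):

```scala
import akka.actor.Actor

case object TriggerSchedule

// Illustrative stand-in for deploy.master.Master: the timer callback only
// sends a message, so the real schedule() work runs inside receive, i.e.
// on the actor's own thread rather than on an Akka scheduler thread.
class MasterSketch extends Actor {
  def receive = {
    case TriggerSchedule => schedule() // guaranteed to run on the actor thread
  }

  private def schedule(): Unit = {
    // resource scheduling work elided
  }

  private def completeRecovery(): Unit = {
    // ... reset state, relaunch drivers ...
    self ! TriggerSchedule // instead of calling schedule() directly here
  }
}
```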

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@markhamstra (Contributor)

Okay, first, I didn't start work on this JIRA myself because I haven't had the time to come to a complete understanding of the current and intended functionality, nor of the scope of the problem from running completeRecovery() on an Akka scheduler thread, so you shouldn't assume that the JIRA fully describes the problem or that solving just what it describes constitutes a complete fix. For example, any exception currently thrown by completeRecovery() when run by the scheduler will not be caught because of SPARK-1620, so a complete fix to the use of the scheduler from case ElectedLeader will also have to incorporate whatever we end up doing to fix that issue.

Second, what you've done in this PR significantly changes the results of calling completeRecovery(), and there is at least a minor problem in doing so. Previously, schedule() would be called synchronously after setting RecoveryState.ALIVE and before logging the completion of recovery. Now you're causing schedule() to be called asynchronously, only when the TriggerSchedule event makes its way to the head of the message queue. That will potentially be long after RecoveryState.ALIVE has been set, the completion of recovery has been logged, and other events have been processed with the new RecoveryState, even though schedule() has not yet been run. The out-of-order logging is at least a minor problem. The processing of other events before schedule() has been run is potentially a major problem; but like I said, I haven't yet worked through exactly how things are supposed to work, so I'm also not certain how big a problem this PR would create.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14653/

@aarondav (Contributor)

aarondav commented May 5, 2014

I was thinking about this JIRA earlier, but the solution I had in mind was to change the scheduled call to send a "CompleteRecovery" message instead of having the completeRecovery() method send a TriggerSchedule message. Incidentally, the CompleteRecovery message already exists, because I totally meant to use it...

I'm not sure I fully understand how schedule behaves in the face of restart, though. Really, we don't want CompleteRecovery coming from a prior version of the actor (i.e., between restarts). Would cancelling the timer in the postStop() method accomplish this?
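A sketch of this alternative, under the assumption that CompleteRecovery is handled in receive (hypothetical code, not the merged patch; the 60-second delay stands in for WORKER_TIMEOUT):

```scala
import scala.concurrent.duration._
import akka.actor.{Actor, Cancellable}

case object CompleteRecovery

class MasterSketch extends Actor {
  import context.dispatcher // ExecutionContext required by scheduleOnce

  // Keep the Cancellable so a timer set by a previous incarnation of the
  // actor can be cancelled on restart.
  private var recoveryTask: Cancellable = _

  def receive = {
    // completeRecovery() now runs on the actor's own thread
    case CompleteRecovery => completeRecovery()
  }

  private def onElectedLeader(): Unit = {
    // The scheduler delivers a message instead of running code itself.
    recoveryTask = context.system.scheduler.scheduleOnce(
      60.seconds, self, CompleteRecovery)
  }

  override def postStop(): Unit = {
    // Prevent a stale CompleteRecovery from firing between restarts.
    if (recoveryTask != null) recoveryTask.cancel()
  }

  private def completeRecovery(): Unit = { /* ... */ }
}
```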

@CodingCat (Contributor, Author)

@markhamstra, thank you very much for the comments, and I'm sorry I didn't explain this clearly in the PR description. Let me try to explain why I came up with this solution.

First, I went through the instances you mentioned in SPARK-1620 before I submitted this patch, and I also read the Akka API documentation. I didn't find a general way (maybe I missed something) to capture an exception in the caller thread, so I think we either need a case-specific solution for each instance there, or we should simply say that we need to be careful when using Akka.scheduler.schedule() for timer tasks. I'm not sure I'm correct on this point, so I didn't address the exception-handling issue in this PR.

Second, I'm aware of the behaviour change in my solution (the scheduling is called asynchronously). Actually, in my opinion it would even be safe to remove the call entirely (I guess this line is only here for more frequent scheduling?), because schedule() is also called in other places (e.g. relaunchDriver()). But now I realize that the call to schedule() in relaunchDriver() is also a potential problem, since relaunchDriver() is called from completeRecovery(); maybe we should change that call to TriggerSchedule as well. Alternatively, we could refactor schedule() so that it just sends TriggerSchedule to the actor, and move the real scheduling work into the TriggerSchedule handler; that way we would prevent the future risk of calling schedule() from another thread.

@aarondav, I think sending a CompleteRecovery message is simpler than what I proposed above. I guess we can capture the return value of akka.scheduler.schedule() and cancel it in postStop(), as you proposed.

@CodingCat (Contributor, Author)

For future updates of this PR:

I think for this specific case, what @aarondav proposed is a pretty good approach, since it keeps the handling of CompleteRecovery in the main thread.

As for schedule(), I think we also need to ensure it is always called in the main thread.

@CodingCat (Contributor, Author)

I discarded the idea of doing the real scheduling work in an Akka thread for now, as I found that it would change the current behaviour of Master (in some places the user may expect the scheduling work to be done immediately after registerWorker or similar calls).

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14664/

@markhamstra (Contributor)

@CodingCat The documentation on Akka's handling of exceptions thrown from scheduled functions isn't entirely clear, so before re-opening SPARK-1620 I wrote various pieces of test code to discover exactly what the behavior is. From what I can tell, a default uncaught exception handler set in the thread that calls scheduler.schedule won't catch exceptions from a scheduled function, but it will catch at least some exceptions thrown by Akka itself (e.g. if the scheduled function closes over a reference that becomes unreachable in the scheduled function.) Similarly, an uncaught exception handler set within the ActorSystem (as we do in IndestructibleActorSystem) won't catch exceptions thrown by scheduled functions.

What will work is an explicit try-catch in the scheduled code (which can allow the scheduled event to happen again) or setting an uncaught exception handler explicitly on the scheduled thread (which will not allow any further scheduled events after the first thrown exception.) The latter approach is about as generalizable as we can get, I think, and is the approach I took in #622. That PR could be extended to cover the scheduling of completeRecovery() as well, but maybe the fix for the rest of SPARK-1686 will make that unnecessary.
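The two working approaches might look roughly like this (an illustrative sketch, not the code from #622; riskyPeriodicWork is a hypothetical placeholder):

```scala
import scala.concurrent.duration._
import akka.actor.ActorSystem

object SchedulerExceptionSketch {
  def install(system: ActorSystem): Unit = {
    import system.dispatcher

    // Approach 1: explicit try-catch inside the scheduled function; a
    // failure is logged and later scheduled runs still happen.
    system.scheduler.schedule(0.seconds, 10.seconds) {
      try riskyPeriodicWork()
      catch { case e: Exception => println(s"scheduled task failed: $e") }
    }

    // Approach 2: set an uncaught exception handler on the scheduled
    // thread itself; note that the first uncaught exception kills the
    // thread, so no further scheduled runs occur afterwards.
    system.scheduler.scheduleOnce(0.seconds) {
      Thread.currentThread().setUncaughtExceptionHandler(
        new Thread.UncaughtExceptionHandler {
          def uncaughtException(t: Thread, e: Throwable): Unit =
            println(s"uncaught exception on ${t.getName}: $e")
        })
      riskyPeriodicWork()
    }
  }

  private def riskyPeriodicWork(): Unit = { /* may throw */ }
}
```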

@aarondav (Contributor)

aarondav commented May 9, 2014

Is it true that the uncaught exception handler set within the ActorSystem doesn't catch exceptions in scheduled functions? I was just reading the Akka source code for this very reason, and it seems like the uncaughtExceptionHandler should be used by the scheduler: https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L676 (this threadFactory is created with the uncaughtExceptionHandler).

@markhamstra (Contributor)

@aarondav That's what I thought too when looking at the Akka code, and that's why I closed the JIRA for a while. After writing some test code, though, it really didn't look to me like the uncaught exception handler was effective for the scheduled functions, so I re-opened 1620 and created #622. However, I'm still not clear on just why I'm seeing the behavior (or lack thereof) that I am, so I'd certainly welcome any insights you can arrive at by looking deeper.

```diff
@@ -171,10 +177,13 @@ private[spark] class Master(
     logInfo("I have been elected leader! New state: " + state)
     if (state == RecoveryState.RECOVERING) {
       beginRecovery(storedApps, storedDrivers, storedWorkers)
-      context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+      recoverCallable = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis)
```
Contributor (inline review comment)

There's actually an alternate syntax for just sending a message; I think it's something like `scheduleOnce(WORKER_TIMEOUT millis, self, CompleteRecovery)`.

@aarondav (Contributor)

aarondav commented May 9, 2014

@markhamstra Further testing (test code here) seems to indicate that the uncaughtExceptionHandler is not called in any of these cases:

  • Error in actor receive (causes a LocalActorRefProvider guardian failure and shuts down the system)
  • Error in scheduler (special TaskInvocation logic seems to catch this)
  • Error in future using actor's context.dispatcher (not propagated anywhere I can see)

The last one was especially surprising, but further investigation indicates that the ExecutionContext in context.dispatcher is unrelated to the ThreadFactory. Custom specification of a default ExecutionContext was only added in Akka 2.3.0 (akka/akka@4b2d98c).

TL;DR It's pretty hard to get a custom exception handler that logs any errors in Akka-related threads.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14852/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@markhamstra (Contributor)

@aarondav Your test code is similar to mine, as are your conclusions. Somebody really needs to do a systematic inventory of Akka exception handling throughout Spark, including actor restart-ability. This PR, #622, #186 and #637 are a start, but I don't think that they are the end of the matter (cf. SPARK-1769, SPARK-1771, SPARK-1772.) Most of my Spark work right now, though, will be focused on SPARK-1021.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14853/

```diff
@@ -104,6 +104,8 @@ private[spark] class Master(

   var leaderElectionAgent: ActorRef = _

+  private var recoverCallable: Cancellable = _
```
Contributor (inline review comment)

nit: Perhaps this would be better named "completeRecoveryTask" (we use the term "task" in a couple other places in the code base)

@aarondav (Contributor)

aarondav commented May 9, 2014

@markhamstra Absolutely agree.

@CodingCat The test failure is unrelated, I submitted #716 to fix it. Had one last minor comment, other than that LGTM.

@CodingCat (Contributor, Author)

@aarondav thanks for the comments, just addressed it

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14865/

@aarondav (Contributor)

LGTM, merging into master and branch-1.0. Thanks!

asfgit pushed a commit that referenced this pull request May 10, 2014
Author: CodingCat <zhunansjtu@gmail.com>

Closes #639 from CodingCat/SPARK-1686 and squashes the following commits:

81bb4ca [CodingCat] rename variable
69e0a2a [CodingCat] style fix
36a2ac0 [CodingCat] address Aaron's comments
ec9b7bb [CodingCat] address the comments
02b37ca [CodingCat] keep schedule() calling in the main thread

(cherry picked from commit 2f452cb)
Signed-off-by: Aaron Davidson <aaron@databricks.com>
@asfgit asfgit closed this in 2f452cb May 10, 2014
markhamstra pushed a commit to markhamstra/spark that referenced this pull request May 14, 2014
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/master/Master.scala
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
This changes:
1. Remove the `set -o` option so the job does not exit when a zinc error happens, which we should ignore.
2. Write the logs with correct permissions.

Related-Bug: theopenlab/openlab#316
rshkv pushed a commit to rshkv/spark that referenced this pull request Feb 27, 2020
Backport of https://issues.apache.org/jira/browse/SPARK-30512. Intent is to upgrade the shuffle service that we run inside NodeManagers to a version with this change.