
Support Cancellation of Spark Jobs #665

Closed
wants to merge 2 commits

Conversation

harsha2010

Hi,

This patch allows us to cancel runaway Spark queries by issuing a killJob command with the jobId. The approach taken here is to let the DAG scheduler clean up its state about the currently executing job and propagate an interrupt to the running tasks. Each task runs within a Future so that it can respond to interrupts and cancel execution. Every Iterator that iterates over data (via the CacheManager, the BlockFetcher, or the HadoopRDD) is wrapped in an InterruptibleIterator, which checks for the interrupt, cleans up state accordingly, and exits.
I have tested the performance of this patch against master on a number of internal queries, and performance is not impacted.
I am in the process of obtaining TPC-H benchmarks with and without the patch, which I will attach here.
In the meantime, please review and let me know if the design needs changes.
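
For reviewers following along, here is a minimal sketch of the execution model described above; TaskRunner, body, and kill are illustrative names, not the exact classes in this patch:

import java.util.concurrent.FutureTask

// Sketch only: each task body runs inside a FutureTask so that a killTask
// request can interrupt the thread currently executing it.
class TaskRunner(body: () => Unit) {
  private val future = new FutureTask[Unit](new Runnable {
    override def run(): Unit = body()
  }, ())

  // Executes the task body on the calling thread.
  def run(): Unit = future.run()

  // Invoked on a kill request; true means interrupt the thread if running.
  def kill(): Unit = { future.cancel(true) }
}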

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

extends AnyRef with InterruptibleIterator[T] {

  override def hasNext(): Boolean = {
    super.hasNext
Contributor

It seems kinda weird to me that InterruptibleIterator is actually implementing hasNext, which you then override here. Maybe it should have a method like exceptionIfThreadInterrupted. It seems like the trait is not actually implementing hasNext at all; it's just supplying a utility method for implementations.

Contributor

Yes, this seems odd to me, too. Is there some reason why InterruptibleIterator can't be implemented more like CompletionIterator? In other words, class InterruptibleIteratorDecorator just becomes class InterruptibleIterator, which extends Iterator; and your InterruptibleIterator trait becomes object InterruptibleIterator, which defines a function called something like notInterrupted to replace all of the calls to InterruptibleIterator.hasNext -- i.e., the peculiar super.hasNext calls become something like InterruptibleIterator.notInterrupted.
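
A minimal sketch of that suggested shape (names and details assumed, following the comment above):

// The interrupt check moves into a companion object, and the decorator
// becomes the one class named InterruptibleIterator.
object InterruptibleIterator {
  def notInterrupted: Boolean = !Thread.currentThread().isInterrupted
}

class InterruptibleIterator[T](delegate: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    if (!InterruptibleIterator.notInterrupted) {
      throw new InterruptedException("task was killed")
    }
    delegate.hasNext
  }

  override def next(): T = delegate.next()
}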

@squito
Contributor

squito commented Jun 28, 2013

This looks great! I have wanted this feature for a long time. I made a couple of minor style comments.

I only have one question about the implementation -- it seems like you register the cores as freed as soon as the killTask request gets sent. Do the tasks really die immediately? Should it wait for some acknowledgement that the task really has been killed? I guess it's not horrible to have too many tasks running on an executor for a little while, so if it adds a lot of complication for a rare corner case, maybe we can forget it.

@harsha2010
Author

Thanks for your comments; I'll incorporate them in the update I will push shortly with the TPC-H performance numbers.
The point about registering cores is well taken: I couldn't find an easy way to get an acknowledgement that a task has actually been killed, so I implemented the optimistic solution. I'll take another look to see if I can do something better.

  f.run()
  f.get()
} catch {
  case e: Exception => throw e.getCause()
Member

Should we be catching Throwable here?

Author

Yes, you're right; we should be catching Throwable here.
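
One way the fixed catch could look (a sketch under assumptions, not the final patch code): unwrap the task's real failure when there is a cause, and avoid rethrowing a null cause for Throwables that don't wrap anything.

import java.util.concurrent.FutureTask

// Sketch: catch Throwable rather than Exception so Errors are not missed,
// and guard against a null cause (e.g. for InterruptedException).
def runAndRethrow(f: FutureTask[_]): Unit = {
  try {
    f.run()
    f.get()
  } catch {
    case t: Throwable =>
      throw (if (t.getCause != null) t.getCause else t)
  }
}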

@shivaram
Contributor

There was one major issue I ran into when I tried to do this before. I am not sure if it still applies:

When cancelling tasks that were reading files from HDFS, I noticed that the sockets open in DFSClient would be interrupted, and this would lead to datanodes being marked as dead. I had to add a bunch of catches for InterruptedException in DFSClient to avoid this in Hadoop 0.20. I am not sure if this is still the case in later versions of Hadoop; it would be good to test.

(As a generalization, I guess we are relying on user-defined classes behaving appropriately when we interrupt a thread?)
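
To make the concern concrete, here is a generic sketch of the kind of guard being described; guardedRead and the cleanup policy are assumptions for illustration, not DFSClient's actual code:

import java.io.{InputStream, InterruptedIOException}

// Sketch: treat an interrupt during a blocking read as a cancellation,
// instead of letting it surface as a socket failure that could make a
// datanode look dead.
def guardedRead(stream: InputStream, buf: Array[Byte]): Int = {
  try {
    stream.read(buf)
  } catch {
    case e: InterruptedIOException =>
      stream.close() // release the connection cleanly
      throw new InterruptedException("read cancelled by task kill")
  }
}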

@mateiz
Member

mateiz commented Jun 29, 2013

Hey Ram, as I'm looking at this more closely, one question on the Iterator design: why do we need CompletionIterator to also extend InterruptibleIterator? Can't we just catch the InterruptedException in the code that's running the task and run the cleanup procedure there?

@mateiz
Member

mateiz commented Jun 29, 2013

(Or more generally, catch it on a call to next or hasNext; basically the on-complete callbacks should be called even if the task throws an exception, so InterruptedException doesn't need to be different).

@harsha2010
Author

Hey Matei,
Now that I think about it, you're right: the runInterruptibly methods in all tasks have a finally block that calls the onComplete callbacks, so InterruptedException does not need to be treated differently.
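
A sketch of that structure, with a minimal stand-in for the real TaskContext (the actual method names in the patch may differ):

// Stand-in for the real TaskContext, for illustration only.
trait TaskContext {
  def executeOnCompleteCallbacks(): Unit
}

// The finally block runs the on-complete callbacks whether the body
// completes, throws, or is interrupted, so InterruptedException needs no
// special handling.
def runInterruptibly[T](context: TaskContext)(body: => T): T = {
  try {
    body
  } finally {
    context.executeOnCompleteCallbacks()
  }
}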

@@ -53,7 +55,8 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       elements ++= rdd.computeOrReadCheckpoint(split, context)
       // Try to put this block in the blockManager
       blockManager.put(key, elements, storageLevel, true)
-      return elements.iterator.asInstanceOf[Iterator[T]]
+        val iter = elements.iterator.asInstanceOf[Iterator[T]]
Contributor

excess indentation

@markhamstra
Contributor

After merging this PR into master @ 7dcda9a, CancellationSuite "Cancel Task" is not completing for me.

@harsha2010
Author

Oh OK, I'll take a look. The test might not have been as well thought out as I imagined; it seems to work on my machine but possibly has a race condition. Thanks, will fix it.


}
}

private def killJob(job: ActiveJob, reason: String) {
Contributor

@mateiz Doesn't this have the same problem discussed in #414 where more than one ActiveJob can share a stage?

Member

Yes, that is actually true. To do this properly we'll need to do some kind of reference-counting on the stages (keep a list of which jobs currently want to run each stage). One difference here is that killJob is called by the user, and for the first use case, Shark, it's probably going to be fine. But it would be good to either track this properly or log a warning.
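
A sketch of the reference-counting idea (field and method names are assumptions, not DAGScheduler's actual code):

import scala.collection.mutable.{HashMap, HashSet}

// Track which active jobs still need each stage; a stage's tasks are only
// cancelled once no job depends on it.
val jobsForStage = new HashMap[Int, HashSet[Int]] // stageId -> active job ids

def registerJob(jobId: Int, stageIds: Seq[Int]): Unit = {
  for (sid <- stageIds) {
    jobsForStage.getOrElseUpdate(sid, new HashSet[Int]) += jobId
  }
}

def cancelStageTasks(stageId: Int): Unit = { /* stub for illustration */ }

def killJob(jobId: Int, stageIds: Seq[Int]): Unit = {
  for (sid <- stageIds; jobs <- jobsForStage.get(sid)) {
    jobs -= jobId
    if (jobs.isEmpty) {
      jobsForStage -= sid
      cancelStageTasks(sid) // no other job needs this stage
    }
  }
}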

Contributor

That's pretty much the conclusion I was arriving at. I'll work on the reference-counting refactoring. It should be doable independently of this PR and should only require a minimal change here once it is done.

Member

Cool, that would be great to have.

@AmplabJenkins

Thank you for your pull request. An admin will review this request soon.

@rxin rxin mentioned this pull request Sep 17, 2013
@rxin
Member

rxin commented Sep 17, 2013

Ram - I am closing this one because it is going to be subsumed by #935.

@rxin rxin closed this Sep 17, 2013
xiajunluan pushed a commit to xiajunluan/spark that referenced this pull request May 30, 2014
…in"...

... java.lang.ClassNotFoundException: org.apache.spark.broadcast.TorrentBroadcastFactory

Author: witgo <witgo@qq.com>

Closes mesos#665 from witgo/SPARK-1734 and squashes the following commits:

cacf238 [witgo] SPARK-1734: spark-submit throws an exception: Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.broadcast.TorrentBroadcastFactory