
job killing in Spark #935

Closed
wants to merge 12 commits into from

Conversation

@rxin
Member

rxin commented Sep 17, 2013

This subsumes pull request #665 by @harsha2010.

Unlike #665, this PR does not use interrupts to kill tasks, because interrupts rely on user code handling them properly. One big counterexample: the Hadoop client marks a data node as dead if it sees an interrupt during a read. Instead, this pull request adds a new volatile variable "interrupted" to the TaskContext class and relies on upper-level code to check that flag. This can be done by simply wrapping an existing iterator with an InterruptibleIterator.
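
A minimal sketch of the wrapping idea described above, assuming the TaskContext flag is named "interrupted" and that throwing is an acceptable way to abort the consuming task; the exception type is a placeholder, not something this PR specifies.

    import org.apache.spark.TaskContext

    // Wraps an existing iterator and checks the task's volatile "interrupted"
    // flag before handing out each element, so a killed task stops at the next
    // record boundary instead of relying on thread interrupts.
    class InterruptibleIterator[T](context: TaskContext, delegate: Iterator[T])
      extends Iterator[T] {

      def hasNext: Boolean = {
        if (context.interrupted) {
          // Placeholder failure: abort the task once a kill has been requested.
          throw new RuntimeException("task killed")
        }
        delegate.hasNext
      }

      def next(): T = delegate.next()
    }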

Todos:

  1. Use InterruptibleIterator in all "input" iterators, including the ones in ParallelCollectionsRDD, HadoopRDD, NewHadoopRDD, ShuffleFetcher. (It might be easier to create an InterruptibleRDD class and just make the various input RDDs the parent of this new RDD.)
  2. Design an interface for the user to be able to kill jobs. One idea is to add a new submitJob function in DAGScheduler and have it return a Future-like object with a "cancel" or "kill" method (a rough sketch follows this list). Note that this can be reused in the future to implement a progress reporting feature.
  3. Unit tests.
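
A rough sketch of the Future-like handle mentioned in item 2; the names FutureJob, awaitResult, and kill are illustrative assumptions, not the API this PR settles on.

    // Hypothetical handle returned by a submitJob call: the caller can either
    // block for the result or request that the job be killed.
    trait FutureJob[R] {
      // Block until the job finishes executing and return its result.
      def awaitResult(): R

      // Ask the scheduler to kill the job; running tasks observe the
      // TaskContext "interrupted" flag and stop at their next record.
      def kill(): Unit
    }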

@@ -812,6 +813,38 @@ class SparkContext(
result
}

def submitJob[T, U, R](
Contributor

Certainly not unique to this method, but this is another example of something that is publicly accessible in a SparkContext that probably shouldn't be. Is there any plan to provide a more restricted SparkContext API (for use, e.g., in the shell), or are we going to continue to assume that if we don't tell people about something, then they won't misuse it?

Member Author

Actually, why wouldn't we want this to be accessible? It is at the same level as runJob.

Contributor

That's true, and we certainly want custom RDD developers and the like to be able to access runJob etc. My question, though, is whether we really want all users of, e.g., spark-shell to be able to call runJob, submitJob, etc. directly. It's partly a question of whether spark-shell is targeted at developers (who want to be able to do anything in the repl that they could in standalone code) or at end users who just want to do the kinds of higher-level things with RDDs that we typically show in examples. Maybe the issue isn't so much that SparkContext itself needs another access level for methods, but that we could use another kind of interactive tool/UI (something like iPython Notebooks, perhaps...) that doesn't expose all the SparkContext methods that many users shouldn't really be messing with directly.

* without a JobSubmitted event.
* Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
* can be used to block until the job finishes executing or can be used to kill the job.
* If the given RDD does not contain any partitions, the function returns None.
Contributor

I don't understand this last sentence.

Member Author

Yeah, it's an old comment. Need to change that.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

One or more automated tests failed
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/940/

    } else {
      // Don't apply map-side combiner.
      // A sanity check to make sure mergeCombiners is not defined.
      assert(mergeCombiners == null)
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
        .interruptible()
    }
  }

Contributor

Which means that you need this in PairRDDFunctionsSuite:

-    assert(deps.size === 2) // ShuffledRDD, ParallelCollection
+    assert(deps.size === 3) // ShuffledRDD, ParallelCollection, InterruptibleRDD

Member Author

Yeah, I know. I'm intentionally not changing the tests until I'm happy with my design.

Contributor

Heh, okay. But so far, that's the only test you've broken.
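
For reference, a hypothetical expansion of the interruptible() call shown in the snippet above, following the InterruptibleRDD idea from the PR description; the class name, the wiring of interruptible(), and the 0.8-era signatures here are assumptions, not code from this PR.

    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD

    // Wraps a parent RDD so that every partition's iterator is wrapped in an
    // InterruptibleIterator, making tasks check the kill flag as they read.
    class InterruptibleRDD[T: ClassManifest](prev: RDD[T]) extends RDD[T](prev) {

      override def getPartitions: Array[Partition] = prev.partitions

      override def compute(split: Partition, context: TaskContext): Iterator[T] =
        new InterruptibleIterator(context, prev.iterator(split, context))
    }

    // The interruptible() call above would then presumably be little more than
    //   def interruptible(): RDD[T] = new InterruptibleRDD(this)
    // on RDD itself.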

@mateiz
Member

mateiz commented Sep 28, 2013

Reynold, did you run the spark-perf tests on this? Would be good to check that it adds no overhead.

@rxin
Member Author

rxin commented Oct 3, 2013

Just ran some perf tests. Didn't notice any performance degradation with this turned on.

without job killing:

scheduling-throughput, --num-tasks=10000 --num-trials=10 --inter-trial-wait=30, 5.198, 0.312, 4.78, 5.198, 5.084
scala-agg-by-key, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 2.728, 0.754, 2.595, 3.116, 2.633
scala-sort-by-key, --num-trials=10 --inter-trial-wait=30 --num-partitions=20 --reduce-tasks=20 --random-seed=5 --persistent-type=memory  --num-records=10000000 --unique-keys=1000 --key-length=10 --unique-values=50000 --value-length=10, 4.343, 0.223, 3.841, 4.343, 3.971
scala-count, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 0.273, 0.062, 0.245, 0.398, 0.273
scala-count-w-fltr, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 0.552, 0.049, 0.497, 0.643, 0.497

with job killing, run 1:

scheduling-throughput, --num-tasks=10000 --num-trials=10 --inter-trial-wait=30, 5.148, 1.739, 4.841, 5.134, 8.928
scala-agg-by-key, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 2.727, 0.170, 2.569, 3.163, 2.569
scala-sort-by-key, --num-trials=10 --inter-trial-wait=30 --num-partitions=20 --reduce-tasks=20 --random-seed=5 --persistent-type=memory  --num-records=10000000 --unique-keys=1000 --key-length=10 --unique-values=50000 --value-length=10, 4.405, 0.217, 3.928, 4.425, 3.977
scala-count, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 0.271, 0.058, 0.246, 0.384, 0.269
scala-count-w-fltr, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 0.578, 0.032, 0.544, 0.635, 0.616

with job killing, run 2:

scheduling-throughput, --num-tasks=10000 --num-trials=10 --inter-trial-wait=30, 5.423, 2.570, 4.846, 5.423, 5.376
scala-agg-by-key, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 2.765, 0.150, 2.714, 3.161, 2.738
scala-sort-by-key, --num-trials=10 --inter-trial-wait=30 --num-partitions=20 --reduce-tasks=20 --random-seed=5 --persistent-type=memory  --num-records=10000000 --unique-keys=1000 --key-length=10 --unique-values=50000 --value-length=10, 4.354, 0.230, 3.954, 4.511, 4.168
scala-count, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 0.313, 0.078, 0.245, 0.512, 0.245
scala-count-w-fltr, --num-trials=10 --inter-trial-wait=30 --num-partitions=200 --reduce-tasks=200 --random-seed=5 --persistent-type=memory  --num-records=100000000 --unique-keys=10000 --key-length=10 --unique-values=500000 --value-length=10, 0.537, 0.041, 0.476, 0.608, 0.476

@rxin
Member Author

rxin commented Oct 3, 2013

rxin closed this Oct 3, 2013