Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6629] cancelJobGroup() may not work for jobs whose job groups are inherited from parent threads #5288

Closed

Conversation

JoshRosen
Copy link
Contributor

When a job is submitted with a job group and that job group is inherited from a parent thread, there are multiple bugs that may prevent this job from being cancelable via SparkContext.cancelJobGroup():

  • When filtering jobs based on their job group properties, DAGScheduler calls get() instead of getProperty(), which does not respect inheritance, so it will skip over jobs whose job group properties were inherited.
  • Properties objects are mutable, but we do not make defensive copies / snapshots, so modifications of the parent thread's job group will cause running jobs' groups to change; this also breaks cancelation.

Both of these issues are easy to fix: use getProperty() and perform defensive copying.

@@ -493,7 +495,8 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
SerializationUtils.clone(properties)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use SerializationUtils.clone() in order to perform a deep-clone which also captures properties that were inherited from parents; see 5a55261 for more discussion of this issue.

@SparkQA
Copy link

SparkQA commented Mar 31, 2015

Test build #29471 has started for PR 5288 at commit 3f7b9e8.

@JoshRosen
Copy link
Contributor Author

On further reflection, this might not be a use-case that we want to support. I don't think we make any user-facing promises about how / whether these properties are inherited across threads and there are reasons why we may want to discourage this implicit inheritance in favor of explicitly setting the job group in each thread that submits jobs; see https://issues.apache.org/jira/browse/SPARK-4514 for a related discussion.

@JoshRosen
Copy link
Contributor Author

@jerryshao, since you added unit tests for localProperty inheritance in ffa5f8e, do you know of any actual use-cases that motivate this inheritance? As far as I can tell, we don't seem to rely on it internally inside of Spark and it's not documented in a user-facing way. Users can't be relying on it for job group cancellation, since this patch demonstrates that this has always been broken, and it seems unlikely that anyone would choose to use this for cross-thread communication: in order to read and write these properties, you need to have a reference to the SparkContext instance, at which point you might as well just pass the information directly instead of implicitly inheriting it across threads.

If there's no actual use for this inheritance, it might be nice to remove it.

@SparkQA
Copy link

SparkQA commented Mar 31, 2015

Test build #29471 has finished for PR 5288 at commit 3f7b9e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/29471/
Test PASSed.

@jerryshao
Copy link
Contributor

Hi @JoshRosen , sorry for late response, looks like I missed this notification. The reason why here changes to InheritableThreadLocal is that: we have such a scenario, we enabled FairScheduler in Spark with two groups, one group is running streaming program, and another group is running query, they separated in two threads and shared with same SparkContext, since streaming job is running in a threadpool when submitted to Spark DAGScheduler, so if we do not use InheritableThreadLocal, we cannot get the right value in child thread, so I changed to this way.

@jerryshao
Copy link
Contributor

Hi @JoshRosen , I think this is the original PR (mesos/spark#937). I'm not sure is this issue still existed, it is a quite old code.

@zsxwing
Copy link
Member

zsxwing commented Apr 10, 2015

use getProperty() and perform defensive copying.

+1. It's possible that SparkListener will see some wrong key-values in SparkListenerJobStart.properties without the defensive copying.

@zsxwing
Copy link
Member

zsxwing commented Apr 14, 2015

@JoshRosen, will this PR be merged? I'm using localProperties in #5473. If we don't copy localProperties, SparkListenerJobStart.properties will just point to localProperties.get. It will be used in SparkListener, which runs in another thread. If localProperties is changed before a listener uses SparkListenerJobStart.properties, the listener will see the modification rather than the properties when Job starts.

@JoshRosen
Copy link
Contributor Author

@zsxwing I haven't merged this yet because I had some doubts about how inheritance of local properties should work, especially when threads are re-used. Let me revisit this now, though, since I think the changes here seem to make the behavior more consistent. I'll also take a closer look at your use-case in #5473 to try to understand how it would be affected by this change and whether it's prone to any of the weird thread re-use corner cases that can crop up with localProperties.

…ability-race

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@SparkQA
Copy link

SparkQA commented Apr 19, 2015

Test build #30540 has started for PR 5288 at commit 5d90750.

@SparkQA
Copy link

SparkQA commented Apr 19, 2015

Test build #30540 has finished for PR 5288 at commit 5d90750.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30540/
Test FAILed.

@SparkQA
Copy link

SparkQA commented Apr 19, 2015

Test build #30542 has started for PR 5288 at commit 9e29654.

@SparkQA
Copy link

SparkQA commented Apr 19, 2015

Test build #30542 has finished for PR 5288 at commit 9e29654.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Params(
    • sealed abstract class Node extends Serializable
    • sealed trait Split extends Serializable
    • final class CategoricalSplit(
    • final class ContinuousSplit(override val featureIndex: Int, val threshold: Double) extends Split
    • trait DecisionTreeModel
  • This patch does not change any dependencies.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30542/
Test PASSed.

@JoshRosen
Copy link
Contributor Author

/cc @andrewor14, what do you think about this fix? The inheritance + cancellation tests here address an incredibly obscure corner-case that most users might not hit or expect to work (since I don't think that we ever explicitly state that job groups will be inherited across threads). The defensive copy of Properties seems like a good change by itself, though.

I'd like to try to get some closure on this issue by either deciding that we're going to support the inheritance feature as it was originally described or by removing this feature and instructing users to manually set job groups in child threads; at this point, I think that either option could work since the current behavior is broken. We could also decide to just leave the inheritance alone and only do the copying change.

@SparkQA
Copy link

SparkQA commented Apr 25, 2015

Test build #706 has started for PR 5288 at commit 9e29654.

@SparkQA
Copy link

SparkQA commented Apr 25, 2015

Test build #707 has started for PR 5288 at commit 9e29654.

@SparkQA
Copy link

SparkQA commented Apr 27, 2015

Test build #31033 has started for PR 5288 at commit 9e29654.

@zsxwing
Copy link
Member

zsxwing commented Apr 29, 2015

@JoshRosen is it ready to go? At least, #5473 needs the defensive copy of Properties to avoid breaking tests. Thank you.

@tdas
Copy link
Contributor

tdas commented Apr 29, 2015

Merging this. The defensive copying is blocker for #5473 .

@asfgit asfgit closed this in 3a180c1 Apr 29, 2015
@JoshRosen JoshRosen deleted the localProperties-mutability-race branch May 26, 2015 03:41
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
…are inherited from parent threads

When a job is submitted with a job group and that job group is inherited from a parent thread, there are multiple bugs that may prevent this job from being cancelable via `SparkContext.cancelJobGroup()`:

- When filtering jobs based on their job group properties, DAGScheduler calls `get()` instead of `getProperty()`, which does not respect inheritance, so it will skip over jobs whose job group properties were inherited.
- `Properties` objects are mutable, but we do not make defensive copies / snapshots, so modifications of the parent thread's job group will cause running jobs' groups to change; this also breaks cancelation.

Both of these issues are easy to fix: use `getProperty()` and perform defensive copying.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#5288 from JoshRosen/localProperties-mutability-race and squashes the following commits:

9e29654 [Josh Rosen] Fix style issue
5d90750 [Josh Rosen] Merge remote-tracking branch 'origin/master' into localProperties-mutability-race
3f7b9e8 [Josh Rosen] Add JIRA reference; move clone into DAGScheduler
707e417 [Josh Rosen] Clone local properties to prevent mutations from breaking job cancellation.
b376114 [Josh Rosen] Fix bug that prevented jobs with inherited job group properties from being cancelled.
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
…are inherited from parent threads

When a job is submitted with a job group and that job group is inherited from a parent thread, there are multiple bugs that may prevent this job from being cancelable via `SparkContext.cancelJobGroup()`:

- When filtering jobs based on their job group properties, DAGScheduler calls `get()` instead of `getProperty()`, which does not respect inheritance, so it will skip over jobs whose job group properties were inherited.
- `Properties` objects are mutable, but we do not make defensive copies / snapshots, so modifications of the parent thread's job group will cause running jobs' groups to change; this also breaks cancelation.

Both of these issues are easy to fix: use `getProperty()` and perform defensive copying.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#5288 from JoshRosen/localProperties-mutability-race and squashes the following commits:

9e29654 [Josh Rosen] Fix style issue
5d90750 [Josh Rosen] Merge remote-tracking branch 'origin/master' into localProperties-mutability-race
3f7b9e8 [Josh Rosen] Add JIRA reference; move clone into DAGScheduler
707e417 [Josh Rosen] Clone local properties to prevent mutations from breaking job cancellation.
b376114 [Josh Rosen] Fix bug that prevented jobs with inherited job group properties from being cancelled.
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
…are inherited from parent threads

When a job is submitted with a job group and that job group is inherited from a parent thread, there are multiple bugs that may prevent this job from being cancelable via `SparkContext.cancelJobGroup()`:

- When filtering jobs based on their job group properties, DAGScheduler calls `get()` instead of `getProperty()`, which does not respect inheritance, so it will skip over jobs whose job group properties were inherited.
- `Properties` objects are mutable, but we do not make defensive copies / snapshots, so modifications of the parent thread's job group will cause running jobs' groups to change; this also breaks cancelation.

Both of these issues are easy to fix: use `getProperty()` and perform defensive copying.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#5288 from JoshRosen/localProperties-mutability-race and squashes the following commits:

9e29654 [Josh Rosen] Fix style issue
5d90750 [Josh Rosen] Merge remote-tracking branch 'origin/master' into localProperties-mutability-race
3f7b9e8 [Josh Rosen] Add JIRA reference; move clone into DAGScheduler
707e417 [Josh Rosen] Clone local properties to prevent mutations from breaking job cancellation.
b376114 [Josh Rosen] Fix bug that prevented jobs with inherited job group properties from being cancelled.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants