[SPARK-2321] Stable pull-based progress / status API #2696
Conversation
This change means that the listeners will always be registered, even if the web UI is disabled. The motivation for this change is to allow these listeners to be used when implementing a stable pull-based status / progress API for 1.2.
Some design goals here:
- Hide constructors and implementations from users; only expose interfaces.
- Return only immutable objects.
- Ensure API will be usable from Java (e.g. only expose Array collections, since they're nicely usable from Scala and Java without having to do any implicit conversions on the Scala side or wrapping into Java-friendly types on the Java side).
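For illustration, an interface in that spirit might look roughly like this (a hypothetical sketch, not code from the patch):

```scala
// Illustrative sketch of a Java-friendly, read-only status interface:
// only getters, only immutable / Java-friendly types, no public constructor.
trait SparkJobInfoSketch {
  def jobId(): Int
  def stageIds(): Array[Int]  // plain arrays are usable from both Java and Scala without wrappers
}
```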
QA tests have started for PR 2696 at commit
QA tests have finished for PR 2696 at commit
Test PASSed.
- Java-style getX() method naming doesn't make sense for immutable objects that will never have any setters.
- The "consistent snapshot of the entire job -> stage -> task mapping" semantics might be very expensive to implement for large jobs, so I've decided to remove chaining between the SparkJobInfo and SparkStageInfo interfaces. Concretely, this means that you can't write something like `job.stages()(0).name` to get the name of the first stage in a job. Instead, you have to explicitly get the stage's ID from the job and then look up that stage using `sc.getStageInfo()`. This isn't to say that we can't implement methods like "getNumActiveStages" that reflect consistent state; the goal is mainly to avoid spending lots of time / memory to construct huge object graphs.
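For illustration, the non-chained lookup might look roughly like this from the Scala side (a sketch; it assumes the `getJobInfo` / `getStageInfo` methods this PR adds to SparkContext return `Option` on the Scala side, and that `SparkJobInfo` exposes `stageIds()` while `SparkStageInfo` exposes `name()`):

```scala
import org.apache.spark.SparkContext

// Sketch: the job exposes stage IDs only, so the caller makes a second,
// explicit lookup instead of chaining job.stages()(0).name.
def firstStageName(sc: SparkContext, jobId: Int): Option[String] = {
  sc.getJobInfo(jobId).flatMap { job =>             // assumed Option[SparkJobInfo]
    job.stageIds().headOption.flatMap { stageId =>  // IDs, not SparkStageInfo objects
      sc.getStageInfo(stageId).map(_.name())        // separate lookup by stage ID
    }
  }
}
```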
This is a very rough WIP example; things should be considerably simpler once I port AsyncRDDActions to Java.
Test FAILed.
@@ -132,6 +132,12 @@ class JavaSparkContext(val sc: SparkContext)
  /** Default min number of partitions for Hadoop RDDs when not given by user */
  def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions

  def getJobsIdsForGroup(jobGroup: String): Array[Int] = sc.getJobsIdsForGroup(jobGroup)

  def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
Add Javadoc to these 3 methods and explain that this returns null when the jobId doesn't exist.
I suppose that I could use a Guava `Optional` here, although we were bitten in the past by exposing this third-party type in our APIs. Returning `null` would be inconsistent with the use of `Optional` in other parts of the Java API (such as `JavaSparkContext.getCheckpointDir()`). On the other hand, we may be stuck with `Optional` since other code exposes it. What do you think we should do here?
null is fine - and definitely don't depend on the Optional type in Guava. I was merely suggesting adding more documentation about this API.
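For illustration, the kind of Javadoc being requested might look like this on the Java-facing method (the wording is a sketch, not the committed documentation):

```scala
/**
 * Returns job information, or `null` if the job info could not be found
 * (for example, if the job ID is unknown or its data has been cleaned up).
 */
def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
```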
Hi @JoshRosen, I think this is going in the right direction and is definitely an improvement over the status quo. I left some comments about semantics of certain parts of the API and some general suggestions. One thing is that this PR seems to cover half of SPARK-2321 - the part about monitoring, but not the listener API. Perhaps that bug should be broken into separate sub-tasks?
@vanzin Yeah, maybe we should split SPARK-2321; this PR is only concerned with progress monitoring; I'm not proposing any stabilization of the listener APIs here.
- Use an enumeration for job status.
- Remove StatusAPIImpl class; implement methods directly in SparkContext.
- Fix typo in getJobIdsForGroup method name.
- Add Javadoc and comment on nullability.
- Rename numCompleteTasks -> numCompletedTasks.
- Perform more efficient filtering in jobIdsForGroup.
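The "more efficient filtering" item might be implemented along these lines inside the listener-backed code (a hypothetical sketch; `jobProgressListener`, `jobIdToData`, and its fields are assumed names, not the committed change):

```scala
// Hypothetical sketch: filter job IDs by group directly over the listener's
// job bookkeeping instead of materializing full SparkJobInfo objects first.
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
  jobProgressListener.synchronized {
    jobProgressListener.jobIdToData.valuesIterator
      .filter(_.jobGroup.contains(jobGroup))  // keep only jobs in the requested group
      .map(_.jobId)
      .toArray
  }
}
```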
QA tests have started for PR 2696 at commit
QA tests have finished for PR 2696 at commit
Test PASSed.
package org.apache.spark;

public enum ExecutionStatus {
This is really a `JobExecutionStatus`.
QA tests have started for PR 2696 at commit
Alright, I've updated this based on the latest round of feedback. I've removed the WIP, since I think this is enough for a basic first cut at this API; we can always expose more things via it in later commits.
QA tests have finished for PR 2696 at commit
Test PASSed.
package org.apache.spark;

public enum JobExecutionStatus {
Feels like it's missing `KILLED`, although I'm not sure whether it's possible to differentiate that from `FAILED` in the backend at the moment.
I don't think that we have backend support for this, at least not at the JobProgressListener layer.
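For callers, the practical consequence is that a killed job would surface under the failure value; a small sketch of branching on the enum (the values shown reflect this discussion, not necessarily the final definition):

```scala
import org.apache.spark.JobExecutionStatus

// Sketch: with no KILLED value, a killed job is reported as FAILED because
// the JobProgressListener backend doesn't distinguish the two cases.
def describe(status: JobExecutionStatus): String = status match {
  case JobExecutionStatus.RUNNING   => "still running"
  case JobExecutionStatus.SUCCEEDED => "finished successfully"
  case JobExecutionStatus.FAILED    => "failed (possibly killed)"
  case _                            => "not known to the driver"
}
```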
@JoshRosen lgtm; I don't really like the listener changes in Also, what about unit tests for the new APIs? I don't see any...
Test build #22171 has started for PR 2696 at commit
I added a couple of simple tests. One subtlety here is the fact that this API is sort of "eventually consistent" in the sense that we might know about a job before we have any information on its stages. Offering stronger guarantees, like "once you have a JobInfo for a job that's running, you should always be able to get a StageInfo", may require significant scheduler / listener refactoring.
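Concretely, callers and tests have to poll and treat a missing StageInfo as "not yet known" rather than an error; a rough sketch of that pattern, under the same assumption as above that the Scala-side lookups return `Option`:

```scala
import org.apache.spark.{SparkContext, SparkStageInfo}

// Sketch: the job may be known before any of its stage info is, so retry a
// few times instead of failing on the first empty result.
def waitForFirstStageInfo(sc: SparkContext, jobId: Int,
                          maxAttempts: Int = 50): Option[SparkStageInfo] = {
  var attempts = 0
  var result: Option[SparkStageInfo] = None
  while (result.isEmpty && attempts < maxAttempts) {
    result = for {
      job     <- sc.getJobInfo(jobId)       // may exist before its stages do
      stageId <- job.stageIds().headOption
      stage   <- sc.getStageInfo(stageId)
    } yield stage
    if (result.isEmpty) { Thread.sleep(100); attempts += 1 }
  }
  result
}
```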
Test build #22171 has finished for PR 2696 at commit
Test FAILed.
LGTM. Thanks for adding the tests!
Thanks for all of the review feedback! I'm going to merge this now so that we can begin using it in other PRs and features, such as progress bars in the Spark shells.
This PR refactors / extends the status API introduced in #2696.
- Change StatusAPI from a mixin trait to a class. Before, the new status API methods were directly accessible through SparkContext, whereas now they're accessed through a `sc.statusAPI` field. As long as we were going to add these methods directly to SparkContext, the mixin trait seemed like a good idea, but this might be simpler to reason about and may avoid pitfalls that I've run into while attempting to refactor other parts of SparkContext to use mixins (see #3071, for example).
- Change the name from SparkStatusAPI to SparkStatusTracker.
- Make `getJobIdsForGroup(null)` return ids for jobs that aren't associated with any job group.
- Add `getActiveStageIds()` and `getActiveJobIds()` methods that return the ids of whatever's currently active in this SparkContext. This should simplify davies's progress bar code.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3197 from JoshRosen/progress-api-improvements and squashes the following commits:

30b0afa [Josh Rosen] Rename SparkStatusAPI to SparkStatusTracker.
d1b08d8 [Josh Rosen] Add missing newlines
2cc7353 [Josh Rosen] Add missing file.
d5eab1f [Josh Rosen] Add getActive[Stage|Job]Ids() methods.
a227984 [Josh Rosen] getJobIdsForGroup(null) should return jobs for default group
c47e294 [Josh Rosen] Remove StatusAPI mixin trait.

(cherry picked from commit 40eb8b6)
Signed-off-by: Reynold Xin <rxin@databricks.com>
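The progress-bar style polling this is meant to simplify might look roughly like this (a sketch assuming the post-rename `sc.statusTracker` handle and the method names described above, with `Option`-returning lookups on the Scala side):

```scala
import org.apache.spark.SparkContext

// Sketch: poll whatever is currently active and report per-job task progress,
// roughly what a console progress bar in the shells needs.
def printProgress(sc: SparkContext): Unit = {
  for (jobId <- sc.statusTracker.getActiveJobIds()) {
    for (job <- sc.statusTracker.getJobInfo(jobId)) {  // assumed Option[SparkJobInfo]
      val stages    = job.stageIds().flatMap(sc.statusTracker.getStageInfo(_))
      val completed = stages.map(_.numCompletedTasks()).sum
      val total     = stages.map(_.numTasks()).sum
      println(s"Job $jobId: $completed / $total tasks completed")
    }
  }
}
```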
This pull request is a first step towards the implementation of a stable, pull-based progress / status API for Spark (see SPARK-2321). For now, I'd like to discuss the basic implementation, API names, and overall interface design. Once we arrive at a good design, I'll go back and add additional methods to expose more information via this API.
Design goals:
Implementation:
- Add methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved.
- Add interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values. These interfaces consist entirely of Java-style getter methods. The interfaces are currently implemented in Java. I decided to explicitly separate the interface from its implementation (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves.
- Allow an existing JobProgressListener to be used when constructing a live SparkUI. This allows us to re-use these listeners in the implementation of this status API. There are a few reasons why this listener re-use makes sense; in particular, the listener already maintains the `jobId -> Job` and `stageId -> Stage` mappings.
- The progress API methods are implemented in a separate trait that's mixed into SparkContext. This helps to keep SparkContext.scala from becoming larger and more difficult to read.
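A rough sketch of the mixin structure described in the last point (hypothetical names; the actual trait in the patch may be shaped differently):

```scala
import org.apache.spark.{SparkContext, SparkJobInfo, SparkStageInfo}

// Hypothetical sketch: a self-typed trait keeps the progress-API methods in
// their own file while still exposing them directly on SparkContext instances.
// SparkContext itself would then just mix this trait in.
trait StatusApiSketch { self: SparkContext =>

  // Each method would delegate to the always-registered JobProgressListener;
  // the bodies below are placeholders, not the real implementation.
  def getJobIdsForGroup(jobGroup: String): Array[Int] = Array.empty

  def getJobInfo(jobId: Int): Option[SparkJobInfo] = None

  def getStageInfo(stageId: Int): Option[SparkStageInfo] = None
}
```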