[BEAM-313] Enable the use of an existing spark context with the SparkPipelineRunner #401
Conversation
*/
@Description("Set to true if the spark runner will be "
    + "initialized with an existing Spark Context")
boolean isProvidedJavaSparkContext();
I wouldn't create this as a boolean option, but infer it from the presence of the JavaSparkContext when the user passes one: if he passes the context, it is because he wants to use it. And I would put the JavaSparkContext as an option in the SparkPipelineOptions object.
Why not just create an extension of SparkPipelineOptions that injects the SparkContext?
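A rough sketch of what such an extended options interface might look like. The accessor names follow the SparkContextOptions API that appears in the Scala example later in this thread; the @JsonIgnore annotation and the exact package locations are assumptions, not the merged implementation.

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical extension of SparkPipelineOptions that carries a user-provided context.
public interface ProvidedContextOptions extends SparkPipelineOptions {

  @Description("A user-provided, externally managed JavaSparkContext")
  @JsonIgnore // assumption: keeps the non-serializable context out of any serialized form of the options
  JavaSparkContext getProvidedSparkContext();
  void setProvidedSparkContext(JavaSparkContext jsc);

  @Description("Set to true to run the pipeline on the provided Spark context "
      + "instead of creating a new one")
  boolean getUsesProvidedSparkContext();
  void setUsesProvidedSparkContext(boolean value);
}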
R: @jbonofre
I thought that there could be issues with serialization, but I crossed my fingers it was not the case, mmmm... this can be trickier, then.
@amarouni good point.
I have to be honest, passing the SparkContext seems a bit hacky to me, since I think the Spark Runner should have the responsibility for the runtime of its SparkContext(s). Another option would be to change the runner so it instantiates n contexts (1 by default) and reuses them if needed. What do you think about this other approach, guys?
The Spark runner is "executed" for each pipeline "execution". If you create three different pipelines (in a main method), then you will have three different contexts. It will then be hard to manage the contexts and pass from one execution to the other.
@iemejia The Spark context should be seen as an additional configuration element for the Spark runner. This opens up the Spark Runner for more real-world use cases (interactive Spark shell with Beam, Spark job server, ...). Spark context pooling is hard to get right since we're limited to one Spark context per JVM.
@amarouni I agree, it makes more sense to me. Now, the main use case is that Beam bootstraps the Spark job. But we can extend this.
@amarouni this seems like a really interesting use case, but I have a few questions: Why not just add this to the existing SparkPipelineOptions instead of introducing more PipelineOptions implementations? Did you check how this behaves with SparkSession in 2.0? Because we're focusing our efforts towards 2.0 support to improve streaming support with it, I'm not sure if this will still "fly". I still didn't get the chance to deep dive in there, but it's worth understanding before we make this kind of a change.
I understand that the runner is stateless, but if I understood well, one of the advantages of passing the SparkContext is to make the initialization shorter (like spark-job-server does). Of course I know that the pooling part is way more complex, but if the goal is to reduce the initialization cost, it is one possible way to achieve it; however, we lose the flexibility of all the magic things you can do with the context before you inject it, but this would resolve the serialization issue. The other approach, the configuration (via the SparkRunner or the SparkOptions), is runner specific; my idea was more along the lines of creating a somewhat generic option to reuse pools of contexts in a more general and 'runner agnostic' way, with a parameter like '--contexts 10'. But well, maybe this does not make sense in other runners.
@amitsela Do you have some 'public' branch for the ongoing work on the Spark 2.0 integration that we can check/test?
@iemejia of course: we are working here: https://github.com/amitsela/incubator-beam/tree/spark-runner-datasets
@jbonofre this is the "cleaner" version, which will become the PR - https://github.com/amitsela/incubator-beam/commits/BEAM-198
It's a bit stuck because of some serialization issues, but it seems like it's going to resolve in a few days, and I plan to get back to it next week. But to the point of this PR - I didn't ask why the context is not part of the options, but rather why have more PipelineOptions implementations? If this is a feature that benefits the runner, it should be (a boolean) option in the SparkPipelineOptions. About optimising on initialisation - I don't see a great benefit; it's a bootstrap, so it'll take another second... This could be really cool, though I think this is a bit more complex...
@amitsela I didn't get the chance to test it with Spark 2.0 sessions, but will do so as soon as I get some time. But is there any Beam Spark 2.0 branch?
No Spark-runner 2.0 branch, but there is no Spark 2.0 release yet; there is work in progress against Spark 1.6, as I mentioned above. Considering the use case, I don't think that optimising on context initialization overhead matters since it's a bootstrap - but please correct me if I'm missing something.
@amitsela Our core use-case is Beam on Spark Job Server (https://github.com/spark-jobserver/spark-jobserver). SJS creates/pools contexts; the contexts can also be set up with all necessary library dependencies and provided to something like Beam. That's how we got the idea of a SparkRunner with a provided context, and from there we saw the Spark shell use case.
R: @amitsela
EvaluationResult res = SparkRunner.create(options, jsc).run(p);
res.close();
jsc.stop();
This should be implicit, probably to be handled by res.close() internally.
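As a hypothetical illustration of that suggestion (the class and the ownsContext flag are made-up names, not Beam's actual internals), close() could stop the context only when the runner created it, leaving a user-provided context to its external owner:

import org.apache.spark.api.java.JavaSparkContext;

// Illustrative only: an evaluation result that stops the context on close()
// without tearing down a context that was provided (and is owned) by the caller.
class EvaluationResultSketch implements AutoCloseable {
  private final JavaSparkContext jsc;
  private final boolean ownsContext; // false when the context was provided by the user

  EvaluationResultSketch(JavaSparkContext jsc, boolean ownsContext) {
    this.jsc = jsc;
    this.ownsContext = ownsContext;
  }

  @Override
  public void close() {
    if (ownsContext) {
      jsc.stop(); // runner-created context: safe to stop here
    }
    // provided context: its external owner (e.g. a job server) decides when to stop it
  }
}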
@amarouni I owe you an apology... This somehow slipped by me until I got a kind reminder from a friend.
We just discussed with Amit about having two different Spark runners (one for Spark 1.x, another one for Spark 2.x). So, this PR could be merged on the Spark 1.x runner.
@amitsela @jbonofre I think that the SparkRunner should just use the context as provided at the moment the Runner is created. The context stays external to the runner along with its state (running, stopped, ...) (we can add a check that the provided context has not already been stopped).
@amarouni sounds good to me, care to rebase and I'll give it a final look?
@amitsela Here's what it's going to look like (commit 11).
@@ -195,6 +195,11 @@ public EvaluationResult run(Pipeline pipeline) {
    JavaSparkContext jsc;
    if (mOptions.isProvidedJavaSparkContext() && this.providedJavaSparkContext != null) {
      LOG.info("Using a provided Spark Java Context.");
      if (this.providedJavaSparkContext.sc().isStopped()){
        LOG.error("The provided Spark context "
          + this.providedJavaSparkContext + " is stopped");
4 space indent.
One thing I'm concerned about is portability - we're trying to avoid ...
@amarouni Thanks, I'm gonna take a look.
@@ -44,4 +47,18 @@
@Default.Long(1000)
Long getBatchIntervalMillis();
void setBatchIntervalMillis(Long batchInterval);

@Override
@Default.String("spark dataflow pipeline job")
This was removed a while ago since it is inherited from ApplicationNameOptions.
@amarouni I added some comments; I think some merges of older versions found their way back into your PR, I've commented on that.
LGTM. Waiting for tests to pass. Thanks!
FYI, there's an issue on the OSX machine for Travis CI. As Jenkins passed, I think we are good to go. LGTM
@amarouni note for future contributions: it's better to keep your commits at the head of the log using git rebase. It's much easier to squash.
@jbonofre will do.
No problem, thanks for your contribution!
Thanks for the rebase and squash. Testing/merging now.
@amitsela does this mean Beam now works with the Spark job server? The Beam documentation even mentions the Spark job server, but it's not clear to me how it would work.
@kohlerm One use of this is to improve the latency of starting Spark jobs by 'pre-heating' a SparkContext with the required environment and reusing these contexts on job executions at SJS (reusing contexts is something you should do with caution).
Thanks for the quick replies!
Here's an old Scala snippet that shows how to use Beam & SJS. It's based on old versions of SJS & Beam, so it probably won't compile with newer SJS/Beam versions, but you'll get the idea:
import com.typesafe.config.Config
import org.apache.beam.runners.spark.{ SparkContextOptions, SparkRunner }
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.transforms.Create
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import spark.jobserver.{ SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation }
import scala.collection.JavaConversions
import scala.util.Try
/**
* Beam wordcount test. Returns the word count of a fixed String seq.
*/
object BeamWordCount extends SparkJob {
override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
Try(config.getStringList("wordList"))
.map(x => SparkJobValid)
.getOrElse(SparkJobInvalid("No wordList in context config"))
}
override def runJob(sc: SparkContext, jobConfig: Config): Any = {
// Input test list
val inputBuffer = scala.collection.JavaConversions.asScalaBuffer(jobConfig.getStringList("wordList"))
val WORDS = inputBuffer.toList
// Pipeline options
val sparkPipelineOptions = PipelineOptionsFactory.as(classOf[SparkContextOptions])
sparkPipelineOptions.setAppName("Beam WordCount test")
sparkPipelineOptions.setRunner(classOf[SparkRunner])
sparkPipelineOptions.setUsesProvidedSparkContext(true)
sparkPipelineOptions.setProvidedSparkContext(new JavaSparkContext(sc))
// Pipeline
val pipeline = Pipeline.create(sparkPipelineOptions)
// Input + processing + Output
val output = pipeline
.apply(Create.of(JavaConversions.seqAsJavaList(WORDS)).withCoder(StringUtf8Coder.of()))
.apply(new CountWords()) // CountWords: assumed to be the composite transform from the Beam WordCount example
// Result
// val result: EvaluationResult = pipeline.run().asInstanceOf[EvaluationResult]
// Run job & wait until finish
pipeline.run().waitUntilFinish()
}
}
It'd be nice to contribute this as new documentation if you manage to get it to work.
Thanks a lot! Of course, if we get it to work I will try to document it. I think this is a pretty common use case.
https://gist.github.com/kohlerm/649f92195e71697a1931f81def176ec9 worked for us.
The general use case is that the SparkPipelineRunner creates its own Spark context and uses it for the pipeline execution.
Another alternative is to provide the SparkPipelineRunner with an existing Spark context. This can be interesting for a lot of use cases where the Spark context is managed outside of Beam (context reuse, advanced context management, Spark job server, ...).
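For reference, a minimal Java sketch of this provided-context flow, assuming the SparkContextOptions accessors shown in the Scala snippet above; the local master URL, app name, and class name are placeholders.

import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.spark.api.java.JavaSparkContext;

public class ProvidedContextPipeline {
  public static void main(String[] args) {
    // A context managed outside of Beam, e.g. created by a job server or a Spark shell.
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "provided-context-example");

    // Point the runner at the existing context instead of letting it create one.
    SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
    options.setRunner(SparkRunner.class);
    options.setUsesProvidedSparkContext(true);
    options.setProvidedSparkContext(jsc);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms to the pipeline here ...
    pipeline.run().waitUntilFinish();

    // The context stays external to the runner, so the caller decides when to stop it.
    jsc.stop();
  }
}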