
[BEAM-3371] Enable running integration tests on Spark #6244

Merged
merged 4 commits into from
Oct 2, 2018

Conversation

lgajowy
Contributor

@lgajowy lgajowy commented Aug 17, 2018

This PR enables running IOITs on the Spark runner. One can do that by setting up a remote Spark cluster (Spark master) and running this command:

./gradlew clean integrationTest -p sdks/java/io/file-based-io-tests/ -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=PREFIX", "--runner=TestSparkRunner", "--sparkMaster=spark://LGs-Mac.local:7077", "--tempLocation=/tmp/"]' -DintegrationTestRunner=spark --tests org.apache.beam.sdk.io.text.TextIOIT --info

I experienced some difficulties with the TFRecordIOIT, ParquetIOIT and XmlIOIT tests:

  • XmlIOIT fails on an assertion (the hashcode of the PCollection is different than it should be)
  • ParquetIOIT suffers from a dependency mismatch (java.lang.NoSuchMethodError: org.apache.parquet.hadoop.ParquetWriter$Builder.<init>(Lorg/apache/parquet/io/OutputFile;)V)
  • TFRecordIOIT cannot find the created file: java.io.FileNotFoundException: No files found for spec: PREFIX_1534520885787*

Those issues are of a different nature than the one described in BEAM-3371 and have to be tackled separately.

@iemejia Could you take a look when you're back? There's no need to rush - I'll be off for 2 weeks now.

CC: @pabloem


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.


@lgajowy lgajowy requested a review from iemejia August 17, 2018 15:50
@lgajowy
Contributor Author

lgajowy commented Aug 17, 2018

BTW: why is the filesToStage option duplicated in DataflowWorkerPoolOptions, FlinkPipelineOptions and SparkPipelineOptions? Do you think we could move it to the PipelineOptions interface? IMO it would make things even easier (maybe not only for this fix).
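For illustration, the consolidation being asked about could look roughly like this. This is a hypothetical sketch, not Beam's actual class hierarchy; the interface and class names (StagingOptions, SparkOptions, SparkOptionsImpl) are made up:

```java
import java.util.Arrays;
import java.util.List;

public class SharedOptionSketch {

    // Hypothetical common interface declaring filesToStage once.
    interface StagingOptions {
        List<String> getFilesToStage();
        void setFilesToStage(List<String> files);
    }

    // Runner-specific options would inherit the option
    // instead of each redeclaring it.
    interface SparkOptions extends StagingOptions {}
    interface FlinkOptions extends StagingOptions {}

    // Minimal implementation just to show the shared accessor in use.
    static class SparkOptionsImpl implements SparkOptions {
        private List<String> filesToStage;

        @Override
        public List<String> getFilesToStage() {
            return filesToStage;
        }

        @Override
        public void setFilesToStage(List<String> files) {
            this.filesToStage = files;
        }
    }

    public static void main(String[] args) {
        SparkOptions options = new SparkOptionsImpl();
        options.setFilesToStage(Arrays.asList("a.jar", "b.jar"));
        System.out.println(options.getFilesToStage().size()); // prints 2
    }
}
```

The point of the sketch is only that each runner-specific options interface would pick up the accessor pair by inheritance rather than redeclaring it.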

@lgajowy
Contributor Author

lgajowy commented Aug 17, 2018

Run Java PreCommit

@lgajowy
Contributor Author

lgajowy commented Aug 17, 2018

@lgajowy
Contributor Author

lgajowy commented Sep 3, 2018

@iemejia rebased and fixed the spotless issue. Could you take a look?

@lgajowy
Contributor Author

lgajowy commented Sep 10, 2018

@iemejia ping :)

Alternatively (if you're very busy), please suggest some other reviewer.

@iemejia
Member

iemejia commented Sep 11, 2018

You are absolutely right @lgajowy, I should have passed this review on since I was a bit loaded these last days. Asking @aromanenko-dev to take a look (thanks Alexey!).

Contributor

@aromanenko-dev aromanenko-dev left a comment


Thanks! I added a couple of minor notes.


List<String> result = PipelineResources.prepareFilesForStaging(filesToStage, temporaryLocation);

assertThat(result, is(empty()));
Contributor


Could you add another entry to the filesToStage list that is an existing path, and assert that it removes ONLY the non-existent one?

Contributor Author


ok, good idea.
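The extra assertion being requested can be illustrated in isolation. This is a standalone sketch of the filtering behaviour using java.io directly, not Beam's actual PipelineResources implementation; the method name dropNonExistent is made up:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StagingFilterSketch {

    // Stand-in for the behaviour under test: keep only paths
    // that actually exist on disk.
    static List<String> dropNonExistent(List<String> filesToStage) {
        return filesToStage.stream()
            .filter(path -> new File(path).exists())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // One real file, one bogus path.
        File existing = Files.createTempFile("staged", ".jar").toFile();
        existing.deleteOnExit();

        List<String> filesToStage =
            Arrays.asList(existing.getAbsolutePath(), "/no/such/file.jar");

        List<String> result = dropNonExistent(filesToStage);

        // ONLY the non-existent entry should be removed.
        if (result.size() != 1
                || !result.get(0).equals(existing.getAbsolutePath())) {
            throw new AssertionError("unexpected result: " + result);
        }
        System.out.println("only the non-existent entry was removed");
    }
}
```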

@lgajowy
Contributor Author

lgajowy commented Sep 17, 2018

Hi @aromanenko-dev - I applied the suggestions. PTAL.

@aromanenko-dev
Contributor

Retest this please

@lgajowy
Contributor Author

lgajowy commented Sep 18, 2018

@aromanenko-dev I amended it a little bit and left a comment (PTAL).

@lgajowy
Contributor Author

lgajowy commented Sep 19, 2018

@mxm @aromanenko-dev thanks!

I added tests for Flink. Because the Spark runner invokes the code in the run() method and creates a SparkContext there (but does not delete it later?), I had some trouble creating similar tests for Spark, so I didn't submit them in this PR. Maybe we can do this later. I got the following error when I ran similar tests:

Cannot reuse spark context with different spark master URL. Existing: local[1], requested: spark://localhost:7077.

I tried to reuse the context (with the ReuseContextRule.java class). I also tried to stop the context somehow, but all of this led me nowhere. The code I used is here: https://github.com/apache/beam/compare/master...lgajowy:spark-integration-tests-2?expand=1#diff-9336eb87a4aea9ba0f254a1318f1fc90

@mxm @aromanenko-dev could you take a look again?

Contributor

@mxm mxm left a comment


Thanks @lgajowy looks good for the Flink side.

@@ -435,7 +451,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
protected <TransformT extends PTransform<? super PInput, POutput>>
TransformEvaluator<TransformT> translate(
TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
//--- determine if node is bounded/unbounded.
// --- determine if node is bounded/unbounded.
Contributor


unnecessary change

Contributor Author


reverted.

@@ -49,6 +63,7 @@ public void shouldRecognizeAndTranslateStreamingPipeline() {
.apply(
ParDo.of(
new DoFn<Long, String>() {

Contributor


unnecessary change

Contributor Author


reverted.


@Test
public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOException {
FlinkPipelineOptions options = testPreparingResourcesToStage("[local]");
Contributor


These 3 similar tests above could be a single parameterised test, but that's not a big deal for now.
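The suggestion can be sketched as one data-driven loop instead of three near-identical test methods. This is a hypothetical standalone illustration, not Beam's actual test code; the helper name and the set of "local-style" master values ("[local]", "[auto]", empty string) are assumptions:

```java
import java.util.Arrays;
import java.util.List;

public class LocalMasterCasesSketch {

    // Hypothetical stand-in for the check made by each of the three
    // similar tests: no files are prepared for local-style masters.
    static boolean shouldPrepareFilesToStage(String flinkMaster) {
        return !(flinkMaster.equals("[local]")
            || flinkMaster.equals("[auto]")
            || flinkMaster.isEmpty());
    }

    public static void main(String[] args) {
        // One loop over the cases replaces three near-identical methods.
        List<String> localStyleMasters = Arrays.asList("[local]", "[auto]", "");
        for (String master : localStyleMasters) {
            if (shouldPrepareFilesToStage(master)) {
                throw new AssertionError("expected no staging for: " + master);
            }
        }
        System.out.println("all local-style masters skip staging");
    }
}
```

In JUnit this would typically be expressed with a parameterised runner, but the loop shows the same consolidation idea.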

@aromanenko-dev
Contributor

Since the Flink side is OK (thanks to @mxm for the review) and the other part is fine for me as well, it LGTM. Let's wait for green tests; I think we can merge it after.

@aromanenko-dev
Contributor

@lgajowy Could you squash all commits before merging?

@lgajowy
Contributor Author

lgajowy commented Sep 20, 2018

Run Java PreCommit

@lgajowy
Contributor Author

lgajowy commented Sep 20, 2018

@aromanenko-dev I rewrote the history, squashing the commits that were irrelevant. Initially, this PR was all about Spark (BEAM-3371), but I also added one commit related to Flink (BEAM-3370), which is about adding the tests. All this is in light of the recent dev list discussion and docs.

Let me know if this looks good to you too - I have never really received feedback about the shape of the history, although I try to take care of it. It seems that we (committers especially) should be more aware of the git history, so this PR is a good chance to actually get feedback and practice. :)

@aaltay
Member

aaltay commented Sep 28, 2018

What is the status of this PR?

@lgajowy
Contributor Author

lgajowy commented Sep 30, 2018

@aaltay @aromanenko-dev suggested some additional manual checks of what happens if we run this using spark-submit. I didn't test that yet; I should be able to do it later this week.

@aromanenko-dev
Contributor

Run Flink ValidatesRunner

@aromanenko-dev
Contributor

Run Spark ValidatesRunner

@aromanenko-dev
Contributor

@lgajowy I performed testing on my side - I created a fat jar with a basic WordBasic pipeline using mvn artifacts built from the branch of this PR and ran it on my virtual YARN/Spark cluster. I didn't see any issues with that, so I think it LGTM and we can merge it.

@aromanenko-dev aromanenko-dev merged commit e79918c into apache:master Oct 2, 2018
@lgajowy
Contributor Author

lgajowy commented Oct 2, 2018

@aromanenko-dev thanks a lot! :)

@lgajowy lgajowy deleted the spark-integration-tests branch May 20, 2019 14:49
@lgajowy lgajowy restored the spark-integration-tests branch May 20, 2019 14:49
@lgajowy lgajowy deleted the spark-integration-tests branch May 20, 2019 14:51
5 participants