Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-7730] Introduce Flink 1.9 Runner #9296

Merged
merged 3 commits into from
Oct 22, 2019
Merged

[BEAM-7730] Introduce Flink 1.9 Runner #9296

merged 3 commits into from
Oct 22, 2019

Conversation

dmvk
Copy link
Member

@dmvk dmvk commented Aug 8, 2019

Runner for Flink 1.9. JIRA.

@dmvk dmvk force-pushed the BEAM-7730 branch 2 times, most recently from 0f5805f to 09fc860 Compare August 12, 2019 11:13
Copy link
Member

@sunjincheng121 sunjincheng121 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @dmvk!
The changes looks good to me.
Only one suggestion here: Could you please upgrade the flink version in https://github.com/apache/beam/blob/master/runners/flink/1.8/build.gradle. i.e., flink_version = '1.8.0' -> flink_version = '1.8.1'?
Best,
Jincheng

@dmvk
Copy link
Member Author

dmvk commented Aug 26, 2019

@sunjincheng121 Thanks for the review! I'll try to finish this one later this week. I think we can not upgrade to 1.8.1 due to:

and will have to wait for 1.8.2 release.

Best,
D.

@dmvk
Copy link
Member Author

dmvk commented Aug 26, 2019

Run Java PreCommit

@sunjincheng121
Copy link
Member

I see. Thanks @dmvk

@dmvk
Copy link
Member Author

dmvk commented Aug 27, 2019

@sunjincheng121 I'm still unable to fix the portability test for timers (the one that is @Ignored in this PR). It looks up state for a key from a different key-group during @OnTimer execution, which results in NPE. I'm still unsure about the cause. When I change paralellism to 1, it works as expected because all the keys are in a single partition.

Flink PR that caused the "regression" from 1.8 is most likely this one https://issues.apache.org/jira/browse/FLINK-12693.

Any ideas?

@sunjincheng121
Copy link
Member

Thanks @dmvk, I would like to check it, then feedback you! :)

@sunjincheng121
Copy link
Member

Hi @dmvk,I creat a hotfix to address the problem. (see: #9464)

Currently, state keys are encoded as NESTED while the keys in keyselector(KvToByteBufferKeySelector)
are encoded as OUTER. This makes the keyGroupId of the same key are different and
lead to the test failure.
The newest flink version makes this bug more visible.

I have check it in my local with your patch. Could you please have look at the changes of the hotfix and double check it whether the hotfix is make sense to you. :)

Thank you!
Jincheng

@dmvk dmvk changed the title WIP: [BEAM-7730] Introduce Flink 1.9 Runner [BEAM-7730] Introduce Flink 1.9 Runner Sep 4, 2019
@dmvk
Copy link
Member Author

dmvk commented Sep 4, 2019

@sunjincheng121 Thanks for the help! ;) I've changed the flink side coder to NESTED for now, until #9464 discussion gets resolved. I'm unsure about the implications changing context to OUTER would have on other runners.

@dmvk
Copy link
Member Author

dmvk commented Sep 5, 2019

@sunjincheng121 All checks are finally green ;) Can please you do the final review?

@dmvk dmvk requested a review from mxm September 5, 2019 09:17
@sunjincheng121
Copy link
Member

Thanks for the update @dmvk
LGTM. +1 to mreged.
Best, Jincheng

Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dmvk for taking the time to work on the Flink 1.9 Runner. Much appreciated!

There are some questions inline, especially I'm concerned about the high number of duplicated files, for which we already have two versions. It should be possible to avoid those duplications and keep at most two versions of the suspect files (current state).

Another point is the addition of a feature unrelated to Flink 1.9. I don't think his PR should add any new features but Flink 1.9. Additionally, the stop functionality will even be removed in the next Flink release, so I don't see a reason for adding this. Even if we decide to add it, it should be handled in a separate PR.

* href="https://github.com/apache/flink/commit/e95b347dda5233f22fb03e408f2aa521ff924996">Flink
* interface removal commit.</a>
*/
public interface BeamStoppableFunction extends StoppableFunction {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are adding support for this now, although it has already been removed upstream? Please make this a separate PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Max, this is just an interface for a backward compatibility.

Originally RichParallelSourceFunction was implementing StoppableFunction, which was removed as of Flink 1.9., so the following code wouldn't compile:

I didn't want to break this for the previous runner versions, so I introduced this interface that would handle the transition.

Other options would be completely remove the stop() implementation from the above mentioned classes.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, that makes sense to me.

Other options would be completely remove the stop() implementation from the above mentioned classes.

That wouldn't work across different versions due to stop function having to be implemented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally RichParallelSourceFunction was implementing StoppableFunction, which was removed as of Flink 1.9., so the following code wouldn't compile:

Actually, I just checked this again in Flink. StoppableFunction always has been optionally. It was not implemented by RichParallelSourceFunction. The only place where we use it in Beam is in UnboundedSourceWrapper. Like you also suggested, I'd remove the stop implementation because it does not allow for a more graceful stop than cancel. This simplifies the setup here and in any case this will be broken in 1.9, so good to also make this change in Beam now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather create a JIRA to remove this once we get rid of 1.7 runner as some users may still rely on this.

applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);
SourceTransformation<?> sourceTransform =
(SourceTransformation)
applyReadSourceTransform(transform, PCollection.IsBounded.BOUNDED, env);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to cast here, change the variable type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because SourceTransformation and OneInputTrasformation no longer extend a common interface, so applyReadSourceTransform needs to return Object instead.

@@ -1038,7 +1039,7 @@ void pushbackDataCheckpointing(

assertThat(
stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(helloElement, worldElement));
containsInAnyOrder(helloElement, worldElement));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the order reversed here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the last issue, will try to investigate today.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't guarantee the order here for keys in the state backend after restoring from a savepoint. This just so happens to work for older Flink versions. So seems fair to change this.

Copy link
Contributor

@tweise tweise left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dmvk there are a few more general things in this PR that should have been brought to the attention of the community upfront, in the form of an ML thread. Did such discussion happen? I apologize if I missed it. In the meantime I'm placing a stop sign on this PR until there is clarity specifically how we can strike the balance between handling differences between multiple Flink versions and maintainability / tech debt.

@dmvk
Copy link
Member Author

dmvk commented Sep 7, 2019

@tweise Hi Thomas, there was no upfront discussion as this was first developed in our internal build and now we are in the process of contributing the runner to beam. The runner is made the exactly same way as the runners for previous flink versions. Unfortunately this time, there were many breaking changes on the flink side (due to stream & batch unification effort), so the problem with the current approach became more visible.

There were discussions on ML about removal of 1.5, 1.6 versions, which would IMO eliminate the overhead to a bearable level (these versions are unsupported by the flink community anyway). Most of the classes that we "copy & paste" between versions are due to 1.5 compatibility.

I'll try to bring this to ML to see, if anyone has a better idea how to handle these differences.

Thanks,
D.

@dmvk
Copy link
Member Author

dmvk commented Sep 7, 2019

It seems that 1.5 and 1.6 discussion was never brought to the ML, I've probably recalled it from a private conversation + BEAM-7962.

* href="https://github.com/apache/flink/commit/e95b347dda5233f22fb03e408f2aa521ff924996">Flink
* interface removal commit.</a>
*/
public interface BeamStoppableFunction extends StoppableFunction {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, that makes sense to me.

Other options would be completely remove the stop() implementation from the above mentioned classes.

That wouldn't work across different versions due to stop function having to be implemented.

runners/flink/1.9/build.gradle Outdated Show resolved Hide resolved
runners/flink/1.9/build.gradle Outdated Show resolved Hide resolved
@mxm
Copy link
Contributor

mxm commented Sep 8, 2019

IMHO there is no need to block this PR. When the comments are resolved, this looks good to merge. As for your concerns @tweise, there will be a major simplification in many regards (directory structure, code workarounds) once we remove 1.5. Supporting multiple versions does not come for free, especially when interfacing with the internals of a stream processor. That said, the current layout is very practical, we just need to make sure we do not duplicate code if it is not necessary.

@tweise
Copy link
Contributor

tweise commented Sep 8, 2019

@mxm Let's continue the larger discussion on the ML.

@tweise
Copy link
Contributor

tweise commented Sep 8, 2019

It seems that 1.5 and 1.6 discussion was never brought to the ML, I've probably recalled it from a private conversation + BEAM-7962.

I remember that being a topic several times, but don't recall exactly where. But such change should be an explicit discussion on the ML. In any case, there seems to be consensus to drop 1.5/1.6.

@tweise
Copy link
Contributor

tweise commented Sep 8, 2019

IMHO there is no need to block this PR.

AFAIK this PR is blocked pending resolution of the key encoding issue, so that it passes tests. Hopefully in parallel we can make progress on trimming down the code duplication also.

@mxm
Copy link
Contributor

mxm commented Sep 12, 2019

@dmvk Let us know when you have addressed the comments. Happy to take another look then.

@sunjincheng121
Copy link
Member

Hi @dmvk @mxm I have left some comments in both #9484 and #9464.

@dmvk dmvk force-pushed the BEAM-7730 branch 2 times, most recently from c89b232 to 791e3ee Compare October 8, 2019 13:42
* href="https://github.com/apache/flink/commit/e95b347dda5233f22fb03e408f2aa521ff924996">Flink
* interface removal commit.</a>
*/
public interface BeamStoppableFunction extends StoppableFunction {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally RichParallelSourceFunction was implementing StoppableFunction, which was removed as of Flink 1.9., so the following code wouldn't compile:

Actually, I just checked this again in Flink. StoppableFunction always has been optionally. It was not implemented by RichParallelSourceFunction. The only place where we use it in Beam is in UnboundedSourceWrapper. Like you also suggested, I'd remove the stop implementation because it does not allow for a more graceful stop than cancel. This simplifies the setup here and in any case this will be broken in 1.9, so good to also make this change in Beam now.

@dmvk
Copy link
Member Author

dmvk commented Oct 10, 2019

Run Java PreCommit

@dmvk
Copy link
Member Author

dmvk commented Oct 11, 2019

Run Python PreCommit

@sunjincheng121
Copy link
Member

Run CommunityMetrics PreCommit

Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to update the Flink Runner page. Looks good other than the comments below.

@@ -142,7 +165,7 @@ class ValidatesRunnerConfig {

def createValidatesRunnerTask(Map m) {
def config = m as ValidatesRunnerConfig
tasks.create(name: config.name, type: Test) {
tasks.register(config.name, Test) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

@@ -178,8 +201,8 @@ def createValidatesRunnerTask(Map m) {
createValidatesRunnerTask(name: "validatesRunnerBatch", streaming: false)
createValidatesRunnerTask(name: "validatesRunnerStreaming", streaming: true)

task validatesRunner {
group = "Verification"
tasks.register('validatesRunner') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is recommended approach since 5.1.x as it creates the task lazily

https://docs.gradle.org/current/userguide/task_configuration_avoidance.html

main_source_dirs = ["$basePath/src/main/java", "./src/main/java"]
test_source_dirs = ["$basePath/src/test/java", "./src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we keep the ability to override resource dirs?

main_source_dirs = ["$basePath/src/main/java", "./src/main/java"]
test_source_dirs = ["$basePath/src/test/java", "./src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we keep the ability to override resource dirs?

flink_version = '1.9.0'
// Version specific code overrides.
main_source_overrides = ["${basePath}/1.7/src/main/java", "${basePath}/1.8/src/main/java", './src/main/java']
test_source_overrides = ["${basePath}/1.7/src/test/java", "${basePath}/1.8/src/test/java", './src/test/java']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that it easier to reason about the source, but at least we do not have code duplication. So looks fine for now.

Copy link
Contributor

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is practically ready. Going forward, we can merge this and address any follow ups related to documentation and build flexibility.

@mxm
Copy link
Contributor

mxm commented Oct 21, 2019

Thanks @dmvk.

@mxm
Copy link
Contributor

mxm commented Oct 21, 2019

Retest this please

@mxm
Copy link
Contributor

mxm commented Oct 21, 2019

Run ValidatesRunner

@mxm
Copy link
Contributor

mxm commented Oct 21, 2019

Run Java Flink PortableValidatesRunner Streaming

@mxm
Copy link
Contributor

mxm commented Oct 21, 2019

Run Java Flink PortableValidatesRunner Batch

Copy link
Member

@sunjincheng121 sunjincheng121 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@dmvk
Copy link
Member Author

dmvk commented Oct 22, 2019

@mxm Thanks! I'll follow-up with docs, build improvement suggestions and 1.9.1 upgrade.

@mxm
Copy link
Contributor

mxm commented Oct 23, 2019 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants