Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink 1.17 #30197

Merged
merged 1 commit into from
Apr 17, 2024
Merged

Flink 1.17 #30197

merged 1 commit into from
Apr 17, 2024

Conversation

je-ik
Copy link
Contributor

@je-ik je-ik commented Feb 2, 2024

Support Flink 1.17. Closes #29939.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

github-actions bot commented Feb 2, 2024

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Copy link
Member

@dmvk dmvk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change looks reasonable overall 👍 My only concern is around the possible compatibility (within the same version) guarantees. PTAL

@@ -17,14 +17,19 @@
*/

def basePath = '..'
def addedVersions = ["1.12", "1.13", "1.14", "1.15"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: previousVersions might be slightly more explicit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I further simplified the build.

}

@Override
public void writeSnapshot(DataOutputView dataOutputView) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this class was introduced because TypeSerializerConfigSnapshot no longer exists. Should we rather bring the old class in to eliminate the risk (eg. restoring snapshots from the same version)?

By quickly checking the code, it mosly delegates the logic to TypeSerializerSnapshotSerializationUtil which still exists 🤔

https://github.com/apache/flink/blob/release-1.16.3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out that provided you preserve the TypeSerializer the TypeSerializerSnapshot can be changed arbitrarily. It can be used to change the TypeSerializer as well, byt we preserve it here.

@dmvk
Copy link
Member

dmvk commented Feb 2, 2024

Can you please link the commits with a corresponding JIRA issue?

@je-ik
Copy link
Contributor Author

je-ik commented Feb 3, 2024

Can you please link the commits with a corresponding JIRA issue?

Beam no longer uses JIRA to track issues, but uses github issues instead. The commit is linked with the corresponding issue.

@Abacn
Copy link
Contributor

Abacn commented Feb 15, 2024

Hi, what is the status of this PR? I see Flink validation runner postcommit test passed. Persumably this is complete?

@Abacn
Copy link
Contributor

Abacn commented Feb 15, 2024

Also, previously Beam aims to support 4 consecutive flink versions. If find anything hard to support Flink 1.12 (or 1.13) and 1.17 at the same time, feel free to drop the former (or do it later)

@je-ik
Copy link
Contributor Author

je-ik commented Feb 16, 2024

I need to polish and validate upgrade path so this is temporarily on hold.

@je-ik je-ik changed the title Flink 1.17 [DO NOT MERGE] Flink 1.17 Feb 16, 2024
* org.apache.beam.sdk.io.FileSystems} registration needed for {@link
* org.apache.beam.sdk.transforms.Reshuffle} translation.
*/
private final SerializablePipelineOptions pipelineOptions;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment really mean for this class? From what I can tell, filesystem registration happens when constructing serializable pipeline options, which is not being done here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need to keep this as a member?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was just moved from main sources due to the fact it needs to be customized for Flink 1.17.

@je-ik
Copy link
Contributor Author

je-ik commented Feb 21, 2024

The upgrade is currently broken due to https://lists.apache.org/thread/s8o8jc2k2kb41q5g0v0xmoyszg1gdcst. After decision is made about the next steps this PR can continue.

@Abacn
Copy link
Contributor

Abacn commented Apr 4, 2024

Hi, as https://lists.apache.org/thread/s8o8jc2k2kb41q5g0v0xmoyszg1gdcst gets resolved by #30403, is this PR ready to be merged?

@je-ik
Copy link
Contributor Author

je-ik commented Apr 4, 2024

Hi, unfortunately I did not have time to finish it, yet. But target is currently release 2.56.0.

@Abacn
Copy link
Contributor

Abacn commented Apr 4, 2024

Thanks, that's awesome!

@je-ik
Copy link
Contributor Author

je-ik commented Apr 12, 2024

I verified the current implementation works and can read savepoint created by 1.16 (at least with the same beam version), but I need more tests regarding upgrade from current stable release. Moreover there is issue #29816 which seems to affect the current release as well. I would like to release 'new' runner without known severe bugs, so I think this should wait for one more release.

@je-ik je-ik changed the title [DO NOT MERGE] Flink 1.17 Flink 1.17 Apr 16, 2024
@je-ik
Copy link
Contributor Author

je-ik commented Apr 16, 2024

Unfortunately, upgrading Flink has still issues with upgrading various sources (due to Java serialization). Upgrade from 2.55.0 to 2.56.0 (in my testing Pipeline) is blocked due to incompatible change of KafkaIO.Read (added option).

Having said that, upgrade from 2.56.0 Flink 1.16 to Flink 1.17 works and that is as far as we can get. We need to merge #30987 and then we would need to implement #30385 and ideally #21897.

Other than that, this is ready from my part @Abacn, with follow-up #30988.

@Abacn
Copy link
Contributor

Abacn commented Apr 16, 2024

KafkaIO option change is #30877. It was having issue on upgrade compatibility. Let me check with the author if #30915 has fixed the upgrade issue

@je-ik
Copy link
Contributor Author

je-ik commented Apr 16, 2024

I don't think we can do anything about that at this point. I tried adding serialVersionUID, but it ends on incompatible change of the AutoValue generated class. Upgrading KafkaIO has been for this reason broken several times in last releases, we need to resolve the underlying issue (not serializing sources to checkpoint using Serializable interface). Which will again be upgrade-breaking change.

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, just have a minor comment about build file

versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
}

def all_versions = ['1.12', '1.13', '1.14', '1.15', '1.16', '1.17']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get these values from gradle.properties so keep a single path maintaining the versions? Just use

def all_versions = project.flink_versions.split(',')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've done this here 👍

flink_version = '1.12.7'
// Version specific code overrides.
main_source_overrides = ['./src/main/java']
test_source_overrides = ['./src/test/java']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for doing these refactorings

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, just have a minor comment about build file

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

@je-ik je-ik merged commit d70c253 into apache:master Apr 17, 2024
35 checks passed
@je-ik je-ik deleted the flink-1.17 branch April 17, 2024 14:52
@damccorm damccorm mentioned this pull request May 3, 2024
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request]: Upgrade Beam Flink Runner to Flink 1.17
4 participants