-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes a regression related to BQ read transform upgrade via the TransformService #31685
Conversation
R: @ahmedabu98 or @johnjcasey |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
if (queryTempProject != null) { | ||
builder = builder.setQueryTempProject(queryTempProject); | ||
|
||
if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.57.0") >= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This flag is for people to set the version back into the past. I don't think it works for this use case of making sure it is far enough into the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem case will be when this flag is not set at all, which will be the default case. So can we make things work by default when the flag is not set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the default case, the flag should be set by TransformUpgrader here
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
Lines 251 to 259 in 1037ede
String updateCompatibilityVersion = | |
optionsClone.as(StreamingOptions.class).getUpdateCompatibilityVersion(); | |
if (updateCompatibilityVersion == null || updateCompatibilityVersion.isEmpty()) { | |
// Setting the option 'updateCompatibilityVersion' to the current SDK version so that the | |
// TransformService uses a compatible schema. | |
optionsClone | |
.as(StreamingOptions.class) | |
.setUpdateCompatibilityVersion(ReleaseInfo.getReleaseInfo().getSdkVersion()); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this flag is set by the client when upgrading via the TransformUpgrader (client) so BQ transform just have to check it when upgrading and skip new fields (and set defaults).
// This property was added for Beam 2.57.0 hence not available when | ||
// upgrading the transform from previous Beam versions. | ||
String queryTempProject = configRow.getString("query_temp_project"); | ||
if (queryTempProject != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds like this null check is probably redundant, if the configRow crashes out? Or does the row have three states? (null, empty, string)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, in this case the null check does not help since the field simply doesn't exist. I guess if the field has a null state it can still be good to have (unless we explicitly want to set null values).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, when rows expand the nullable fields should be filled with nulls. I know we have some support for this in the data plane in Dataflow. Perhaps there is something different here because it is not data plane?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "rows expand" here ? Creating a POJO from a "from Row" function created using the SchemaRegistry.fromRowFunction
util ?
Note we are trying to get specific fields from the Row
which map to construction fields here. We don't have a single construction object to directly parse the Row into. So we should probably just add utils to handle schema-errors due to missing fields here.
"pull_licenses_java.py" error is unrelated. |
This fixes a regression introduced by #31128.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.