[BEAM-8162] Encode keys as NESTED for flink keyselector #9464
Conversation
Run Java PreCommit
Wow, thanks for the help! How did you find this?
The patch makes total sense. I'm only concerned that this may break other runners (how can we make sure that they implement this correctly??). Maybe we should go the opposite way and make the flink side coder NESTED instead?
Also, I'm wondering if we can create a simple @ValidatesRunner test that would make sure that both the FnApi and the Runner use the same coder across all runners 🤔
@robertwb You deprecated coder contexts a few years back; what is the current status of this effort?
PreCommit test failures seem to be unrelated (CassandraIO again).
cc @kennknowles
Adding a couple of folks who would know the answer better. I am not sure where the conversation ended about portability and "outer" vs "nested" encodings.
IIRC, we were moving to simply use nested coders everywhere at the portability layer. That seems the better fix.
@robertwb thanks a lot for the feedback. I agree that keeping our framework as simple as possible is a good idea. My concern is that using NESTED here will introduce some overhead for each key. What do you think?
Hi @dmvk, I debugged locally many times before finally finding this bug. Thanks a lot for helping double-check that the PreCommit test failures are unrelated to this PR.
OK, this matches what I thought. I think "nested" means "self-delimiting". So for things like fixed-width two's complement integers there is no performance cost. The performance cost is only for non-fixed-width encodings like strings. The only unusual case is protobuf-style "varint" and related encodings.
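The distinction above can be sketched with a toy encoder pair (hypothetical helpers for illustration, not Beam's actual Coder API): a fixed-width value costs the same bytes in either context, while a variable-length value pays only for the delimiter in the nested case.

```python
import struct

# Hypothetical encoders illustrating OUTER (raw bytes, value fills the whole
# stream) vs NESTED (self-delimiting, value can be followed by more data).
# This is a sketch; Beam's real Coder/Context API differs.

def encode_int_outer(n):
    return struct.pack(">i", n)           # fixed-width: 4 bytes

def encode_int_nested(n):
    return struct.pack(">i", n)           # already self-delimiting: same 4 bytes

def encode_str_outer(s):
    return s.encode("utf-8")              # raw bytes, no delimiter

def encode_str_nested(s):
    b = s.encode("utf-8")
    return struct.pack(">i", len(b)) + b  # length prefix makes it self-delimiting

# Fixed-width: no overhead. Variable-width: overhead is just the prefix.
assert len(encode_int_nested(42)) == len(encode_int_outer(42))
assert len(encode_str_nested("key")) == len(encode_str_outer("key")) + 4
```

A varint length prefix would shrink the per-string overhead further, which is the "unusual case" mentioned above.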
@kennknowles great, can you think about any
Is there a measurable performance difference/overhead for using nested in this context? If not, I'd rather keep things simple over introducing unnecessary optimization.
CC: @lukecwik |
The proto specifically says to use NESTED:
We currently use NESTED everywhere in the Beam portability APIs since NESTED vs OUTER becomes quite complicated and extremely error prone. Until there are benchmarks that show that this is a noticeable area of performance concern, we should stick with a simpler API specification. Mental note to self is to update the proto descriptions to mention self-delimiting everywhere instead of NESTED/OUTER.
a856332 to 4a0eaf3
Thanks for all of your feedback! @kennknowles @robertwb @lukecwik @dmvk Frankly speaking, I did not do any performance testing, just intuitive theoretical speculation. If we need to use NESTED from an architectural perspective, I agree with this decision. I have updated the PR to change OUTER to NESTED. Best, Jincheng
LGTM 👍 Thanks for the contribution!
I missed this PR. A couple of comments:
- Why is there no JIRA issue? "Hotfixes" are not in line with the Beam development guide. This is quite an important change which affects backwards-compatibility.
- There is no proper explanation why this is necessary.
- The original OUTER context was chosen for a reason: there is no need to use NESTED for the Flink key. If tests are failing, I think we should look at the root cause and not change the entire encoding of Flink keys.
In my opinion, without further explanation and analysis, this warrants a revert.
It looks to me the test issues were coming from the timer key passed from the portability layer. We could easily change the encoding context there.
@mxm, reverting this until your comments are resolved. @sunjincheng121, can you please create a JIRA for the issue?
Just to be clear, fixing the underlying cause is a one-liner. The following line needs to be commented out: Line 349 in 8869fce

edit: We explicitly need to change the key encoding context here:

    // Move from NESTED transport (Beam) to OUTER context (Flink internal)
    K bytes = CoderUtils.decodeFromByteArray(keyCoder, key.toByteArray(), Coder.Context.NESTED);
    ByteBuffer encodedKey = ByteBuffer.wrap(bytes.toByteArray());

The simplification that comes from the
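The context switch described in that snippet can be sketched generically (hypothetical Python helpers, not Beam's CoderUtils): bytes arriving from the portability layer are decoded with the self-delimiting (nested) encoding, then re-encoded raw (outer) for the runner's internal key representation.

```python
import struct

# Hypothetical string "coder" with two contexts (a sketch, not Beam's API).

def decode_nested(buf):
    """Decode a length-prefixed (self-delimiting) value; return (value, rest)."""
    (n,) = struct.unpack_from(">i", buf, 0)
    return buf[4:4 + n].decode("utf-8"), buf[4 + n:]

def encode_outer(value):
    """Raw encoding: the value's bytes fill the whole buffer."""
    return value.encode("utf-8")

def transport_to_internal(transport_bytes):
    # Move from NESTED transport (portability layer) to OUTER (runner internal).
    key, rest = decode_nested(transport_bytes)
    assert rest == b"", "nested key should consume the whole buffer here"
    return encode_outer(key)

nested = struct.pack(">i", 3) + b"abc"
assert transport_to_internal(nested) == b"abc"
```

The cost of keeping both representations is exactly this kind of boundary translation, which is the error-prone part the thread is debating.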
+1 for reverting this. I find it helpful to annotate the source, which would not only provide relevant context but also suitable author(s) to ask questions. @sunjincheng121 please see https://beam.apache.org/contribute/
Thank you very much for all of your feedback! There are three solutions to this problem in my thoughts (we have already mentioned them above):

Approach 1: Use OUTER at both the keyselector and portability level.
Approach 2: Use NESTED at both the keyselector and portability level.
Approach 3: Use OUTER in the keyselector and NESTED at the portability level.

I did a simple test for approach 2. Hi @mxm, regarding your comment "break backwards-compatibility": do you mean state backwards-compatibility, i.e., that when changing OUTER to NESTED the key-group-id will change, so users cannot get their old state when upgrading a job? If so, approach 3 encounters the same problem, because we change the current state key, so users also cannot get the old state when upgrading. If I understand correctly, all of the approaches have this backwards-compatibility problem. Please correct me if I have misunderstood anything. So, for now, I still prefer approach 2. What do you think? @kennknowles @robertwb @dmvk @mxm @lukecwik @tweise Best,
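The backwards-compatibility concern can be illustrated with a toy key-group assignment (the hash and modulus here are stand-ins, not Flink's actual key-group logic): because the key group is derived from the encoded key bytes, any change to the key encoding can move existing keys to different key groups, making old state unreachable after an upgrade.

```python
import hashlib
import struct

MAX_PARALLELISM = 128  # illustrative value

def key_group(encoded_key: bytes) -> int:
    # Toy stand-in for deriving a key group from the encoded key bytes.
    digest = hashlib.sha256(encoded_key).digest()
    return int.from_bytes(digest[:4], "big") % MAX_PARALLELISM

outer = b"user-42"                               # raw (OUTER) encoding
nested = struct.pack(">i", len(outer)) + outer   # length-prefixed (NESTED)

# Same logical key, different bytes -> almost certainly a different key group,
# so state written under one encoding is not found under the other.
print(key_group(outer), key_group(nested))
```

This is why any of the three approaches that changes the bytes used as the Flink key breaks state compatibility, regardless of which context is chosen.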
Personally, I lean towards 2) as well, because the alternative is way more error prone. These things are super hard to test, and it makes the runner code hard to understand (at least for me, because I'm not that familiar with the runner code-base). Also, coder contexts are marked as deprecated; is the deprecation still valid?
Yes, the deprecation on coder contexts is valid. Originally coders were used to read/write from/to files in addition to encoding/decoding records that are being processed by the pipeline and that reading/writing is what really needed an OUTER context. |
@sunjincheng121 thanks for the follow-up. I think this discussion would be worth having on the mailing list though :) |
@sunjincheng121 that sounds good. There is another PR that will unblock the 1.9 runner: #9484. The 1.9 runner PR also has open issues of its own.
It may have been open for a long time but it was only considered ready-to-review with tests passing yesterday. I too want this to be part of 1.9 but we can't force it. Things need enough time to be reviewed to ensure the quality of the next Beam release. |
Yes @mxm, we have already done our best to make it available, and I can see you have put a lot of effort into that PR. 👍
To comment on your summary: I agree that it would be convenient to just use the NESTED context. I'm not against it. However, I think it is also valid to keep the existing behavior, as state requests are the only interface that exposes the raw key. In any case, it makes sense to think about this, test it, and ask the people who are most familiar with the code.
I don't consider a response time of one day to be long. Please keep in mind that people may be occupied with other tasks.
Sorry, maybe I didn't express my meaning correctly. I meant that we have already done our best to fix it. :)
Is there any progress here? @mxm |
Thanks for the reminder, @dmvk! I looked carefully at the changes in #9484 (already merged). I found that the changes on the Java side are essentially the same as in my current PR, so I still think there is no problem with the solution in my PR, and merging it is fine. We can always improve or fix code if reviewers have any concerns. So I think it is not necessary to revert this PR. I will add more comments on #9484. Best,
I very much appreciate your work on this @sunjincheng121. Let it be said, your contributions are recognized.

I also have to repeat that your solution in this PR was incomplete. Not only did it not adhere to our contribution standard because it was originally without a JIRA ("hotfix"), but it was also missing the Python part, which would have broken Python support, as our tests showed. It required careful investigation to find the root of the problem and to not break any other SDK as the result of a quick fix. We could not iterate on your PR because it was already merged, so reverting it was the only option.

I know that getting one's commit reverted is not a good feeling. It happens to everyone eventually, and it is not meant to diminish your contributions in any way. Keep up the great contributions. I know that everyone in the Beam community greatly appreciates your work.
Hi @mxm, thanks for your reply. I am glad to share my thoughts as follows:
BTW, not all new contributors know the rules of Beam well. We should have a better way to correct the mistakes of new contributors; reverting is not the only way, nor the best way. I believe we have the same goal: to find a better way to make contributors happy to contribute to the Beam community. So I am sharing my thoughts not only for this PR but also for how to deal with the same kinds of problems in the future. Great thanks for the affirmation of my contribution. Feel free to correct me if anything is incorrect. Best,
This is a hotfix; the change is to encode keys as OUTER for the Flink key supplier.
This PR blocked the PR: #9296
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.