-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable MapState and SetState for dataflow streaming engine pipelines with legacy runner by building on top of MultimapState. #31453
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
91f0765
to
310042b
Compare
R: @acrites |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
310042b
to
63627a6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple small things I noticed.
@Override | ||
void initializeForWorkItem( | ||
WindmillStateReader reader, Supplier<Closeable> scopedReadStateSupplier) { | ||
super.cleanupAfterWorkItem(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be initializeForWorkItem
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doh, fixed.
@Override | ||
public T read() { | ||
Iterator<T> iterator = wrapped.read().iterator(); | ||
if (iterator.hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If `!hasNext()'?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the tests could use some improvement. I updated the validates runner test to cover a little more but I will also going to add unit tests to WindmillStateInternalsTest. I'll let you know when it's ready for another look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some unit tests
24e1905
to
b002791
Compare
@@ -763,6 +786,28 @@ public void testMultimapPutAndGet() { | |||
assertThat(result.read(), Matchers.containsInAnyOrder(1, 1, 2, 3)); | |||
} | |||
|
|||
@Test | |||
public void testMapViaMultimapPutAndGet() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some other edge cases that might be worth testing (unless I missed them somewhere):
- Overwriting a key with a new value and then reading returns the new value.
- Reading a key that hasn't been "put" returns correct value.
- Put then Remove then Read returns empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added more tests and it exposed bug in multimapstate (improved multimap state test to expose it directly). During sequence of remove/put/remove/put multimapstate would remove the local knowledge of the remove.
waitAndSet(keysFuture, Arrays.asList(multimapEntry(key1), multimapEntry(key2)), 30); | ||
|
||
mapState.put(key1, 7); | ||
mapState.put(dup(key3), 8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious: why do we sometimes need to call this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it just copies the key to ensure that it is compared based upon value instead of shallow compararison
mapState.put(key1, 7); | ||
mapState.put(dup(key3), 8); | ||
mapState.put(key4, 1); | ||
mapState.remove(key4); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also be interesting to test the case where key5
is returned by the backend read, but we've already called mapState.remove(key5)
to make sure it's properly filtered out. I think this is basically the contents of testMultimapEntriesAndKeysMergeLocalRemove
below, but not for MapViaMultimap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more small comments about tests.
0c743a7
to
c586f41
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
8e2ff30
to
3d4aab4
Compare
with legacy runner by building on top of MultimapState.
3d4aab4
to
1c11d53
Compare
Run Java PreCommit |
…with legacy runner by building on top of MultimapState. (apache#31453)
Tests were not enabled for MapState/SetState for appliance on legacy runner. Enabling tests shows that MapState wasn't conforming to the spec that a read iterator is unaffected by subsequent writes, so this was fixed. Additional tests for the new map state built on MultimapState, exposed that MultimapState had a bug with remove/put/remove/put sequence which was fixed.
Related to issue #18200
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.