-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Windmill support for MultimapState #23492
Conversation
Codecov Report
@@ Coverage Diff @@
## master #23492 +/- ##
==========================================
+ Coverage 72.82% 73.32% +0.50%
==========================================
Files 775 719 -56
Lines 102928 95799 -7129
==========================================
- Hits 74958 70249 -4709
+ Misses 26515 24239 -2276
+ Partials 1455 1311 -144
Flags with carried forward coverage won't be shown. Click here to find out more. see 274 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
bd6791a
to
81bdf90
Compare
81bdf90
to
f48eeab
Compare
f48eeab
to
add1c3f
Compare
add1c3f
to
14a2006
Compare
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
keyState.existence = KeyExistence.KNOWN_EXIST; | ||
keyState.values.extendWith(entry.getValue()); | ||
// We can't set keyState.valuesCached to true here, because there may be more | ||
// paginated values that should not be filtered out in above if statement. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't a paginated read, can we set valuesCached?
Or shoudl comment be updated that there may be additional entries for the key.
return Iterables.unmodifiableIterable( | ||
Iterables.concat(mergedCachedEntries(), fromWindmill)); | ||
MultimapIterables<K, V> cachedEntries = mergedCachedEntries(); | ||
entries.forEach( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is back to loading all the pages into the memory before iterating
Could you instead make it lazy by returning windmill non-cached data first (filtering cached or added to cached) then return the rest?
Iterables.concat(Iterables.transform( logic in entries.forEach),
some iterable that lazily calls mergedCacheEntries and iterates);
Maybe you can make some test for this by having the test have some iterable that would return GBs of data total which should be streamed through and not cause test to OOM?
56a57e7
to
230a03f
Compare
230a03f
to
8fa3d3f
Compare
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Outdated
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Outdated
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Outdated
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Outdated
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Outdated
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Outdated
Show resolved
Hide resolved
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
Outdated
Show resolved
Hide resolved
@@ -679,6 +834,49 @@ private void consumeResponse(Windmill.KeyedGetDataResponse response, Set<StateTa | |||
consumeSortedList(sorted_list, stateTag); | |||
} | |||
|
|||
for (Windmill.TagMultimapFetchResponse tagMultimap : response.getTagMultimapsList()) { | |||
// First check if it's keys()/entries() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can always optimize it later
private enum KeyExistence { | ||
// this key is known to exist, it has at least 1 value in either localAdditions or windmill | ||
KNOWN_EXIST, | ||
// this key is known to be nonexistent, it has 0 value in both localAdditions and windmill |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/0 value/no values/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (allKeysKnown) { | ||
keyState = keyStateMap.get(structuralKey); | ||
if (keyState == null || keyState.existence == KeyExistence.UNKNOWN_EXISTENCE) { | ||
if (keyState != null) keyStateMap.remove(structuralKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be simpler to not remove in this read if we're not inserting.
It seems like we shouldn't have UNKNOWN_EXISTENCE in the map except when we construct new entries. But if that's not the case for some reason it seems simpler to just leave it as is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UNKNOWN_EXISTENCE is needed in situations like when a key doesn't exist in the map, and we need to check windmill to find out its existence. Before the windmill read returns result, we need to mark it as UNKNOWN_EXISTENCE.
ByteString encodedKey = keyStream.toByteStringAndReset(); | ||
Windmill.TagMultimapEntry.Builder entryBuilder = builder.addUpdatesBuilder(); | ||
entryBuilder.setEntryName(encodedKey); | ||
entryBuilder.setDeleteAll(keyState.removedLocally); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant leave proto to default of false if false
if (keyState.removedLocally) entryBuilder.setDeleteAll(true);
keyState.valuesSize = 0; | ||
keyState.existence = KeyExistence.KNOWN_NONEXISTENT; | ||
} else { | ||
// no data in windmill, deleting from local cache is sufficient. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I think that inversion is easier to read, can you use it and swap the cases?
But shouldn't such a key be KNOWN_NONEXISTENT and handled above?
Maybe the special handling can just be removed and can assert that it is in the expected KNOWN state?
keyState.existence = KeyExistence.KNOWN_EXIST; | ||
keyState.values.extendWith(entry.getValue()); | ||
keyState.valuesSize += Iterables.size(entry.getValue()); | ||
// We can't set keyState.valuesCached to true here, because there may be more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is the values within the bag page. But the iterable which you are examining here is either
- those values and thus weighted if a single page
- or a PagingIterable (which is not weighted)
entry.getValue().forEach(expectedMap.get(key)::add); | ||
} | ||
for (Map.Entry<ByteString, List<Integer>> entry : actual) { | ||
assertThat( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explicit check that expectedMap contains the key?
to avoid weird possiblities that returning null from remove possibly matches some weird actual state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest2.build()); | ||
Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest3.build()); | ||
Mockito.verifyNoMoreInteractions(mockWindmill); | ||
// NOTE: The future will still contain a reference to the underlying reader. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -645,6 +655,1163 @@ public void testMapComplexPersist() throws Exception { | |||
assertEquals(0, commitBuilder.getValueUpdatesCount()); | |||
} | |||
|
|||
private static <T> ByteString encodeWithCoder(T key, Coder<T> coder) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just rethrow as RuntimeException to avoid having to add IOException to everything when not expected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
} | ||
|
||
@Test | ||
public void testMultimapCachedPartialEntryCannotCachePolled() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test name is a little unclear
how about a comment explaining the case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed the test.
ByteArrayCoder.of().decode(entryUpdate.getEntryName().newInput(), Context.OUTER); | ||
assertTrue(Arrays.equals(key1, decodedKey)); | ||
assertTrue(entryUpdate.getDeleteAll()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these look good but with such a complex class it is hard to cover everything. A fuzz test might help ensure that we're not missing something. You could do something like:
have two maps and in a loop
- randomly make a modification to both maps (add value to 100 random keys, remove 100 random keys, clear map)
- perform operations like persist, read that shouldn't affect viewed entries on one of the maps randomly
And then at the end or periodically verify that both maps have the same observed state by iterating over both. Would be a nice sanity check that things are always consistent and we covered everything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a fuzz test to perform different modifications and verify the state.
@zhengbuqian Any updates on this PR? |
} | ||
Windmill.WorkItemCommitRequest.Builder commitBuilder = | ||
Windmill.WorkItemCommitRequest.newBuilder(); | ||
underTest.persist(commitBuilder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you also recreate the multimap after some rounds so that it has to initialize from persisted state as necessary? As is it is fully cached all the time and thus is likely not merging in-memory or on-disk state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Now it clears cache after 100 rounds, and ran another 100 rounds. In between it calls keys()
and get()
to rebuild cache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Buqian for all your work on this!
I'm not a committer so adding
R: @reuvenlax
Run Java PreCommit |
precommit failure looked unrelated so rerunning |
Run Java PreCommit |
@reuvenlax can you merge? |
thanks @scwhittle! If more changes are needed pls let me know, I'll work on fixes! |
This PR adds windmill legacy worker support for MultimapState.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.