Implement ordered list state for FnApi. #30317
If a state changes after we obtain iterators by calling read() or readRange(), those pre-existing iterators behaved incorrectly in the previous implementation. The change introduced here ensures that they continue to work as if no local change had been made.
ac40716 to 5d7cd5e
Run Python PreCommit 3.9
Haven't gotten to the implementation/tests yet.
// A response to the get state request for an ordered list.
message OrderedListStateGetResponse {
  bytes continuation_token = 1;
  bytes data = 2;
How do we return multiple elements if the request was a range?
Should we return the sort key for elements? It seems like part of the user data; for example, if it is an id or timestamp, the user might want it back instead of having to duplicate it in the payload as well.
Should this be repeated OrderedListEntry instead of data?
For efficiency reasons we have generally not split individual elements up into individual field protos, and have instead provided them as contiguous bytes of data. It makes sense to do that here. But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s?).
We should also think about how the sort key is encoded. If sort keys are (typically) timestamps, big-endian might be preferable to varint.
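The comment above does not say why big-endian might be preferable. One possible motivation, sketched here as an assumption, is that a fixed-width big-endian encoding (with the sign bit flipped) sorts the same way as the numeric keys when compared byte by byte, which a varint does not guarantee. The class `SortKeyEncoding` and its methods are hypothetical and are not the coder Beam uses.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper; not part of Beam.
final class SortKeyEncoding {
  // Encodes a signed long as 8 big-endian bytes with the sign bit flipped,
  // so that unsigned lexicographic byte order matches numeric order.
  static byte[] encode(long sortKey) {
    return ByteBuffer.allocate(Long.BYTES).putLong(sortKey ^ Long.MIN_VALUE).array();
  }

  // Reverses encode().
  static long decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
  }

  // Compares two encoded keys byte by byte, treating each byte as unsigned.
  static int compareEncoded(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }
}
```

With this layout a store that only compares raw bytes would still return entries in sort-key order; whether that matters depends on the runner's storage, which this thread does not specify.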
> how do we return multiple elements if the request was a range?
We concatenate the encoded entries in a byte array and send them back in chunks, each with a corresponding continuation token.
> should this be repeated OrderedListEntry instead of data?
I have considered this option. Besides the efficiency reason @robertwb mentioned, I also find that representing the response as bytes has the advantage of reusing the existing code in https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java. This iterator is used to parse the returned data block (not only in OrderedListState but also in Multimap, Bag, etc.), and it supports blocks even if their boundaries are not aligned with entries/elements. I think this is not achievable with OrderedListEntry.
> But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s
That's right. It is basically the concatenation of encoded TimestampedValues, since TimestampedValue is already in use in the SDK interface of OrderedListState:
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/state/OrderedListState.java
Line 29 in 3693174
extends GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> {
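To illustrate the point about block boundaries, here is a minimal sketch of concatenating encoded entries, cutting the bytes into chunks without regard to entry boundaries, and decoding them again as one logical stream. The wire format (8-byte sort key, 4-byte length, UTF-8 value) and the class `ChunkedEntries` are assumptions made for this example; they are not the TimestampedValue coder or the StateFetchingIterators implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the format below is assumed for illustration only.
final class ChunkedEntries {
  static Map.Entry<Long, String> entry(long sortKey, String value) {
    return new SimpleEntry<Long, String>(sortKey, value);
  }

  // Concatenates entries as: 8-byte sort key, 4-byte length, UTF-8 value bytes.
  static byte[] encodeAll(List<Map.Entry<Long, String>> entries) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (Map.Entry<Long, String> e : entries) {
        byte[] value = e.getValue().getBytes(StandardCharsets.UTF_8);
        out.writeLong(e.getKey());
        out.writeInt(value.length);
        out.write(value);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Cuts the blob into fixed-size chunks, ignoring entry boundaries.
  static List<byte[]> split(byte[] blob, int chunkSize) {
    List<byte[]> chunks = new ArrayList<>();
    for (int i = 0; i < blob.length; i += chunkSize) {
      chunks.add(Arrays.copyOfRange(blob, i, Math.min(blob.length, i + chunkSize)));
    }
    return chunks;
  }

  // Decodes entries by reading the chunks as one continuous stream.
  static List<Map.Entry<Long, String>> decodeAll(List<byte[]> chunks) {
    List<InputStream> streams = new ArrayList<>();
    for (byte[] chunk : chunks) {
      streams.add(new ByteArrayInputStream(chunk));
    }
    List<Map.Entry<Long, String>> entries = new ArrayList<>();
    try (DataInputStream in =
        new DataInputStream(new SequenceInputStream(Collections.enumeration(streams)))) {
      while (true) {
        long sortKey;
        try {
          sortKey = in.readLong();
        } catch (EOFException endOfData) {
          break; // no more entries
        }
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        entries.add(entry(sortKey, new String(value, StandardCharsets.UTF_8)));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return entries;
  }
}
```

Because the decoder only sees a stream, an entry that straddles two chunks is read the same way as one that fits inside a single chunk, which is the property a repeated message field would not give for free.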
// timestamp range is not specified, the runner should use [MIN_INT64,
// MAX_INT64) by default.
message OrderedListStateGetRequest {
  bytes continuation_token = 1;
Add a comment on the continuation token, i.e. it should be the one returned by the previous response, and the key/range should match the previous request that generated the token.
Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?
> comment on continuation token ...
Good idea! I will add that when I revise the code.
> Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?
I don't think the range is a hard requirement.
In fact, in my simple implementation of the fake client, I put the current sort key and the current block index into the continuation token:
beam/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
Line 228 in 5d7cd5e
// The continuation format here is the sort key (long) followed by an index (int)
In other implementations, you may need the range, but I think it is implementation-dependent. That's why I hesitate to put a range/sort key in a separate field when a continuation token is present.
I thought the continuation token should allow the runner to uniquely identify where to find the next block of data. Is that what you mean by "mutually exclusive"?
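The token layout described above (a sort key as a long followed by a block index as an int) can be sketched as follows. The class name `ContinuationToken` and its fields are hypothetical; only the "long then int" layout comes from the comment in FakeBeamFnStateClient, and the byte order used here is an assumption.

```java
import java.nio.ByteBuffer;

// Hypothetical token type; only the long-then-int layout is taken from the PR.
final class ContinuationToken {
  final long sortKey;
  final int blockIndex;

  ContinuationToken(long sortKey, int blockIndex) {
    this.sortKey = sortKey;
    this.blockIndex = blockIndex;
  }

  // Serializes as 8 bytes of sort key followed by 4 bytes of block index.
  byte[] toBytes() {
    return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
        .putLong(sortKey)
        .putInt(blockIndex)
        .array();
  }

  // Parses a token produced by toBytes().
  static ContinuationToken fromBytes(byte[] token) {
    ByteBuffer buffer = ByteBuffer.wrap(token);
    return new ContinuationToken(buffer.getLong(), buffer.getInt());
  }
}
```

Since the token alone pins down the next block, a request carrying it does not also need a range, which is the sense of "mutually exclusive" discussed here.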
Yes, that's what I meant by mutually exclusive. If we allow providing both, we have to figure out what to do when they disagree, and there's no good use case for that.
// A request to update an ordered list
message OrderedListStateUpdateRequest {
  // when the request is processed, deletes should always happen before inserts.
Should we instead split this up into two separate requests, e.g. OrderedListStateInsertRequest and OrderedListStateDeleteRequest?
Discussion 2 in the design doc covers this, but it did not reach a conclusion: https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/
I am fine with splitting them, though.
It would be helpful to outline the pros and cons of little decisions like this in the design doc, and to note which option was chosen and why.
For example one benefit to splitting the requests is to avoid ordering issues. We would have to spec that either the inserts or deletes happen first, even though they are in one request together. It is a bit confusing. And then if you want them in the other order, you still have to make two requests but each one has an empty field.
And note whether there is an efficiency consideration.
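A small sketch of why the order has to be specified when deletes and inserts share one request: the same delete range and insert produce different results depending on which is applied first. The class `UpdateOrdering` is hypothetical, and for brevity it keeps a single value per sort key, which is a simplification of the real ordered list.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration; one value per sort key for brevity.
final class UpdateOrdering {
  // Deletes the range [start, end) first, then inserts (key, value).
  static NavigableMap<Long, String> deleteThenInsert(
      NavigableMap<Long, String> state, long start, long end, long key, String value) {
    NavigableMap<Long, String> result = new TreeMap<>(state);
    result.subMap(start, true, end, false).clear();
    result.put(key, value);
    return result;
  }

  // Inserts (key, value) first, then deletes the range [start, end).
  static NavigableMap<Long, String> insertThenDelete(
      NavigableMap<Long, String> state, long start, long end, long key, String value) {
    NavigableMap<Long, String> result = new TreeMap<>(state);
    result.put(key, value);
    result.subMap(start, true, end, false).clear();
    return result;
  }
}
```

When the inserted key falls inside the deleted range, the first variant keeps the new value and the second drops it; two separate request types would make the caller state the intended order explicitly.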
Yep. I am planning to add an addendum to the original design doc to summarize the decisions we make here. We should have that after this round of review completes.
* The range information is placed in the state key of the ordered list
* For consistency, we reuse the existing get request and response messages of other states like Bag, MultiMap, etc.
* Reuse existing messages of clear and append.
* Replace String::size() > 0 with String::isEmpty()
* Return this in readLater and readRangeLater instead of throwing an exception
* Remove the added SuppressWarnings("unchecked")
Previously, we used a repeated OrderedListEntry field in the AppendRequest particularly for ordered list state. For consistency, we now get rid of that and use the same data field as other states.
5a438a3 to 90b7f7d
Assigning reviewers. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
Run Java PreCommit
bd5838b to b481880
@@ -47,6 +60,7 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
  private static final int DEFAULT_CHUNK_SIZE = 6;
  private final Map<StateKey, List<ByteString>> data;
  private int currentId;
  private final Map<StateKey, NavigableSet<Long>> orderedListSortKeysFromStateKey;
This seems like a pretty inefficient data structure to use. You might look at SortedSet (which requires adding a deduplicating id) unless we can use SortedMultiset from Guava. This would allow us to do things like remove all elements in a range without first copying over all keys in the range and then removing them one by one (which requires a hash each time).
Oh, I see this is part of the "fake" client, so maybe we don't care as much about performance?
The structure is used to support multiple ordered list states within one DoFn.
For example, we can have ordered list states A and B, which have different StateKeys. The outer map is only used once, when looking up the right ordered list.
Then, under each state key, the NavigableSet, which is internally initialized with a TreeSet, can support removing a range of keys efficiently.
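The two-level structure described above can be sketched with standard collections. In this example a plain String stands in for the StateKey, and the class `SortKeyIndex` is hypothetical; it only shows the outer lookup followed by a range removal on the TreeSet view.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the fake client's index; String replaces StateKey.
final class SortKeyIndex {
  // Outer map: one sorted set of sort keys per ordered list state.
  private final Map<String, NavigableSet<Long>> sortKeysByState = new HashMap<>();

  void add(String stateId, long sortKey) {
    sortKeysByState.computeIfAbsent(stateId, k -> new TreeSet<>()).add(sortKey);
  }

  // Removes every sort key in [start, end) through a range view,
  // without first copying the keys out of the set.
  void removeRange(String stateId, long start, long end) {
    NavigableSet<Long> keys = sortKeysByState.get(stateId);
    if (keys != null) {
      keys.subSet(start, true, end, false).clear();
    }
  }

  // Returns the remaining sort keys for a state in ascending order.
  NavigableSet<Long> keys(String stateId) {
    return sortKeysByState.getOrDefault(stateId, new TreeSet<>());
  }
}
```

Clearing a range in state A leaves state B untouched, because the outer map is consulted once and all range work happens on the per-state set.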
StateKey.Builder keyBuilder = key.toBuilder();

// clear the range in the state key before using it as a key to store, because ordered list
I don't understand this comment. Are we trying to handle the case where the same state key shows up in initialData multiple times?
The initialData is a list of entries to be stored in the ordered list, which is an idea borrowed from the MultimapUserStateTest.
An entry is specified by the value and a state key (including the name of the state and the range this entry will fall in). Multiple entries may belong to the same state because only the name of the state is the identifier.
When we need to put these entries in the internal structure, we will remove the range field in the state key so that entries of the same state identifier will be put into one bucket.
@Override
public void verifyDeterministic() throws NonDeterministicException {
  verifyDeterministic(
      this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder);
Just curious, why is this? It seems like we have no way of overwriting an individual element in the ordered list, so if it gets coded differently it wouldn't matter... It seems weird to be using non-deterministic coders anyway, but how would it affect correctness here?
The coder here is based on the one defined at
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
Line 122 in 462c833
public void verifyDeterministic() throws NonDeterministicException {
This is just part of the contract that coders must define, though I agree it's not something that would be likely to get leaked to somewhere that it matters...
In this first version of the implementation, I prefer keeping it there so that it is consistent with the existing TimestampedValueCoder defined in TimestampedValue.java. We can surely improve that later.
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #30317      +/-   ##
============================================
- Coverage     71.18%   71.13%   -0.06%
  Complexity     3014     3014
============================================
  Files          1058     1055       -3
  Lines        134076   133466     -610
  Branches       3254     3254
============================================
- Hits          95437    94935     -502
+ Misses        35498    35400      -98
+ Partials       3141     3131      -10
☔ View full report in Codecov by Sentry.
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class OrderedListUserStateTest {
Are there also pipeline-level tests that we can now enable for portable runners (and, once it's implemented, Dataflow)?
We have pipeline level tests for states including OrderedListState at java/org/apache/beam/sdk/transforms/ParDoTest.java.
public void add(TimestampedValue<T> value) {
  checkState(
      !isClosed,
When would the user see a closed state? Is it if they held onto it after the bundle closed or something like that?
The way I implement the closed state in OrderedListUserState is consistent with the other states. The flag isClosed is set when the class method asyncClose() is called.
beam/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/OrderedListUserState.java
Line 276 in 65aef1a
isClosed = true;
This class method is registered in stateFinalizers in FnApiStateAccessor.java:
beam/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java
Line 929 in 65aef1a
stateFinalizers.add(rval::asyncClose);
It is then called after each processElement in FnApiDoFnRunner.java:
beam/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
Lines 836 to 842 in 65aef1a
        doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext)))));
} finally {
  currentElement = null;
  currentRestriction = null;
}
this.stateAccessor.finalizeState();
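The lifecycle described above (a state registers its close method as a finalizer, the accessor runs the finalizers, and later calls on the state fail) can be sketched in simplified form. Both classes below are hypothetical stand-ins, not the Beam classes: the real finalizers and asyncClose() have different signatures, and a plain IllegalStateException replaces checkState.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a user state with a closed flag.
final class ClosableStateSketch {
  private boolean isClosed = false;
  private final List<String> values = new ArrayList<>();

  void add(String value) {
    // Mirrors the checkState(!isClosed, ...) guard shown in the diff.
    if (isClosed) {
      throw new IllegalStateException("State is no longer usable because it is closed");
    }
    values.add(value);
  }

  void asyncClose() {
    isClosed = true;
  }
}

// Hypothetical, simplified stand-in for the state accessor.
final class StateAccessorSketch {
  private final List<Runnable> stateFinalizers = new ArrayList<>();

  // Creates a state and registers its close method as a finalizer.
  ClosableStateSketch bind() {
    ClosableStateSketch rval = new ClosableStateSketch();
    stateFinalizers.add(rval::asyncClose);
    return rval;
  }

  // Runs and discards all registered finalizers.
  void finalizeState() {
    for (Runnable finalizer : stateFinalizers) {
      finalizer.run();
    }
    stateFinalizers.clear();
  }
}
```

In this sketch a caller that keeps a reference to the state after finalizeState() has run gets an exception on the next add(), which is the situation the reviewer's question is about.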
            OrderedListRange.newBuilder().setStart(sortKey).setEnd(sortKey + 1).build()))
        .build();
  }
}
You've covered a lot of cases here, but this seems like the kind of thing where it'd be good to have fuzz tests too. Say you do dozens of random interspersed reads, writes, and clears. You could do the same thing on a naive List (as long as you kept your range small enough) and verify you got the same result. (Make sure you record a seed so you can reproduce failures.)
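A minimal sketch of the suggested fuzz test, under the assumption that a TreeMap-backed list plays the role of the implementation under test and a plain list of (sortKey, value) pairs is the naive reference. The class `OrderedListFuzz` and the operation mix are hypothetical; a real test would drive OrderedListUserState instead.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical fuzz harness; the TreeMap stands in for the code under test.
final class OrderedListFuzz {
  // Runs random adds, range clears, and range reads on both structures.
  // Returns false as soon as a read disagrees; the seed reproduces the run.
  static boolean run(long seed, int steps) {
    Random random = new Random(seed);
    NavigableMap<Long, List<Integer>> fast = new TreeMap<>();
    List<long[]> naive = new ArrayList<>(); // {sortKey, value} in insertion order
    for (int i = 0; i < steps; i++) {
      long a = random.nextInt(20);
      long b = random.nextInt(20);
      long start = Math.min(a, b);
      long end = Math.max(a, b);
      switch (random.nextInt(3)) {
        case 0: // add a value at sort key a
          fast.computeIfAbsent(a, k -> new ArrayList<>()).add(i);
          naive.add(new long[] {a, i});
          break;
        case 1: // clear the range [start, end)
          fast.subMap(start, true, end, false).clear();
          naive.removeIf(e -> e[0] >= start && e[0] < end);
          break;
        default: { // read the range [start, end) and compare
          List<Integer> actual = new ArrayList<>();
          for (List<Integer> values : fast.subMap(start, true, end, false).values()) {
            actual.addAll(values);
          }
          List<long[]> inRange = new ArrayList<>();
          for (long[] e : naive) {
            if (e[0] >= start && e[0] < end) {
              inRange.add(e);
            }
          }
          // List.sort is stable, so insertion order is kept within a sort key.
          inRange.sort(Comparator.comparingLong((long[] e) -> e[0]));
          List<Integer> expected = new ArrayList<>();
          for (long[] e : inRange) {
            expected.add((int) e[1]);
          }
          if (!actual.equals(expected)) {
            return false;
          }
        }
      }
    }
    return true;
  }
}
```

Keeping the sort-key range small (here 0 to 19) makes collisions and overlapping clears frequent, and logging the seed on failure lets the exact sequence be replayed.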
That's a good idea. Maybe we can do that in a follow-up PR?
OK. Please file an issue and assign it to yourself.
private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();

private boolean isCleared = false;
What does this represent, given that one can clear various sub-ranges?
This flag is set when a user calls the class method clear() to wipe out all elements in the ordered list. In this case we take a special code path, as any previously persisted storage for this state will be cleared once asyncClose() is invoked.
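The role of the flag can be sketched with a simplified buffer that keeps local adds and a "cleared" marker until close. The class `BufferedOrderedList` is hypothetical, uses String values and long sort keys, and omits pendingRemoves (range clears) for brevity; it is not the OrderedListUserState implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified buffer; range removes are left out on purpose.
final class BufferedOrderedList {
  // Stands in for whatever the runner has persisted for this state.
  private final NavigableMap<Long, List<String>> persisted;
  private final NavigableMap<Long, List<String>> pendingAdds = new TreeMap<>();
  // Set by clear(): everything persisted before is dropped when close() runs.
  private boolean isCleared = false;

  BufferedOrderedList(NavigableMap<Long, List<String>> persisted) {
    this.persisted = persisted;
  }

  void add(long sortKey, String value) {
    pendingAdds.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(value);
  }

  void clear() {
    isCleared = true;
    pendingAdds.clear();
  }

  // Reads persisted data (unless cleared) merged with local adds, in key order.
  List<String> read() {
    NavigableMap<Long, List<String>> merged = new TreeMap<>();
    if (!isCleared) {
      persisted.forEach((k, v) -> merged.put(k, new ArrayList<>(v)));
    }
    pendingAdds.forEach((k, v) -> merged.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
    List<String> result = new ArrayList<>();
    merged.values().forEach(result::addAll);
    return result;
  }

  // Flushes local changes to the persisted store.
  void close() {
    if (isCleared) {
      persisted.clear();
    }
    pendingAdds.forEach((k, v) -> persisted.computeIfAbsent(k, x -> new ArrayList<>()).addAll(v));
    pendingAdds.clear();
    isCleared = false;
  }
}
```

The flag lets reads ignore persisted data immediately after clear() while the actual deletion is deferred to close, which is the special code path the reply refers to.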
Might be good to add that to a comment for this variable.
… and pendingRemoves during async_close()
Run Java PreCommit
Run Java_IOs_Direct PreCommit
Run Go PreCommit
Thanks!
First attempt to implement ordered list state for FnApi. Notice that caching has not been implemented yet.
Reference:
https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/