-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10648: Add Prefix Scan support to State Stores #9508
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vamossagar12, Thank you for the PR!
Here my feedback!
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
Show resolved
Hide resolved
.../src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Outdated
Show resolved
Hide resolved
@cadonna thanks i made the requeiste changes barring 1. |
@cadonna , The failing tests here https://github.com/apache/kafka/pull/9508/checks?check_run_id=1376108664 don't seem to be related to this PR. Would it be possible to retest? |
hi @cadonna , wanted to know did you get a chance to look at my comment on the failing test case and the changes in general? |
@vamossagar12 I am really sorry. I haven't found the time yet, but it is on my ToDo list. I hope I will manage to make a pass this week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vamossagar12 Thank you for the updates and sorry again for the long delay!
Here is my feedback!
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
Outdated
Show resolved
Hide resolved
@vamossagar12 I replied to your comments. Let me know when the PR is ready for review. |
84b0ac8
to
0e45602
Compare
Thanks @cadonna , I have resolved all comments. Plz review again if things look ok now.. |
hey @cadonna did you get a chance to look at the latest set of changes? |
@vamossagar12 I am really sorry, but I haven't found the time yet to review the changes. I hope I will find the time next week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @vamossagar12 for the updates!
I am wondering if we really need a separate prefix iterator or if we could just extend the range iterator and use that. See my comment in the respective iterator class.
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
Show resolved
Hide resolved
dd4c426
to
822edfe
Compare
@cadonna , made further changes. Plz review whenever you get a chance to do so! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vamossagar12, Thanks for the updates.
I have a couple of comments, but I also think that we are almost there.
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
Outdated
Show resolved
Hide resolved
streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
Outdated
Show resolved
Hide resolved
...ms/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
Outdated
Show resolved
Hide resolved
...ms/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
Outdated
Show resolved
Hide resolved
...ms/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
Outdated
Show resolved
Hide resolved
@cadonna , done. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, @vamossagar12 !
@@ -291,6 +292,16 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { | |||
return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator, true); | |||
} | |||
|
|||
@Override | |||
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method needs unit testing. Try to use a mock for the cache in the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment as below. Unit tests are in CachingInMemoryKeyValueStoreTest which already extends AbstractKeyValueStoreTest and creates an in memory cache store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see. I missed those. Sorry! That is fine then, although I think unit tests with mocks would be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I had created another ticket to streamline tests for CachingKVStore: https://issues.apache.org/jira/browse/KAFKA-10788. @rohitrmd had volunteered to take this up.
.../test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
Outdated
Show resolved
Hide resolved
.../test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
Outdated
Show resolved
Hide resolved
.../test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
Outdated
Show resolved
Hide resolved
.../test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
Outdated
Show resolved
Hide resolved
@@ -103,6 +105,20 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { | |||
} | |||
} | |||
|
|||
@Override | |||
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vamossagar12 I can still not find the unit test for this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this, do you want me to add the test cases here?https://github.com/apache/kafka/blob/17be91a37214bf77430c65d9300a5120e4348df9/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
There are tests in CachingInMemoryKeyValueStoreTest, which is where the tests for other methods like range etc have been added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think those tests never call the prefixScan()
on the underlying in-memory state store because all entries fit into the cache. You would need to add another test that flushes the cache before you call prefixScan()
. I would prefer a test that directly tests the in-memory store without any cache in between.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the new ticket: https://issues.apache.org/jira/browse/KAFKA-12289 and the PR for the ticket:
#10052
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
Show resolved
Hide resolved
...ms/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
Outdated
Show resolved
Hide resolved
@vamossagar12 Could you please add a more descriptive commit message? |
Call for committer review: @mjsax @guozhangwang @ableegoldman |
…/ChangeLoggingKeyValueBytesStoreTest.java Co-authored-by: Bruno Cadonna <bruno@confluent.io>
…/ChangeLoggingKeyValueBytesStoreTest.java Co-authored-by: Bruno Cadonna <bruno@confluent.io>
…/ChangeLoggingKeyValueBytesStoreTest.java Co-authored-by: Bruno Cadonna <bruno@confluent.io>
…/ChangeLoggingKeyValueBytesStoreTest.java Co-authored-by: Bruno Cadonna <bruno@confluent.io>
…/MeteredKeyValueStoreTest.java Co-authored-by: Bruno Cadonna <bruno@confluent.io>
…/metrics/StateStoreMetricsTest.java Co-authored-by: Bruno Cadonna <bruno@confluent.io>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a quick pass on the PR and it looks good to me. After test is green I can merge it.
@cadonna could you also help @vamossagar12 to come up with a PR on docs for this KIP?
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
Outdated
Show resolved
Hide resolved
@guozhangwang @vamossagar12 This failure is related to this PR:
I provided the fix for this in the corresponding test. |
@vamossagar12 Probably, it would also be good to rebase this PR to current trunk to get a green build since we merged a couple of fixes to flaky tests recently. |
Yes, @vamossagar12, you should open a PR and add a short paragraph about KIP-614 in the upgrade guide. Looking forward to reviewing it! 🙂 |
…/MeteredKeyValueStoreTest.java Co-authored-by: Bruno Cadonna <bruno@confluent.io>
I've merged this PR to trunk. Thanks @vamossagar12 so much for your contribution! And thanks to @cadonna for the very thorough review! |
Thanks @guozhangwang ! |
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)