-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10271 Performance regression while fetching a key from a single partition #9020
Changes from 8 commits
2fb966f
839379d
a15d28f
c180275
4209ce2
f5f2e46
b7d38c6
89ac008
158db40
b22417f
da8554c
fca67e4
0fb7fe2
9838391
5d7c37f
73def50
f4f9355
702cc06
274617c
049ec22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet | |
public <T> List<T> stores(final String storeName, | ||
final QueryableStoreType<T> queryableStoreType) { | ||
final List<T> allStores = new ArrayList<>(); | ||
for (final StreamThreadStateStoreProvider provider : storeProviders) { | ||
final List<T> stores = provider.stores(storeQueryParameters); | ||
allStores.addAll(stores); | ||
for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { | ||
final List<T> stores = storeProvider.stores(storeQueryParameters); | ||
if (!stores.isEmpty()) { | ||
allStores.addAll(stores); | ||
if (storeQueryParameters.partition() != null) { | ||
break; | ||
} | ||
} | ||
} | ||
if (allStores.isEmpty()) { | ||
if (storeQueryParameters.partition() != null) { | ||
throw new InvalidStateStoreException( | ||
String.format("The specified partition %d for store %s does not exist.", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this really a different condition than the one on L65? It seems like the failure is still probably that the store "migrated" instead of "doesn't exist", right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. L65 catches on rebalancing, while L60 is parameter validation for incorrect partition case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you elaborate a bit more about this? If There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @dima5rr , I think Guozhang's question was hidden because the conversation was already "resolved". Do you mind answering this concern? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @guozhangwang, you're right, this check is ambiguous, it's more likely parameter sanity validation when user explicitly specify a single partition. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it, in that case how about we just encode the partition in the thrown's message so that upon throwing, people can still check if the partition is Otherwise, this PR all LGTM :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @guozhangwang, I am just care that in case of partition is null, the error message is referenced in official FAQ. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a fair point, let's just merge it as is then. |
||
storeQueryParameters.partition(), | ||
storeName)); | ||
} | ||
throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); | ||
} | ||
return allStores; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.