Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10271 Performance regression while fetching a key from a single partition #9020

Merged
merged 20 commits into from
Oct 8, 2020

Conversation

dima5rr
Copy link
Contributor

@dima5rr dima5rr commented Jul 14, 2020

StreamThreadStateStoreProvider excessive loop over calling internalTopologyBuilder.topicGroups(), which is synchronized, thus causing significant performance degradation to the caller, especially when store has many partitions.

https://issues.apache.org/jira/browse/KAFKA-10271

Copy link
Contributor

@abbccdda abbccdda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, could we add some unit test coverage?

@vvcephei
Copy link
Contributor

vvcephei commented Jul 14, 2020

Hi @dima5rr , thanks for the PR! I was looking into the context of this ticket and noticed that we're essentially just duplicating here the logic in org.apache.kafka.streams.state.internals.QueryableStoreProvider#getStore, which also loops over all stores of all providers as a sanity check before even returning the WrappingStoreProvider, which will loop over all stores of all providers again before returning any results.

Anecdotally, I think that most people would actually just query immediately and then discard their store reference, for example like value = kafkaStreams.store("storeName", partition=2).get("key"). In that case, this double-iteration is pretty expensive, and the up-front sanity check doesn't seem to provide any value.

In fact, the only value it could provide is if you do plan to save the store reference and re-use it for multiple queries. But in that case, there could be a rebalance at any time, so checking up front probably doesn't help much no matter what the use case is.

What do you think about removing the loop

final List<T> allStores = new ArrayList<>();
for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
final List<T> stores = storeProvider.stores(storeQueryParameters);
if (!stores.isEmpty()) {
allStores.addAll(stores);
if (storeQueryParameters.partition() != null) {
break;
}
}
}
if (allStores.isEmpty()) {
if (storeQueryParameters.partition() != null) {
throw new InvalidStateStoreException(
String.format("The specified partition %d for store %s does not exist.",
storeQueryParameters.partition(),
storeName));
}
throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
in favor of the one in here?

Of course, the short-circuit you're providing here is valuable in any case.

Thanks!
-John

@vvcephei
Copy link
Contributor

Alternatively, as you observed, if the parameters contain a single partition, then there should just be one specific store that matches. Instead of returning a new WrappingStoreProvider at all, maybe the QueryableStoreProvider should just have a high-level branch that, if the partition is speficied, then find the store and return a new SingleStoreProvider, otherwise return a new WrappingStoreProvider.

@dima5rr
Copy link
Contributor Author

dima5rr commented Jul 15, 2020

Hi @vvcephei, thank you for the input.

After profiling under load it looks like problem in excessive loops over calling internalTopologyBuilder.topicGroups()
which is synchronized.
Though short-circuit WrappingStoreProvider can help, but will not solve root cause.

WDYT?

final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();

public synchronized Map<Integer, TopicsInfo> topicGroups() {

9020

@dima5rr
Copy link
Contributor Author

dima5rr commented Jul 15, 2020

Indeed StreamThreadStateStoreProvider does not need InternalTopologyBuilder in order to find stream task, it has all required data in StreamThread.

@dima5rr dima5rr requested a review from abbccdda July 15, 2020 17:48
Copy link
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update, and for adding the tests, and especially for profiling it!

I just had a couple of minor comments. Do you mind running the profiler again, just to make sure it lines up with your expectations?

Thanks,
-John

}
if (allStores.isEmpty()) {
if (storeQueryParameters.partition() != null) {
throw new InvalidStateStoreException(
String.format("The specified partition %d for store %s does not exist.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really a different condition than the one on L65? It seems like the failure is still probably that the store "migrated" instead of "doesn't exist", right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L65 catches on rebalancing, while L60 is parameter validation for incorrect partition case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate a bit more about this? If allStores.isEmpty() is empty, it is always possible that the specified store-partition or just store-"null" does not exist in this client. Why they are different failure cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @dima5rr , I think Guozhang's question was hidden because the conversation was already "resolved". Do you mind answering this concern?

Copy link
Contributor Author

@dima5rr dima5rr Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @guozhangwang, you're right, this check is ambiguous, it's more likely parameter sanity validation when user explicitly specify a single partition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, in that case how about we just encode the partition in the thrown's message so that upon throwing, people can still check if the partition is null or not when debugging?

Otherwise, this PR all LGTM :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @guozhangwang, I am just care that in case of partition is null, the error message is referenced in official FAQ.

https://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point, let's just merge it as is then.

@dima5rr dima5rr changed the title KAFKA-10271 Performance degradation while fetching a key from a single partition KAFKA-10271 Performance regression while fetching a key from a single partition Jul 21, 2020
Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a great find, thanks @dima5rr ! I just had a question about one failure handling case, other wise it LGTM! cc @vvcephei

}
if (allStores.isEmpty()) {
if (storeQueryParameters.partition() != null) {
throw new InvalidStateStoreException(
String.format("The specified partition %d for store %s does not exist.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate a bit more about this? If allStores.isEmpty() is empty, it is always possible that the specified store-partition or just store-"null" does not exist in this client. Why they are different failure cases?

@@ -60,6 +60,34 @@ public void prepareTopology() throws InterruptedException {
rightStream = builder.stream(INPUT_TOPIC_RIGHT);
}

@Test
public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good coverage improvement! Thanks.

}
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
partition + " is not available on this instance");
private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great find, thanks!

@@ -56,25 +55,6 @@ public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storePr
if (!globalStore.isEmpty()) {
return queryableStoreType.create(globalStoreProvider, storeName);
}
final List<T> allStores = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@guozhangwang
Copy link
Contributor

test this please

1 similar comment
@guozhangwang
Copy link
Contributor

test this please

@guozhangwang guozhangwang reopened this Oct 3, 2020
@guozhangwang
Copy link
Contributor

test this

1 similar comment
@guozhangwang
Copy link
Contributor

test this

@guozhangwang
Copy link
Contributor

@dima5rr I tried to compile your branch but got a few compilation error like the following:

/Users/guozhang/Workspace/github/guozhangwang/kafka-work/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java:68: error: cannot find symbol
        final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer()));
                                                                                             ^
  symbol:   variable Serdes
  location: class StreamStreamJoinIntegrationTest
/Users/guozhang/Workspace/github/guozhangwang/kafka-work/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java:68: error: cannot find symbol
        final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer()));

albert02lowis and others added 3 commits October 5, 2020 12:08
apache#9108)

The main goal is to remove usage of embedded broker (EmbeddedKafkaCluster) in AbstractJoinIntegrationTest and its subclasses.
This is because the tests under this class are no longer using the embedded broker, except for two.
testShouldAutoShutdownOnIncompleteMetadata is one of such tests.
Furthermore, this test does not actually perfom stream-table join; it is testing an edge case of joining with a non-existent topic, so it should be in a separate test.

Testing strategy: run existing unit and integration test

Reviewers: Boyang Chen <boyang@confluent.io>, Bill Bejeck <bbejeck@apache.org>
@dima5rr
Copy link
Contributor Author

dima5rr commented Oct 5, 2020

Hi @guozhangwang can you trigger new build, looks like flaky tests?

@guozhangwang
Copy link
Contributor

test this please

1 similar comment
@guozhangwang
Copy link
Contributor

test this please

@guozhangwang
Copy link
Contributor

test this

@guozhangwang guozhangwang reopened this Oct 6, 2020
@guozhangwang
Copy link
Contributor

test this

@guozhangwang guozhangwang merged commit cc54000 into apache:trunk Oct 8, 2020
@guozhangwang
Copy link
Contributor

Test passed, merged to trunk.

Thanks @dima5rr for your great contribution!

guozhangwang pushed a commit that referenced this pull request Oct 8, 2020
…e partition (#9020)

StreamThreadStateStoreProvider excessive loop over calling internalTopologyBuilder.topicGroups(), which is synchronized, thus causing significant performance degradation to the caller, especially when store has many partitions.

Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Copy link
Contributor

Cherry-picked to 2.6 as well.

guozhangwang pushed a commit that referenced this pull request Oct 8, 2020
…e partition (#9020)

StreamThreadStateStoreProvider excessive loop over calling internalTopologyBuilder.topicGroups(), which is synchronized, thus causing significant performance degradation to the caller, especially when store has many partitions.

Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
mattwong949 pushed a commit to confluentinc/kafka that referenced this pull request Oct 13, 2020
* Updating trunk versions after cutting branch for 2.7

* KAFKA-9929: Support backward iterator on SessionStore (apache#9139)

Implements KIP-617 for `SessionStore`

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <vvcephei@apache.org>

* MINOR: remove unused scala files from core module (apache#9296)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Lee Dongjin <dongjin@apache.org>

* MINOR: correct package of LinuxIoMetricsCollector (apache#9271)


Reviewers: Mickael Maison <mickael.maison@gmail.com>, Lee Dongjin <dongjin@apache.org>

* KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis (apache#9393)

In this PR, I have addressed the review comments from @chia7712 in apache#9001 which were provided after apache#9001 was merged. The changes are made mainly to KafkaAdminClient:

Improve error message in updateFeatures api when feature name is empty.
Propagate top-level error message in updateFeatures api.
Add an empty-parameter variety for describeFeatures api.
Minor documentation updates to @param and @return to make these resemble other apis.

Reviewers: Chia-Ping Tsai chia7712@gmail.com, Jun Rao junrao@gmail.com

* KAFKA-10271: Performance regression while fetching a key from a single partition (apache#9020)

StreamThreadStateStoreProvider excessive loop over calling internalTopologyBuilder.topicGroups(), which is synchronized, thus causing significant performance degradation to the caller, especially when store has many partitions.

Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>

Co-authored-by: Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
Co-authored-by: Chia-Ping Tsai <chia7712@gmail.com>
Co-authored-by: Kowshik Prakasam <kprakasam@confluent.io>
Co-authored-by: Dima Reznik <dima.r@fiverr.com>
cadonna pushed a commit to confluentinc/kafka that referenced this pull request Oct 15, 2020
…e partition (apache#9020)

StreamThreadStateStoreProvider excessive loop over calling internalTopologyBuilder.topicGroups(), which is synchronized, thus causing significant performance degradation to the caller, especially when store has many partitions.

Reviewers: John Roesler <vvcephei@apache.org>, Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants