Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10271 Performance regression while fetching a key from a single partition #9020

Merged
merged 20 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2fb966f
Performance issue, in case of withPartition parameter exists - do not…
dima5rr Jul 14, 2020
839379d
Find task in-place, avoid synchorinized calling to topicsGroup
dima5rr Jul 15, 2020
a15d28f
Find task in-place, avoid synchorinized calling to topicsGroup
dima5rr Jul 15, 2020
c180275
Find task in-place, avoid synchorinized calling to topicsGroup
dima5rr Jul 15, 2020
4209ce2
Find task in-place, avoid synchorinized calling to topicsGroup
dima5rr Jul 15, 2020
f5f2e46
remove extra loops over all stores of all providers as a sanity check…
dima5rr Jul 15, 2020
b7d38c6
remove extra loops over all stores of all providers as a sanity check…
dima5rr Jul 15, 2020
89ac008
refactor if/else flow to eliminate early-return
dima5rr Jul 16, 2020
158db40
concise if/else flow into functional
dima5rr Jul 16, 2020
b22417f
optimize performance - avoid creating intermediate active tasks map
dima5rr Jul 17, 2020
da8554c
optimize performance - avoid creating intermediate active tasks map
dima5rr Jul 17, 2020
fca67e4
[TESTS] remove expensive double-iteration from state provider
dima5rr Jul 17, 2020
0fb7fe2
Merge branch 'trunk' into KAFKA-10271
dima5rr Aug 16, 2020
9838391
KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…
albert02lowis Aug 15, 2020
5d7c37f
Merge remote-tracking branch 'upstream/trunk' into KAFKA-10271
dima5rr Oct 5, 2020
73def50
fix test to throw exception on store request
dima5rr Oct 5, 2020
f4f9355
Merge remote-tracking branch 'upstream/trunk' into KAFKA-10271
dima5rr Oct 7, 2020
702cc06
waiting state store readiness by probing non-existing value
dima5rr Oct 7, 2020
274617c
Merge remote-tracking branch 'upstream/trunk' into KAFKA-10271
dima5rr Oct 7, 2020
049ec22
Merge remote-tracking branch 'upstream/trunk' into KAFKA-10271
dima5rr Oct 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
delegatingStateRestoreListener,
i + 1);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder));
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}

ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;

Expand Down Expand Up @@ -56,25 +55,6 @@ public <T> T getStore(final StoreQueryParameters<T> storeQueryParameters) {
if (!globalStore.isEmpty()) {
return queryableStoreType.create(globalStoreProvider, storeName);
}
final List<T> allStores = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
final List<T> stores = storeProvider.stores(storeQueryParameters);
if (!stores.isEmpty()) {
allStores.addAll(stores);
if (storeQueryParameters.partition() != null) {
break;
}
}
}
if (allStores.isEmpty()) {
if (storeQueryParameters.partition() != null) {
throw new InvalidStateStoreException(
String.format("The specified partition %d for store %s does not exist.",
storeQueryParameters.partition(),
storeName));
}
throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders, storeQueryParameters),
storeName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,53 @@
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

public class StreamThreadStateStoreProvider {

private final StreamThread streamThread;
private final InternalTopologyBuilder internalTopologyBuilder;

public StreamThreadStateStoreProvider(final StreamThread streamThread,
final InternalTopologyBuilder internalTopologyBuilder) {
public StreamThreadStateStoreProvider(final StreamThread streamThread) {
this.streamThread = streamThread;
this.internalTopologyBuilder = internalTopologyBuilder;
}

@SuppressWarnings("unchecked")
public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
final String storeName = storeQueryParams.storeName();
final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition());
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
}
final StreamThread.State state = streamThread.state();
if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) {
final Map<TaskId, ? extends Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
final List<T> stores = new ArrayList<>();
if (keyTaskId != null) {
final Task task = tasks.get(keyTaskId);
if (task == null) {
return Collections.emptyList();
}
final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
if (store != null) {
return Collections.singletonList(store);
}
final Collection<Task> tasks = storeQueryParams.staleStoresEnabled() ?
streamThread.allTasks().values() : streamThread.activeTasks();

if (storeQueryParams.partition() != null) {
return findStreamTask(tasks, storeName, storeQueryParams.partition()).
map(streamTask ->
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
map(Collections::singletonList).
orElse(Collections.emptyList());
} else {
for (final Task streamTask : tasks.values()) {
final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
if (store != null) {
stores.add(store);
}
}
return tasks.stream().
map(streamTask ->
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())).
filter(Objects::nonNull).
collect(Collectors.toList());
}
return stores;
} else {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
state + ", not RUNNING" +
Expand Down Expand Up @@ -104,19 +95,11 @@ private <T> T validateAndListStores(final StateStore store, final QueryableStore
}
}

private TaskId createKeyTaskId(final String storeName, final Integer partition) {
if (partition == null) {
return null;
}
final List<String> sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
final Set<String> sourceTopicsSet = new HashSet<>(sourceTopics);
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) {
return new TaskId(topicGroup.getKey(), partition);
}
}
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " +
partition + " is not available on this instance");
private Optional<Task> findStreamTask(final Collection<Task> tasks, final String storeName, final int partition) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great find, thanks!

return tasks.stream().
filter(streamTask -> streamTask.id().partition == partition &&
streamTask.getStore(storeName) != null &&
storeName.equals(streamTask.getStore(storeName).name())).
findFirst();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet
public <T> List<T> stores(final String storeName,
final QueryableStoreType<T> queryableStoreType) {
final List<T> allStores = new ArrayList<>();
for (final StreamThreadStateStoreProvider provider : storeProviders) {
final List<T> stores = provider.stores(storeQueryParameters);
allStores.addAll(stores);
for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
final List<T> stores = storeProvider.stores(storeQueryParameters);
if (!stores.isEmpty()) {
allStores.addAll(stores);
if (storeQueryParameters.partition() != null) {
break;
}
}
}
if (allStores.isEmpty()) {
if (storeQueryParameters.partition() != null) {
throw new InvalidStateStoreException(
String.format("The specified partition %d for store %s does not exist.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really a different condition than the one on L65? It seems like the failure is still probably that the store "migrated" instead of "doesn't exist", right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L65 catches on rebalancing, while L60 is parameter validation for incorrect partition case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate a bit more about this? If allStores.isEmpty() is empty, it is always possible that the specified store-partition or just store-"null" does not exist in this client. Why they are different failure cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @dima5rr , I think Guozhang's question was hidden because the conversation was already "resolved". Do you mind answering this concern?

Copy link
Contributor Author

@dima5rr dima5rr Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @guozhangwang, you're right, this check is ambiguous, it's more likely parameter sanity validation when user explicitly specify a single partition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, in that case how about we just encode the partition in the thrown's message so that upon throwing, people can still check if the partition is null or not when debugging?

Otherwise, this PR all LGTM :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @guozhangwang, I am just care that in case of partition is null, the error message is referenced in official FAQ.

https://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point, let's just merge it as is then.

storeQueryParameters.partition(),
storeName));
}
throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
return allStores;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,34 @@ public void prepareTopology() throws InterruptedException {
rightStream = builder.stream(INPUT_TOPIC_RIGHT);
}

@Test
public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good coverage improvement! Thanks.

STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-no-store-access");
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer()));
final KStream<String, Integer> right = builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(), Serdes.Integer()));
final CountDownLatch latch = new CountDownLatch(1);

left.join(
right,
(value1, value2) -> value1 + value2,
JoinWindows.of(ofMillis(100)),
StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("join-store"));

try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING) {
latch.countDown();
}
});

kafkaStreams.start();
latch.await();
assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get("1"));
}
}

@Test
public void testInner() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public void before() {

@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionIfKVStoreDoesntExist() {
storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore()));
storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())).get("1");
}

@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionIfWindowStoreDoesntExist() {
storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore()));
storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis());
}

@Test
Expand All @@ -80,12 +80,12 @@ public void shouldReturnWindowStoreWhenItExists() {

@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() {
storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore()));
storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())).get("1");
}

@Test(expected = InvalidStateStoreException.class)
public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() {
storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore()));
storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis());
}

@Test
Expand All @@ -106,7 +106,7 @@ public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() {
storeProvider.getStore(
StoreQueryParameters
.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())
.withPartition(partition))
.withPartition(partition)).get("1")
);
assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore)));
}
Expand All @@ -123,7 +123,7 @@ public void shouldThrowExceptionWhenWindowStoreWithPartitionDoesntExists() {
storeProvider.getStore(
StoreQueryParameters
.fromNameAndType(windowStore, QueryableStoreTypes.windowStore())
.withPartition(partition))
.withPartition(partition)).fetch("1", System.currentTimeMillis())
);
assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void before() {
tasks.put(new TaskId(0, 1), taskTwo);

threadMock = EasyMock.createNiceMock(StreamThread.class);
provider = new StreamThreadStateStoreProvider(threadMock, internalTopologyBuilder);
provider = new StreamThreadStateStoreProvider(threadMock);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,24 @@ public class WrappingStoreProviderTest {

private WrappingStoreProvider wrappingStoreProvider;

private final int numStateStorePartitions = 2;

@Before
public void before() {
final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false);
final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false);


stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(String.class))
.build());
stubProviderOne.addStore("window", new NoOpWindowStore());
stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(String.class))
.build());
stubProviderTwo.addStore("window", new NoOpWindowStore());
wrappingStoreProvider = new WrappingStoreProvider(
Arrays.asList(stubProviderOne, stubProviderTwo),
StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
);
for (int partition = 0; partition < numStateStorePartitions; partition++) {
stubProviderOne.addStore("kv", partition, Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(String.class))
.build());
stubProviderOne.addStore("window", partition, new NoOpWindowStore());
wrappingStoreProvider = new WrappingStoreProvider(
Arrays.asList(stubProviderOne, stubProviderTwo),
StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())
);
}
}

@Test
Expand All @@ -82,4 +80,20 @@ public void shouldThrowInvalidStoreExceptionIfNoStoreOfTypeFound() {
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore()));
wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.<String, String>keyValueStore());
}

@Test
public void shouldReturnAllStoreWhenQueryWithoutPartition() {
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));
final List<ReadOnlyKeyValueStore<String, String>> results =
wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
assertEquals(numStateStorePartitions, results.size());
}

@Test
public void shouldReturnSingleStoreWhenQueryWithPartition() {
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1));
final List<ReadOnlyKeyValueStore<String, String>> results =
wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
assertEquals(1, results.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider {
private final int defaultStorePartition = 0;

public StateStoreProviderStub(final boolean throwException) {
super(null, null);
super(null);
this.throwException = throwException;
}

Expand Down