diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index f78023aec8104..83b84dce8094e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -782,7 +782,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, delegatingStateRestoreListener, i + 1); threadState.put(threads[i].getId(), threads[i].state()); - storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder)); + storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 2af5874af341e..8dd1f032cd19f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.StoreQueryParameters; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; @@ -56,25 +55,6 @@ public T getStore(final StoreQueryParameters storeQueryParameters) { if (!globalStore.isEmpty()) { return queryableStoreType.create(globalStoreProvider, storeName); } - final List allStores = new ArrayList<>(); - for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { - final List stores = storeProvider.stores(storeQueryParameters); - if (!stores.isEmpty()) { - allStores.addAll(stores); - if (storeQueryParameters.partition() != null) { - break; - } - } - } - if (allStores.isEmpty()) { - if (storeQueryParameters.partition() != null) { - throw new InvalidStateStoreException( - String.format("The specified partition %d for store %s does not exist.", - storeQueryParameters.partition(), - storeName)); - } - throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); - } return queryableStoreType.create( new WrappingStoreProvider(storeProviders, storeQueryParameters), storeName diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 7cc263a861ebb..d5a175d9e518d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -20,7 +20,6 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.state.QueryableStoreType; @@ -28,54 +27,46 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; public class StreamThreadStateStoreProvider { private final StreamThread streamThread; - private final InternalTopologyBuilder internalTopologyBuilder; - public StreamThreadStateStoreProvider(final StreamThread streamThread, - final InternalTopologyBuilder internalTopologyBuilder) { + public StreamThreadStateStoreProvider(final StreamThread streamThread) { this.streamThread = streamThread; - this.internalTopologyBuilder = internalTopologyBuilder; } @SuppressWarnings("unchecked") public List stores(final StoreQueryParameters storeQueryParams) { final String storeName = storeQueryParams.storeName(); final QueryableStoreType queryableStoreType = storeQueryParams.queryableStoreType(); - final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition()); if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { - final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); - final List stores = new ArrayList<>(); - if (keyTaskId != null) { - final Task task = tasks.get(keyTaskId); - if (task == null) { - return Collections.emptyList(); - } - final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId); - if (store != null) { - return Collections.singletonList(store); - } + final Collection tasks = storeQueryParams.staleStoresEnabled() ? + streamThread.allTasks().values() : streamThread.activeTasks(); + + if (storeQueryParams.partition() != null) { + return findStreamTask(tasks, storeName, storeQueryParams.partition()). + map(streamTask -> + validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + map(Collections::singletonList). + orElse(Collections.emptyList()); } else { - for (final Task streamTask : tasks.values()) { - final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); - if (store != null) { - stores.add(store); - } - } + return tasks.stream(). + map(streamTask -> + validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + filter(Objects::nonNull). + collect(Collectors.toList()); } - return stores; } else { throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + state + ", not RUNNING" + @@ -104,19 +95,11 @@ private T validateAndListStores(final StateStore store, final QueryableStore } } - private TaskId createKeyTaskId(final String storeName, final Integer partition) { - if (partition == null) { - return null; - } - final List sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName); - final Set sourceTopicsSet = new HashSet<>(sourceTopics); - final Map topicGroups = internalTopologyBuilder.topicGroups(); - for (final Map.Entry topicGroup : topicGroups.entrySet()) { - if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) { - return new TaskId(topicGroup.getKey(), partition); - } - } - throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + - partition + " is not available on this instance"); + private Optional findStreamTask(final Collection tasks, final String storeName, final int partition) { + return tasks.stream(). + filter(streamTask -> streamTask.id().partition == partition && + streamTask.getStore(storeName) != null && + storeName.equals(streamTask.getStore(storeName).name())). + findFirst(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index 5c9ae1a4370af..26c5db0e192fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet public List stores(final String storeName, final QueryableStoreType queryableStoreType) { final List allStores = new ArrayList<>(); - for (final StreamThreadStateStoreProvider provider : storeProviders) { - final List stores = provider.stores(storeQueryParameters); - allStores.addAll(stores); + for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { + final List stores = storeProvider.stores(storeQueryParameters); + if (!stores.isEmpty()) { + allStores.addAll(stores); + if (storeQueryParameters.partition() != null) { + break; + } + } } if (allStores.isEmpty()) { + if (storeQueryParameters.partition() != null) { + throw new InvalidStateStoreException( + String.format("The specified partition %d for store %s does not exist.", + storeQueryParameters.partition(), + storeName)); + } throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); } return allStores; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index cd57acb65b72f..d039d98e6995e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -1080,6 +1080,7 @@ private Set keysFromInstance(final KafkaStreams streams) throws Exception streams, StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()) ); + waitForCondition(() -> store.get(-1L) == null, MAX_WAIT_TIME_MS, () -> "State store did not ready: " + storeName); final Set keys = new HashSet<>(); try (final KeyValueIterator it = store.all()) { while (it.hasNext()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java index c519117fabe20..e15788a89bf6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java @@ -108,7 +108,7 @@ public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedExceptio kafkaStreams.start(); latch.await(); - assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore()))); + assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get(1)); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java index 2d047556f9c82..f2ca0c0298afb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java @@ -60,12 +60,12 @@ public void before() { @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfKVStoreDoesntExist() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())).get("1"); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfWindowStoreDoesntExist() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()); } @Test @@ -80,12 +80,12 @@ public void shouldReturnWindowStoreWhenItExists() { @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())).get("1"); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()); } @Test @@ -106,7 +106,7 @@ public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() { storeProvider.getStore( StoreQueryParameters .fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()) - .withPartition(partition)) + .withPartition(partition)).get("1") ); assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore))); } @@ -123,7 +123,7 @@ public void shouldThrowExceptionWhenWindowStoreWithPartitionDoesntExists() { storeProvider.getStore( StoreQueryParameters .fromNameAndType(windowStore, QueryableStoreTypes.windowStore()) - .withPartition(partition)) + .withPartition(partition)).fetch("1", System.currentTimeMillis()) ); assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 0ac170a86f8de..884244dfcbf6e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -170,7 +170,7 @@ public void before() { tasks.put(new TaskId(0, 1), taskTwo); threadMock = EasyMock.createNiceMock(StreamThread.class); - provider = new StreamThreadStateStoreProvider(threadMock, internalTopologyBuilder); + provider = new StreamThreadStateStoreProvider(threadMock); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index ceb3f798d9e1a..189704819c8f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -39,26 +39,24 @@ public class WrappingStoreProviderTest { private WrappingStoreProvider wrappingStoreProvider; + private final int numStateStorePartitions = 2; + @Before public void before() { final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false); final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); - - stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(String.class)) - .build()); - stubProviderOne.addStore("window", new NoOpWindowStore()); - stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(String.class)) - .build()); - stubProviderTwo.addStore("window", new NoOpWindowStore()); - wrappingStoreProvider = new WrappingStoreProvider( - Arrays.asList(stubProviderOne, stubProviderTwo), - StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()) - ); + for (int partition = 0; partition < numStateStorePartitions; partition++) { + stubProviderOne.addStore("kv", partition, Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), + Serdes.serdeFrom(String.class), + Serdes.serdeFrom(String.class)) + .build()); + stubProviderOne.addStore("window", partition, new NoOpWindowStore()); + wrappingStoreProvider = new WrappingStoreProvider( + Arrays.asList(stubProviderOne, stubProviderTwo), + StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()) + ); + } } @Test @@ -82,4 +80,20 @@ public void shouldThrowInvalidStoreExceptionIfNoStoreOfTypeFound() { wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("doesn't exist", QueryableStoreTypes.keyValueStore())); wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.keyValueStore()); } + + @Test + public void shouldReturnAllStoreWhenQueryWithoutPartition() { + wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())); + final List> results = + wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore()); + assertEquals(numStateStorePartitions, results.size()); + } + + @Test + public void shouldReturnSingleStoreWhenQueryWithPartition() { + wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)); + final List> results = + wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore()); + assertEquals(1, results.size()); + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java index bc0e33a3a5699..9d89ae2082ccb 100644 --- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java +++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java @@ -39,7 +39,7 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider { private final int defaultStorePartition = 0; public StateStoreProviderStub(final boolean throwException) { - super(null, null); + super(null); this.throwException = throwException; }