diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index 5d9e1cb61d07e..abeb385573370 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -26,7 +26,7 @@ /** * A meta representation of a {@link Topology topology}. *

- * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected. + * The nodes of a topology are grouped into {@link SubtopologyDescription sub-topologies} if they are connected. * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology * {@link Topology#addSource(String, String...) reads} from the same topic. @@ -43,7 +43,7 @@ public interface TopologyDescription { * {@link Topology#connectProcessorAndStateStores(String, String...) state stores} * (i.e., if multiple processors share the same state). */ - interface Subtopology { + interface SubtopologyDescription { /** * Internally assigned unique ID. * @return the ID of the sub-topology @@ -62,10 +62,10 @@ interface Subtopology { * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String, * String, org.apache.kafka.streams.processor.api.ProcessorSupplier) global store}. * Adding a global store results in adding a source node and one stateful processor node. - * Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different + * Note, that all added global stores form a single unit (similar to a {@link SubtopologyDescription}) even if different * global stores are not connected to each other. * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global - * stores will never be part of any {@link Subtopology}. + * stores will never be part of any {@link SubtopologyDescription}. */ interface GlobalStore { /** @@ -168,7 +168,7 @@ interface Sink extends Node { * All sub-topologies of the represented topology. * @return set of all sub-topologies */ - Set subtopologies(); + Set subtopologies(); /** * All global stores of the represented topology. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java index ff6a0c7e260e2..59e7ef4bbc841 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +40,7 @@ public class TaskId implements Comparable { /** The ID of the partition. */ public final int partition; /** The namedTopology that this task belongs to, or null if it does not belong to one */ - private final String namedTopology; + protected final String namedTopology; public TaskId(final int topicGroupId, final int partition) { this(topicGroupId, partition, null); @@ -59,10 +58,6 @@ public TaskId(final int topicGroupId, final int partition, final String namedTop } } - public Optional namedTopology() { - return namedTopology == null ? Optional.empty() : Optional.of(namedTopology); - } - @Override public String toString() { return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java index f47db270f611c..c498bd030f9c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; + import org.slf4j.Logger; import java.util.Collections; @@ -34,8 +36,8 @@ public class ChangelogTopics { private final InternalTopicManager internalTopicManager; - private final Map topicGroups; - private final Map> tasksForTopicGroup; + private final Map topicGroups; + private final Map> tasksForTopicGroup; private final Map> changelogPartitionsForStatefulTask = new HashMap<>(); private final Map> preExistingChangelogPartitionsForTask = new HashMap<>(); private final Set preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>(); @@ -44,8 +46,8 @@ public class ChangelogTopics { private final Logger log; public ChangelogTopics(final InternalTopicManager internalTopicManager, - final Map topicGroups, - final Map> tasksForTopicGroup, + final Map topicGroups, + final Map> tasksForTopicGroup, final String logPrefix) { this.internalTopicManager = internalTopicManager; this.topicGroups = topicGroups; @@ -57,13 +59,13 @@ public ChangelogTopics(final InternalTopicManager internalTopicManager, public void setup() { // add tasks to state change log topic subscribers final Map changelogTopicMetadata = new HashMap<>(); - for (final Map.Entry entry : topicGroups.entrySet()) { - final int topicGroupId = entry.getKey(); + for (final Map.Entry entry : topicGroups.entrySet()) { + final Subtopology subtopology = entry.getKey(); final TopicsInfo topicsInfo = entry.getValue(); - final Set topicGroupTasks = tasksForTopicGroup.get(topicGroupId); + final Set topicGroupTasks = tasksForTopicGroup.get(subtopology); if (topicGroupTasks == null) { - log.debug("No tasks found for topic group {}", topicGroupId); + log.debug("No tasks found for subtopology {}", subtopology); continue; } else if (topicsInfo.stateChangelogTopics.isEmpty()) { continue; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 6eb451aed80e8..60ca59dd5134e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; @@ -129,6 +130,17 @@ public class InternalTopologyBuilder { private StreamsConfig config = null; + // The name of the topology this builder belongs to, or null if none + private final String namedTopology; + + public InternalTopologyBuilder() { + this.namedTopology = null; + } + + public InternalTopologyBuilder(final String namedTopology) { + this.namedTopology = namedTopology; + } + public static class StateStoreFactory { private final StoreBuilder builder; private final Set users = new HashSet<>(); @@ -352,7 +364,6 @@ public synchronized final StreamsConfig getStreamsConfig() { return config; } - public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsConfig config) { Objects.requireNonNull(config, "config can't be null"); @@ -1065,8 +1076,8 @@ public Set allStateStoreName() { * * @return groups of topic names */ - public synchronized Map topicGroups() { - final Map topicGroups = new LinkedHashMap<>(); + public synchronized Map topicGroups() { + final Map topicGroups = new LinkedHashMap<>(); if (nodeGroups == null) { nodeGroups = makeNodeGroups(); @@ -1129,7 +1140,7 @@ public synchronized Map topicGroups() { } } if (!sourceTopics.isEmpty()) { - topicGroups.put(entry.getKey(), new TopicsInfo( + topicGroups.put(new Subtopology(entry.getKey(), namedTopology), new TopicsInfo( Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), Collections.unmodifiableMap(repartitionTopics), @@ -1457,7 +1468,7 @@ private void describeSubtopology(final TopologyDescription description, } } - description.addSubtopology(new Subtopology( + description.addSubtopology(new SubtopologyDescription( subtopologyId, new HashSet<>(nodesByName.values()))); } @@ -1743,11 +1754,11 @@ public int hashCode() { } } - public final static class Subtopology implements org.apache.kafka.streams.TopologyDescription.Subtopology { + public final static class SubtopologyDescription implements org.apache.kafka.streams.TopologyDescription.SubtopologyDescription { private final int id; private final Set nodes; - public Subtopology(final int id, final Set nodes) { + public SubtopologyDescription(final int id, final Set nodes) { this.id = id; this.nodes = new TreeSet<>(NODE_COMPARATOR); this.nodes.addAll(nodes); @@ -1792,7 +1803,7 @@ public boolean equals(final Object o) { return false; } - final Subtopology that = (Subtopology) o; + final SubtopologyDescription that = (SubtopologyDescription) o; return id == that.id && nodes.equals(that.nodes); } @@ -1880,10 +1891,10 @@ public int compare(final TopologyDescription.GlobalStore globalStore1, private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator(); - private static class SubtopologyComparator implements Comparator, Serializable { + private static class SubtopologyComparator implements Comparator, Serializable { @Override - public int compare(final TopologyDescription.Subtopology subtopology1, - final TopologyDescription.Subtopology subtopology2) { + public int compare(final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology1, + final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology2) { if (subtopology1.equals(subtopology2)) { return 0; } @@ -1894,10 +1905,10 @@ public int compare(final TopologyDescription.Subtopology subtopology1, private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator(); public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { - private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); + private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); - public void addSubtopology(final TopologyDescription.Subtopology subtopology) { + public void addSubtopology(final SubtopologyDescription subtopology) { subtopologies.add(subtopology); } @@ -1906,7 +1917,7 @@ public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) { } @Override - public Set subtopologies() { + public Set subtopologies() { return Collections.unmodifiableSet(subtopologies); } @@ -1919,8 +1930,8 @@ public Set globalStores() { public String toString() { final StringBuilder sb = new StringBuilder(); sb.append("Topologies:\n "); - final TopologyDescription.Subtopology[] sortedSubtopologies = - subtopologies.descendingSet().toArray(new Subtopology[0]); + final SubtopologyDescription[] sortedSubtopologies = + subtopologies.descendingSet().toArray(new SubtopologyDescription[0]); final TopologyDescription.GlobalStore[] sortedGlobalStores = globalStores.descendingSet().toArray(new GlobalStore[0]); int expectedId = 0; @@ -1928,7 +1939,7 @@ public String toString() { int globalStoresIndex = sortedGlobalStores.length - 1; while (subtopologiesIndex != -1 && globalStoresIndex != -1) { sb.append(" "); - final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex]; final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; if (subtopology.id() == expectedId) { sb.append(subtopology); @@ -1940,7 +1951,7 @@ public String toString() { expectedId++; } while (subtopologiesIndex != -1) { - final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex]; sb.append(" "); sb.append(subtopology); subtopologiesIndex--; @@ -2030,7 +2041,6 @@ private void updateSubscribedTopics(final Set topics, final String logPr setRegexMatchedTopicToStateStore(); } - public synchronized List fullSourceTopicNames() { return maybeDecorateInternalSourceTopics(sourceTopicNames); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGrouper.java index 8dcb8233913a6..69ee09e6113d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGrouper.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,11 +50,11 @@ public class PartitionGrouper { * @param metadata metadata of the consuming cluster * @return The map from generated task ids to the assigned partitions */ - public Map> partitionGroups(final Map> topicGroups, final Cluster metadata) { + public Map> partitionGroups(final Map> topicGroups, final Cluster metadata) { final Map> groups = new HashMap<>(); - for (final Map.Entry> entry : topicGroups.entrySet()) { - final Integer topicGroupId = entry.getKey(); + for (final Map.Entry> entry : topicGroups.entrySet()) { + final Subtopology subtopology = entry.getKey(); final Set topicGroup = entry.getValue(); final int maxNumPartitions = maxNumPartitions(metadata, topicGroup); @@ -66,7 +68,7 @@ public Map> partitionGroups(final Map topicGroups = internalTopologyBuilder.topicGroups(); + final Map topicGroups = internalTopologyBuilder.topicGroups(); final Map repartitionTopicMetadata = computeRepartitionTopicConfig(topicGroups, clusterMetadata); // ensure the co-partitioning topics within the group have the same number of partitions, @@ -90,7 +91,7 @@ public Map topicPartitionsInfo() { return Collections.unmodifiableMap(topicPartitionInfos); } - private Map computeRepartitionTopicConfig(final Map topicGroups, + private Map computeRepartitionTopicConfig(final Map topicGroups, final Cluster clusterMetadata) { final Map repartitionTopicConfigs = new HashMap<>(); @@ -129,7 +130,7 @@ private void checkIfExternalSourceTopicsExist(final TopicsInfo topicsInfo, * Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata */ private void setRepartitionSourceTopicPartitionCount(final Map repartitionTopicMetadata, - final Map topicGroups, + final Map topicGroups, final Cluster clusterMetadata) { boolean partitionCountNeeded; do { @@ -167,7 +168,7 @@ private void setRepartitionSourceTopicPartitionCount(final Map repartitionTopicMetadata, - final Map topicGroups, + final Map topicGroups, final Cluster clusterMetadata, final String repartitionSourceTopic) { Integer partitionCount = null; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2e517f1bcbb69..6c3d6571def24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -37,6 +37,7 @@ import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; @@ -49,6 +50,7 @@ import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTaskId; import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; @@ -369,10 +371,10 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // construct the assignment of tasks to clients - final Map topicGroups = taskManager.builder().topicGroups(); + final Map topicGroups = taskManager.builder().topicGroups(); final Set allSourceTopics = new HashSet<>(); - final Map> sourceTopicsByGroup = new HashMap<>(); - for (final Map.Entry entry : topicGroups.entrySet()) { + final Map> sourceTopicsByGroup = new HashMap<>(); + for (final Map.Entry entry : topicGroups.entrySet()) { allSourceTopics.addAll(entry.getValue().sourceTopics); sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics); } @@ -385,7 +387,6 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr final boolean probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks); - // ---------------- Step Three ---------------- // // construct the global partition assignment per host map @@ -497,7 +498,7 @@ private Map prepareRepartitionTopics(final Cluste * @param fullMetadata the cluster metadata */ private void populateTasksForMaps(final Map taskForPartition, - final Map> tasksForTopicGroup, + final Map> tasksForTopicGroup, final Set allSourceTopics, final Map> partitionsForTask, final Cluster fullMetadata) { @@ -515,7 +516,7 @@ private void populateTasksForMaps(final Map taskForParti } allAssignedPartitions.addAll(partitions); - tasksForTopicGroup.computeIfAbsent(id.topicGroupId, k -> new HashSet<>()).add(id); + tasksForTopicGroup.computeIfAbsent(new Subtopology(id.topicGroupId, NamedTaskId.namedTopology(id)), k -> new HashSet<>()).add(id); } checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata); @@ -554,7 +555,7 @@ private void checkAllPartitions(final Set allSourceTopics, */ private boolean assignTasksToClients(final Cluster fullMetadata, final Set allSourceTopics, - final Map topicGroups, + final Map topicGroups, final Map clientMetadataMap, final Map> partitionsForTask, final Set statefulTasks) { @@ -563,7 +564,7 @@ private boolean assignTasksToClients(final Cluster fullMetadata, } final Map taskForPartition = new HashMap<>(); - final Map> tasksForTopicGroup = new HashMap<>(); + final Map> tasksForTopicGroup = new HashMap<>(); populateTasksForMaps(taskForPartition, tasksForTopicGroup, allSourceTopics, partitionsForTask, fullMetadata); final ChangelogTopics changelogTopics = new ChangelogTopics( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java new file mode 100644 index 0000000000000..2b7392ab5bbb7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Objects; + +public class TopologyMetadata { + //TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683) + + public static class Subtopology { + final int nodeGroupId; + final String namedTopology; + + public Subtopology(final int nodeGroupId, final String namedTopology) { + this.nodeGroupId = nodeGroupId; + this.namedTopology = namedTopology; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Subtopology that = (Subtopology) o; + return nodeGroupId == that.nodeGroupId && + Objects.equals(namedTopology, that.namedTopology); + } + + @Override + public int hashCode() { + return Objects.hash(nodeGroupId, namedTopology); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTaskId.java new file mode 100644 index 0000000000000..82be29cdb9e6f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTaskId.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.streams.processor.TaskId; + +public class NamedTaskId extends TaskId { + public NamedTaskId(final int topicGroupId, final int partition, final String namedTopology) { + super(topicGroupId, partition, namedTopology); + if (namedTopology == null) { + throw new IllegalStateException("NamedTopology is required for a NamedTaskId"); + } + } + + public String namedTopology() { + return namedTopology; + } + + public static String namedTopology(final TaskId taskId) { + if (taskId instanceof NamedTaskId) { + return ((NamedTaskId) taskId).namedTopology(); + } else { + return null; + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 54ec9e84b7cc9..129d29aecfa5c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -64,6 +64,9 @@ import java.util.Properties; import java.util.regex.Pattern; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1; + import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -458,7 +461,7 @@ public void shouldReuseSourceTopicAsChangelogsWithOptimization20() { internalTopologyBuilder.stateStores().get("store").loggingEnabled(), equalTo(false)); assertThat( - internalTopologyBuilder.topicGroups().get(0).nonSourceChangelogTopics().isEmpty(), + internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).nonSourceChangelogTopics().isEmpty(), equalTo(true)); } @@ -486,7 +489,7 @@ public void shouldNotReuseRepartitionTopicAsChangelogs() { equalTo(true) ); assertThat( - internalTopologyBuilder.topicGroups().get(1).stateChangelogTopics.keySet(), + internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_1).stateChangelogTopics.keySet(), equalTo(Collections.singleton("appId-store-changelog")) ); } @@ -509,7 +512,7 @@ public void shouldNotReuseSourceTopicAsChangelogsByDefault() { internalTopologyBuilder.stateStores().get("store").loggingEnabled(), equalTo(true)); assertThat( - internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), + internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("appId-store-changelog"))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 66c678d2bd17f..8068c4839eec4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.SubtopologyDescription; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -427,8 +428,8 @@ public void singleSourceShouldHaveSingleSubtopology() { final TopologyDescription.Source expectedSourceNode = addSource("source", "topic"); expectedDescription.addSubtopology( - new InternalTopologyBuilder.Subtopology(0, - Collections.singleton(expectedSourceNode))); + new SubtopologyDescription(0, + Collections.singleton(expectedSourceNode))); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -439,8 +440,8 @@ public void singleSourceWithListOfTopicsShouldHaveSingleSubtopology() { final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3"); expectedDescription.addSubtopology( - new InternalTopologyBuilder.Subtopology(0, - Collections.singleton(expectedSourceNode))); + new SubtopologyDescription(0, + Collections.singleton(expectedSourceNode))); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -451,8 +452,8 @@ public void singleSourcePatternShouldHaveSingleSubtopology() { final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]")); expectedDescription.addSubtopology( - new InternalTopologyBuilder.Subtopology(0, - Collections.singleton(expectedSourceNode))); + new SubtopologyDescription(0, + Collections.singleton(expectedSourceNode))); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -462,18 +463,18 @@ public void singleSourcePatternShouldHaveSingleSubtopology() { public void multipleSourcesShouldHaveDistinctSubtopologies() { final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1"); expectedDescription.addSubtopology( - new InternalTopologyBuilder.Subtopology(0, - Collections.singleton(expectedSourceNode1))); + new SubtopologyDescription(0, + Collections.singleton(expectedSourceNode1))); final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2"); expectedDescription.addSubtopology( - new InternalTopologyBuilder.Subtopology(1, - Collections.singleton(expectedSourceNode2))); + new SubtopologyDescription(1, + Collections.singleton(expectedSourceNode2))); final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3"); expectedDescription.addSubtopology( - new InternalTopologyBuilder.Subtopology(2, - Collections.singleton(expectedSourceNode3))); + new SubtopologyDescription(2, + Collections.singleton(expectedSourceNode3))); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -487,7 +488,7 @@ public void sourceAndProcessorShouldHaveSingleSubtopology() { final Set allNodes = new HashSet<>(); allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -503,7 +504,7 @@ public void sourceAndProcessorWithStateShouldHaveSingleSubtopology() { final Set allNodes = new HashSet<>(); allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -520,7 +521,7 @@ public void sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() { final Set allNodes = new HashSet<>(); allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -536,7 +537,7 @@ public void sourceWithMultipleProcessorsShouldHaveSingleSubtopology() { allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode1); allNodes.add(expectedProcessorNode2); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -552,7 +553,7 @@ public void processorWithMultipleSourcesShouldHaveSingleSubtopology() { allNodes.add(expectedSourceNode1); allNodes.add(expectedSourceNode2); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -572,17 +573,17 @@ public void multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() { final Set allNodes1 = new HashSet<>(); allNodes1.add(expectedSourceNode1); allNodes1.add(expectedProcessorNode1); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes1)); final Set allNodes2 = new HashSet<>(); allNodes2.add(expectedSourceNode2); allNodes2.add(expectedProcessorNode2); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2)); + expectedDescription.addSubtopology(new SubtopologyDescription(1, allNodes2)); final Set allNodes3 = new HashSet<>(); allNodes3.add(expectedSourceNode3); allNodes3.add(expectedProcessorNode3); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3)); + expectedDescription.addSubtopology(new SubtopologyDescription(2, allNodes3)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -602,17 +603,17 @@ public void multipleSourcesWithSinksShouldHaveDistinctSubtopologies() { final Set allNodes1 = new HashSet<>(); allNodes1.add(expectedSourceNode1); allNodes1.add(expectedSinkNode1); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes1)); final Set allNodes2 = new HashSet<>(); allNodes2.add(expectedSourceNode2); allNodes2.add(expectedSinkNode2); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2)); + expectedDescription.addSubtopology(new SubtopologyDescription(1, allNodes2)); final Set allNodes3 = new HashSet<>(); allNodes3.add(expectedSourceNode3); allNodes3.add(expectedSinkNode3); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3)); + expectedDescription.addSubtopology(new SubtopologyDescription(2, allNodes3)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -644,7 +645,7 @@ public void processorsWithSameSinkShouldHaveSameSubtopology() { allNodes.add(expectedSourceNode3); allNodes.add(expectedProcessorNode3); allNodes.add(expectedSinkNode); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); @@ -675,7 +676,7 @@ public void processorsWithSharedStateShouldHaveSameSubtopology() { allNodes.add(expectedProcessorNode2); allNodes.add(expectedSourceNode3); allNodes.add(expectedProcessorNode3); - expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new SubtopologyDescription(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); assertThat(topology.describe().hashCode(), equalTo(expectedDescription.hashCode())); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index ca26393bb33a0..c3e32447b7f61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -53,6 +53,8 @@ import java.util.Set; import static java.time.Duration.ofMillis; + +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; @@ -210,8 +212,8 @@ public void shouldEnableLoggingWithCustomConfigOnStreamJoined() { assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true)); assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true)); - assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.size(), equalTo(2)); - for (final InternalTopicConfig config : internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.values()) { + assertThat(internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2)); + for (final InternalTopicConfig config : internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) { assertThat( config.getProperties(Collections.emptyMap(), 0).get("test"), equalTo("property") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 348750a725975..c1814241db6c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.TopologyDescription.SubtopologyDescription; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyTestDriverWrapper; import org.apache.kafka.streams.kstream.Consumed; @@ -405,7 +406,7 @@ public void shouldEnableSendingOldValuesIfNotMaterializedAlreadyButForcedToMater } private void assertTopologyContainsProcessor(final Topology topology, final String processorName) { - for (final TopologyDescription.Subtopology subtopology: topology.describe().subtopologies()) { + for (final SubtopologyDescription subtopology: topology.describe().subtopologies()) { for (final TopologyDescription.Node node: subtopology.nodes()) { if (node.name().equals(processorName)) { return; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java index 06480b3e1cf25..17db61d7c5b13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ChangelogTopicsTest.java @@ -19,6 +19,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; + import org.junit.Test; import java.util.Collections; @@ -28,6 +30,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; + import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; @@ -80,8 +84,8 @@ public class ChangelogTopicsTest { @Test public void shouldNotContainChangelogsForStatelessTasks() { expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet()); - final Map topicGroups = mkMap(mkEntry(0, TOPICS_INFO2)); - final Map> tasksForTopicGroup = mkMap(mkEntry(0, mkSet(TASK_0_0, TASK_0_1, TASK_0_2))); + final Map topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO2)); + final Map> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, mkSet(TASK_0_0, TASK_0_1, TASK_0_2))); replay(internalTopicManager); final ChangelogTopics changelogTopics = @@ -100,9 +104,9 @@ public void shouldNotContainChangelogsForStatelessTasks() { public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() { expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG)))) .andStubReturn(mkSet(CHANGELOG_TOPIC_NAME1)); - final Map topicGroups = mkMap(mkEntry(0, TOPICS_INFO1)); + final Map topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1)); final Set tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); - final Map> tasksForTopicGroup = mkMap(mkEntry(0, tasks)); + final Map> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks)); replay(internalTopicManager); final ChangelogTopics changelogTopics = @@ -122,9 +126,9 @@ public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() { expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG)))) .andStubReturn(Collections.emptySet()); - final Map topicGroups = mkMap(mkEntry(0, TOPICS_INFO1)); + final Map topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1)); final Set tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); - final Map> tasksForTopicGroup = mkMap(mkEntry(0, tasks)); + final Map> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks)); replay(internalTopicManager); final ChangelogTopics changelogTopics = @@ -149,9 +153,9 @@ public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() { @Test public void shouldOnlyContainPreExistingSourceBasedChangelogs() { expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet()); - final Map topicGroups = mkMap(mkEntry(0, TOPICS_INFO3)); + final Map topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO3)); final Set tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); - final Map> tasksForTopicGroup = mkMap(mkEntry(0, tasks)); + final Map> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks)); replay(internalTopicManager); final ChangelogTopics changelogTopics = @@ -176,9 +180,9 @@ public void shouldOnlyContainPreExistingSourceBasedChangelogs() { public void shouldContainBothTypesOfPreExistingChangelogs() { expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG)))) .andStubReturn(Collections.emptySet()); - final Map topicGroups = mkMap(mkEntry(0, TOPICS_INFO4)); + final Map topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO4)); final Set tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); - final Map> tasksForTopicGroup = mkMap(mkEntry(0, tasks)); + final Map> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks)); replay(internalTopicManager); final ChangelogTopics changelogTopics = diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 2d5cde58983d8..ef5bebb5d8201 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.SubtopologyDescription; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -53,6 +55,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_2; + import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -570,12 +576,12 @@ public void testTopicGroups() { builder.addProcessor("processor-3", new MockApiProcessorSupplier<>(), "source-3", "source-4"); - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); - final Map expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap())); - expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap())); - expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap())); + final Map expectedTopicGroups = new HashMap<>(); + expectedTopicGroups.put(SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap())); + expectedTopicGroups.put(SUBTOPOLOGY_1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap())); + expectedTopicGroups.put(SUBTOPOLOGY_2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap())); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -607,21 +613,21 @@ public void testTopicGroupsByStateStore() { builder.connectProcessorAndStateStores("processor-5", "store-3"); builder.buildTopology(); - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); - final Map expectedTopicGroups = new HashMap<>(); + final Map expectedTopicGroups = new HashMap<>(); final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1"); final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2"); final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3"); - expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo( + expectedTopicGroups.put(SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo( Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.emptyMap(), Collections.singletonMap(store1, new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap())))); - expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo( + expectedTopicGroups.put(SUBTOPOLOGY_1, new InternalTopologyBuilder.TopicsInfo( Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.singletonMap(store2, new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap())))); - expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo( + expectedTopicGroups.put(SUBTOPOLOGY_2, new InternalTopologyBuilder.TopicsInfo( Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.singletonMap(store3, new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap())))); @@ -837,7 +843,7 @@ public void shouldAddInternalTopicConfigForWindowStores() { "processor" ); builder.buildTopology(); - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog"); final Map properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000); @@ -862,7 +868,7 @@ public void shouldAddInternalTopicConfigForNonWindowStores() { builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source"); builder.addStateStore(storeBuilder, "processor"); builder.buildTopology(); - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog"); final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); @@ -903,11 +909,11 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() { builder.addSubscribedTopicsFromMetadata(updatedTopics, null); builder.setApplicationId("test-id"); - final Map topicGroups = builder.topicGroups(); - assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo")); - assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A")); - assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B")); - assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3")); + final Map topicGroups = builder.topicGroups(); + assertTrue(topicGroups.get(SUBTOPOLOGY_0).sourceTopics.contains("topic-foo")); + assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-A")); + assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-B")); + assertTrue(topicGroups.get(SUBTOPOLOGY_2).sourceTopics.contains("topic-3")); } @Test @@ -943,7 +949,7 @@ public void shouldSortProcessorNodesCorrectly() { assertEquals(1, builder.describe().subtopologies().size()); - final Iterator iterator = ((InternalTopologyBuilder.Subtopology) builder.describe().subtopologies().iterator().next()).nodesInOrder(); + final Iterator iterator = ((SubtopologyDescription) builder.describe().subtopologies().iterator().next()).nodesInOrder(); assertTrue(iterator.hasNext()); InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next(); @@ -1093,9 +1099,9 @@ public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesArePr builder.addInternalTopic("topic-1z", new InternalTopicProperties(numberOfPartitions)); builder.addSource(null, "source-1", null, null, null, "topic-1z"); - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); - final Map repartitionSourceTopics = topicGroups.get(0).repartitionSourceTopics; + final Map repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics; assertEquals( repartitionSourceTopics.get("Z-topic-1z"), @@ -1114,9 +1120,9 @@ public void shouldHandleWhenTopicPropertiesNumberOfPartitionsIsNull() { builder.addInternalTopic("topic-1t", InternalTopicProperties.empty()); builder.addSource(null, "source-1", null, null, null, "topic-1t"); - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); - final Map repartitionSourceTopics = topicGroups.get(0).repartitionSourceTopics; + final Map repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics; assertEquals( repartitionSourceTopics.get("T-topic-1t"), @@ -1133,9 +1139,9 @@ public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesAreNo builder.addInternalTopic("topic-1y", InternalTopicProperties.empty()); builder.addSource(null, "source-1", null, null, null, "topic-1y"); - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); - final Map repartitionSourceTopics = topicGroups.get(0).repartitionSourceTopics; + final Map repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics; assertEquals( repartitionSourceTopics.get("Y-topic-1y"), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGrouperTest.java index 669f710ab97c4..93d2dced7b25f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGrouperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGrouperTest.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; + import org.junit.Test; import java.util.Arrays; @@ -31,6 +33,9 @@ import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -55,18 +60,16 @@ public class PartitionGrouperTest { public void shouldComputeGroupingForTwoGroups() { final PartitionGrouper grouper = new PartitionGrouper(); final Map> expectedPartitionsForTask = new HashMap<>(); - final Map> topicGroups = new HashMap<>(); - - int topicGroupId = 0; + final Map> topicGroups = new HashMap<>(); - topicGroups.put(topicGroupId, mkSet("topic1")); - expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic1", 0))); - expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic1", 1))); - expectedPartitionsForTask.put(new TaskId(topicGroupId, 2), mkSet(new TopicPartition("topic1", 2))); + topicGroups.put(SUBTOPOLOGY_0, mkSet("topic1")); + expectedPartitionsForTask.put(new TaskId(SUBTOPOLOGY_0.nodeGroupId, 0, SUBTOPOLOGY_0.namedTopology), mkSet(new TopicPartition("topic1", 0))); + expectedPartitionsForTask.put(new TaskId(SUBTOPOLOGY_0.nodeGroupId, 1, SUBTOPOLOGY_0.namedTopology), mkSet(new TopicPartition("topic1", 1))); + expectedPartitionsForTask.put(new TaskId(SUBTOPOLOGY_0.nodeGroupId, 2, SUBTOPOLOGY_0.namedTopology), mkSet(new TopicPartition("topic1", 2))); - topicGroups.put(++topicGroupId, mkSet("topic2")); - expectedPartitionsForTask.put(new TaskId(topicGroupId, 0), mkSet(new TopicPartition("topic2", 0))); - expectedPartitionsForTask.put(new TaskId(topicGroupId, 1), mkSet(new TopicPartition("topic2", 1))); + topicGroups.put(SUBTOPOLOGY_1, mkSet("topic2")); + expectedPartitionsForTask.put(new TaskId(SUBTOPOLOGY_1.nodeGroupId, 0, SUBTOPOLOGY_1.namedTopology), mkSet(new TopicPartition("topic2", 0))); + expectedPartitionsForTask.put(new TaskId(SUBTOPOLOGY_1.nodeGroupId, 1, SUBTOPOLOGY_1.namedTopology), mkSet(new TopicPartition("topic2", 1))); assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata)); } @@ -75,19 +78,17 @@ public void shouldComputeGroupingForTwoGroups() { public void shouldComputeGroupingForSingleGroupWithMultipleTopics() { final PartitionGrouper grouper = new PartitionGrouper(); final Map> expectedPartitionsForTask = new HashMap<>(); - final Map> topicGroups = new HashMap<>(); + final Map> topicGroups = new HashMap<>(); - final int topicGroupId = 0; - - topicGroups.put(topicGroupId, mkSet("topic1", "topic2")); + topicGroups.put(SUBTOPOLOGY_0, mkSet("topic1", "topic2")); expectedPartitionsForTask.put( - new TaskId(topicGroupId, 0), + new TaskId(SUBTOPOLOGY_0.nodeGroupId, 0, SUBTOPOLOGY_0.namedTopology), mkSet(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))); expectedPartitionsForTask.put( - new TaskId(topicGroupId, 1), + new TaskId(SUBTOPOLOGY_0.nodeGroupId, 1, SUBTOPOLOGY_0.namedTopology), mkSet(new TopicPartition("topic1", 1), new TopicPartition("topic2", 1))); expectedPartitionsForTask.put( - new TaskId(topicGroupId, 2), + new TaskId(SUBTOPOLOGY_0.nodeGroupId, 2, SUBTOPOLOGY_0.namedTopology), mkSet(new TopicPartition("topic1", 2))); assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata)); @@ -96,11 +97,9 @@ public void shouldComputeGroupingForSingleGroupWithMultipleTopics() { @Test public void shouldNotCreateAnyTasksBecauseOneTopicHasUnknownPartitions() { final PartitionGrouper grouper = new PartitionGrouper(); - final Map> topicGroups = new HashMap<>(); - - final int topicGroupId = 0; + final Map> topicGroups = new HashMap<>(); - topicGroups.put(topicGroupId, mkSet("topic1", "unknownTopic", "topic2")); + topicGroups.put(SUBTOPOLOGY_0, mkSet("topic1", "unknownTopic", "topic2")); assertThrows(RuntimeException.class, () -> grouper.partitionGroups(topicGroups, metadata)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java index 61a3716fef43a..30a4641e3d2a9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java @@ -38,6 +38,9 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1; + import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -96,7 +99,7 @@ public class RepartitionTopicsTest { @Test public void shouldSetupRepartitionTopics() { expect(internalTopologyBuilder.topicGroups()) - .andReturn(mkMap(mkEntry(0, TOPICS_INFO1), mkEntry(1, TOPICS_INFO2))); + .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2))); final Set coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2); final Set coPartitionGroup2 = mkSet(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_NAME2); final List> coPartitionGroups = Arrays.asList(coPartitionGroup1, coPartitionGroup2); @@ -135,7 +138,7 @@ public void shouldSetupRepartitionTopics() { @Test public void shouldThrowMissingSourceTopicException() { expect(internalTopologyBuilder.topicGroups()) - .andReturn(mkMap(mkEntry(0, TOPICS_INFO1), mkEntry(1, TOPICS_INFO2))); + .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2))); expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList()); copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata)); expect(internalTopicManager.makeReady( @@ -162,8 +165,8 @@ public void shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedFo new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT, TOPIC_CONFIG5); expect(internalTopologyBuilder.topicGroups()) .andReturn(mkMap( - mkEntry(0, TOPICS_INFO1), - mkEntry(1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) + mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), + mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) )); expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList()); copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata)); @@ -200,8 +203,8 @@ public void shouldThrowTaskAssignmentExceptionIfSourceTopicHasNoPartitionCount() ); expect(internalTopologyBuilder.topicGroups()) .andReturn(mkMap( - mkEntry(0, topicsInfo), - mkEntry(1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) + mkEntry(SUBTOPOLOGY_0, topicsInfo), + mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) )); expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList()); copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata)); @@ -243,8 +246,8 @@ public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTop ); expect(internalTopologyBuilder.topicGroups()) .andReturn(mkMap( - mkEntry(0, topicsInfo), - mkEntry(1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) + mkEntry(SUBTOPOLOGY_0, topicsInfo), + mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) )); expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList()); copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata)); @@ -297,8 +300,8 @@ public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartiti ); expect(internalTopologyBuilder.topicGroups()) .andReturn(mkMap( - mkEntry(0, topicsInfo), - mkEntry(1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) + mkEntry(SUBTOPOLOGY_0, topicsInfo), + mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount)) )); expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList()); copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata)); @@ -345,7 +348,7 @@ public void shouldNotSetupRepartitionTopicsWhenTopologyDoesNotContainAnyRepartit Collections.emptyMap() ); expect(internalTopologyBuilder.topicGroups()) - .andReturn(mkMap(mkEntry(0, topicsInfo))); + .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo))); expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptySet()); expect(internalTopicManager.makeReady(Collections.emptyMap())).andReturn(Collections.emptySet()); setupCluster(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 7e58a8ae61d0a..3ef892a7c803d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; @@ -811,7 +812,7 @@ public void testAssignWithStates() { assertEquals(new HashSet<>(tasks), allTasks); // check tasks for state topics - final Map topicGroups = builder.topicGroups(); + final Map topicGroups = builder.topicGroups(); assertEquals(mkSet(TASK_0_0, TASK_0_1, TASK_0_2), tasksForState("store1", tasks, topicGroups)); assertEquals(mkSet(TASK_1_0, TASK_1_1, TASK_1_2), tasksForState("store2", tasks, topicGroups)); @@ -820,16 +821,16 @@ public void testAssignWithStates() { private static Set tasksForState(final String storeName, final List tasks, - final Map topicGroups) { + final Map topicGroups) { final String changelogTopic = ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, storeName); final Set ids = new HashSet<>(); - for (final Map.Entry entry : topicGroups.entrySet()) { + for (final Map.Entry entry : topicGroups.entrySet()) { final Set stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet(); if (stateChangelogTopics.contains(changelogTopic)) { for (final TaskId id : tasks) { - if (id.topicGroupId == entry.getKey()) { + if (id.topicGroupId == entry.getKey().nodeGroupId) { ids.add(id); } } @@ -2024,13 +2025,13 @@ public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount( } private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder { - private Map corruptedTopicGroups; + private Map corruptedTopicGroups; @Override - public synchronized Map topicGroups() { + public synchronized Map topicGroups() { if (corruptedTopicGroups == null) { corruptedTopicGroups = new HashMap<>(); - for (final Map.Entry topicGroupEntry : super.topicGroups().entrySet()) { + for (final Map.Entry topicGroupEntry : super.topicGroups().entrySet()) { final TopicsInfo originalInfo = topicGroupEntry.getValue(); corruptedTopicGroups.put( topicGroupEntry.getKey(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 58567529346b7..1afa76f22a6bf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -25,6 +25,8 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; + import org.easymock.EasyMock; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -90,6 +92,10 @@ public final class AssignmentTestUtils { public static final TaskId NAMED_TASK_1_0 = new TaskId(1, 0, "topology1"); public static final TaskId NAMED_TASK_1_1 = new TaskId(1, 1, "topology1"); + public static final Subtopology SUBTOPOLOGY_0 = new Subtopology(0, null); + public static final Subtopology SUBTOPOLOGY_1 = new Subtopology(1, null); + public static final Subtopology SUBTOPOLOGY_2 = new Subtopology(2, null); + public static final Set EMPTY_TASKS = emptySet(); public static final Map EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();