
KAFKA-12648: MINOR - Add TopologyMetadata.Subtopology class for subtopology metadata #10676

Merged
@@ -26,7 +26,7 @@
/**
* A meta representation of a {@link Topology topology}.
* <p>
-* The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected.
+* The nodes of a topology are grouped into {@link SubtopologyDescription sub-topologies} if they are connected.
* In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
* sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
* {@link Topology#addSource(String, String...) reads} from the same topic.
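The grouping rule in this Javadoc -- connected nodes share a sub-topology, while writing and reading a common topic does not merge two sub-topologies -- can be sketched with a small union-find over processor-graph edges. Everything below (class and node names) is illustrative, not part of the Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: nodes connected by direct edges end up in one group
// (one sub-topology); nodes linked only through a topic do not.
public class SubtopologyGrouping {
    static Map<String, String> parent = new HashMap<>();

    // Find with path compression.
    static String find(final String node) {
        parent.putIfAbsent(node, node);
        String p = parent.get(node);
        if (!p.equals(node)) {
            p = find(p);
            parent.put(node, p);
        }
        return p;
    }

    static void union(final String a, final String b) {
        parent.put(find(a), find(b));
    }

    public static void main(final String[] args) {
        // source1 -> processor1 -> sink1 writes to a topic that
        // source2 (a separate sub-topology) reads from.
        union("source1", "processor1");
        union("processor1", "sink1");
        union("source2", "processor2");
        // Sharing a topic does NOT merge the two sub-topologies.
        System.out.println(find("source1").equals(find("sink1")));   // true
        System.out.println(find("source1").equals(find("source2"))); // false
    }
}
```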
@@ -43,7 +43,7 @@ public interface TopologyDescription {
* {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
* (i.e., if multiple processors share the same state).
*/
-interface Subtopology {
+interface SubtopologyDescription {
/**
* Internally assigned unique ID.
* @return the ID of the sub-topology
@@ -62,10 +62,10 @@ interface Subtopology {
* org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
* String, org.apache.kafka.streams.processor.api.ProcessorSupplier) global store}.
* Adding a global store results in adding a source node and one stateful processor node.
-* Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
+* Note, that all added global stores form a single unit (similar to a {@link SubtopologyDescription}) even if different
* global stores are not connected to each other.
* Furthermore, global stores are available to all processors without connecting them explicitly, and thus global
-* stores will never be part of any {@link Subtopology}.
+* stores will never be part of any {@link SubtopologyDescription}.
*/
interface GlobalStore {
/**
@@ -168,7 +168,7 @@ interface Sink extends Node {
* All sub-topologies of the represented topology.
* @return set of all sub-topologies
*/
-Set<Subtopology> subtopologies();
+Set<SubtopologyDescription> subtopologies();

/**
* All global stores of the represented topology.
@@ -23,7 +23,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,7 +40,7 @@ public class TaskId implements Comparable<TaskId> {
/** The ID of the partition. */
public final int partition;
/** The namedTopology that this task belongs to, or null if it does not belong to one */
-private final String namedTopology;
+protected final String namedTopology;

public TaskId(final int topicGroupId, final int partition) {
this(topicGroupId, partition, null);
@@ -59,10 +58,6 @@ public TaskId(final int topicGroupId, final int partition, final String namedTop
}
}

-public Optional<String> namedTopology() {
-    return namedTopology == null ? Optional.empty() : Optional.of(namedTopology);
-}

Review comments on the removed namedTopology() getter:

Contributor: If we don't have this, could we add something like it to TaskMetadata?

Contributor Author: Hm, TaskMetadata is also a public class, right? I think we'll need to do something similar in that case, i.e. add an internal NamedTaskMetadata class that exposes the namedTopology, which you could then access via something like NamedTaskMetadata.extractNamedTopology(TaskMetadata).

It's unfortunate that TaskMetadata has the taskId field as a String rather than as an actual TaskId object -- not sure what the motivation there was 😕 I might try to do a very small KIP to fix that, and then we won't have to bother with this at all (you can just use the internal NamedTaskId class/method to access the namedTopology).

Contributor: It would definitely be better to just fix the taskId in TaskMetadata :)

Contributor Author: Alright, here we are: KIP-740: Use TaskId instead of String for the taskId field in TaskMetadata

That was probably the fastest/easiest KIP I've ever written

@Override
public String toString() {
return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
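The updated toString encodes the optional named topology as a prefix. A self-contained sketch of just that formatting rule (TaskIdSketch is a hypothetical stand-in for illustration, not the real org.apache.kafka.streams.processor.TaskId):

```java
// Sketch of the TaskId string format shown in the diff above:
// "<namedTopology>_<topicGroupId>_<partition>" when a named topology is set,
// "<topicGroupId>_<partition>" otherwise.
public class TaskIdSketch {
    final int topicGroupId;
    final int partition;
    final String namedTopology; // null if the task belongs to no named topology

    TaskIdSketch(final int topicGroupId, final int partition, final String namedTopology) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
        this.namedTopology = namedTopology;
    }

    @Override
    public String toString() {
        return namedTopology != null
            ? namedTopology + "_" + topicGroupId + "_" + partition
            : topicGroupId + "_" + partition;
    }

    public static void main(final String[] args) {
        System.out.println(new TaskIdSketch(2, 1, null));     // 2_1
        System.out.println(new TaskIdSketch(2, 1, "myTopo")); // myTopo_2_1
    }
}
```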
@@ -20,6 +20,8 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;

import org.slf4j.Logger;

import java.util.Collections;
@@ -34,8 +36,8 @@
public class ChangelogTopics {

private final InternalTopicManager internalTopicManager;
-private final Map<Integer, TopicsInfo> topicGroups;
-private final Map<Integer, Set<TaskId>> tasksForTopicGroup;
+private final Map<Subtopology, TopicsInfo> topicGroups;
+private final Map<Subtopology, Set<TaskId>> tasksForTopicGroup;
private final Map<TaskId, Set<TopicPartition>> changelogPartitionsForStatefulTask = new HashMap<>();
private final Map<TaskId, Set<TopicPartition>> preExistingChangelogPartitionsForTask = new HashMap<>();
private final Set<TopicPartition> preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>();
@@ -44,8 +46,8 @@ public class ChangelogTopics {
private final Logger log;

public ChangelogTopics(final InternalTopicManager internalTopicManager,
-final Map<Integer, TopicsInfo> topicGroups,
-final Map<Integer, Set<TaskId>> tasksForTopicGroup,
+final Map<Subtopology, TopicsInfo> topicGroups,
+final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
final String logPrefix) {
this.internalTopicManager = internalTopicManager;
this.topicGroups = topicGroups;
@@ -57,13 +59,13 @@ public ChangelogTopics(final InternalTopicManager internalTopicManager,
public void setup() {
// add tasks to state change log topic subscribers
final Map<String, InternalTopicConfig> changelogTopicMetadata = new HashMap<>();
-for (final Map.Entry<Integer, TopicsInfo> entry : topicGroups.entrySet()) {
-    final int topicGroupId = entry.getKey();
+for (final Map.Entry<Subtopology, TopicsInfo> entry : topicGroups.entrySet()) {
+    final Subtopology subtopology = entry.getKey();
final TopicsInfo topicsInfo = entry.getValue();

-final Set<TaskId> topicGroupTasks = tasksForTopicGroup.get(topicGroupId);
+final Set<TaskId> topicGroupTasks = tasksForTopicGroup.get(subtopology);
if (topicGroupTasks == null) {
-log.debug("No tasks found for topic group {}", topicGroupId);
+log.debug("No tasks found for subtopology {}", subtopology);
continue;
} else if (topicsInfo.stateChangelogTopics.isEmpty()) {
continue;
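ChangelogTopics now keys topicGroups and tasksForTopicGroup by Subtopology instead of a bare Integer, so the key type needs value-based equals/hashCode over both the group id and the topology name. The diff does not show TopologyMetadata.Subtopology itself; the class below is a hypothetical minimal key sketch illustrating why that contract matters for the map lookups above:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of a Subtopology-style map key: value semantics over
// (nodeGroupId, namedTopology) so it can replace the old Integer key.
public class SubtopologyKey {
    final int nodeGroupId;
    final String namedTopology; // null when the topology is unnamed

    SubtopologyKey(final int nodeGroupId, final String namedTopology) {
        this.nodeGroupId = nodeGroupId;
        this.namedTopology = namedTopology;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof SubtopologyKey)) return false;
        final SubtopologyKey that = (SubtopologyKey) o;
        return nodeGroupId == that.nodeGroupId
            && Objects.equals(namedTopology, that.namedTopology);
    }

    @Override
    public int hashCode() {
        return Objects.hash(nodeGroupId, namedTopology);
    }

    public static void main(final String[] args) {
        final Map<SubtopologyKey, String> topicGroups = new HashMap<>();
        topicGroups.put(new SubtopologyKey(0, null), "groupA");
        // Lookup with an equal-but-distinct instance succeeds only because
        // equals/hashCode are value-based.
        System.out.println(topicGroups.get(new SubtopologyKey(0, null))); // groupA
    }
}
```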
@@ -28,6 +28,7 @@
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@@ -129,6 +130,17 @@ public class InternalTopologyBuilder {

private StreamsConfig config = null;

+// The name of the topology this builder belongs to, or null if none
+private final String namedTopology;
+
+public InternalTopologyBuilder() {
+    this.namedTopology = null;
+}
+
+public InternalTopologyBuilder(final String namedTopology) {
+    this.namedTopology = namedTopology;
+}

public static class StateStoreFactory<S extends StateStore> {
private final StoreBuilder<S> builder;
private final Set<String> users = new HashSet<>();
@@ -352,7 +364,6 @@ public synchronized final StreamsConfig getStreamsConfig() {
return config;
}


public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsConfig config) {
Objects.requireNonNull(config, "config can't be null");

@@ -1065,8 +1076,8 @@ public Set<String> allStateStoreName() {
*
* @return groups of topic names
*/
-public synchronized Map<Integer, TopicsInfo> topicGroups() {
-    final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
+public synchronized Map<Subtopology, TopicsInfo> topicGroups() {
+    final Map<Subtopology, TopicsInfo> topicGroups = new LinkedHashMap<>();

if (nodeGroups == null) {
nodeGroups = makeNodeGroups();
@@ -1129,7 +1140,7 @@ public synchronized Map<Integer, TopicsInfo> topicGroups() {
}
}
if (!sourceTopics.isEmpty()) {
-topicGroups.put(entry.getKey(), new TopicsInfo(
+topicGroups.put(new Subtopology(entry.getKey(), namedTopology), new TopicsInfo(
Collections.unmodifiableSet(sinkTopics),
Collections.unmodifiableSet(sourceTopics),
Collections.unmodifiableMap(repartitionTopics),
@@ -1457,7 +1468,7 @@ private void describeSubtopology(final TopologyDescription description,
}
}

-description.addSubtopology(new Subtopology(
+description.addSubtopology(new SubtopologyDescription(
subtopologyId,
new HashSet<>(nodesByName.values())));
}
@@ -1743,11 +1754,11 @@ public int hashCode() {
}
}

-public final static class Subtopology implements org.apache.kafka.streams.TopologyDescription.Subtopology {
+public final static class SubtopologyDescription implements org.apache.kafka.streams.TopologyDescription.SubtopologyDescription {
private final int id;
private final Set<TopologyDescription.Node> nodes;

-public Subtopology(final int id, final Set<TopologyDescription.Node> nodes) {
+public SubtopologyDescription(final int id, final Set<TopologyDescription.Node> nodes) {
this.id = id;
this.nodes = new TreeSet<>(NODE_COMPARATOR);
this.nodes.addAll(nodes);
@@ -1792,7 +1803,7 @@ public boolean equals(final Object o) {
return false;
}

-final Subtopology that = (Subtopology) o;
+final SubtopologyDescription that = (SubtopologyDescription) o;
return id == that.id
&& nodes.equals(that.nodes);
}
@@ -1880,10 +1891,10 @@ public int compare(final TopologyDescription.GlobalStore globalStore1,

private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator();

-private static class SubtopologyComparator implements Comparator<TopologyDescription.Subtopology>, Serializable {
+private static class SubtopologyComparator implements Comparator<org.apache.kafka.streams.TopologyDescription.SubtopologyDescription>, Serializable {
@Override
-public int compare(final TopologyDescription.Subtopology subtopology1,
-                   final TopologyDescription.Subtopology subtopology2) {
+public int compare(final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology1,
+                   final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology2) {
if (subtopology1.equals(subtopology2)) {
return 0;
}
@@ -1894,10 +1905,10 @@ public int compare(final TopologyDescription.Subtopology subtopology1,
private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator();

public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
-private final TreeSet<TopologyDescription.Subtopology> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
+private final TreeSet<SubtopologyDescription> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
private final TreeSet<TopologyDescription.GlobalStore> globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR);

-public void addSubtopology(final TopologyDescription.Subtopology subtopology) {
+public void addSubtopology(final SubtopologyDescription subtopology) {
subtopologies.add(subtopology);
}

@@ -1906,7 +1917,7 @@ public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) {
}

@Override
-public Set<TopologyDescription.Subtopology> subtopologies() {
+public Set<SubtopologyDescription> subtopologies() {
return Collections.unmodifiableSet(subtopologies);
}

@@ -1919,16 +1930,16 @@ public Set<TopologyDescription.GlobalStore> globalStores() {
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Topologies:\n ");
-final TopologyDescription.Subtopology[] sortedSubtopologies =
-    subtopologies.descendingSet().toArray(new Subtopology[0]);
+final SubtopologyDescription[] sortedSubtopologies =
+    subtopologies.descendingSet().toArray(new SubtopologyDescription[0]);
final TopologyDescription.GlobalStore[] sortedGlobalStores =
globalStores.descendingSet().toArray(new GlobalStore[0]);
int expectedId = 0;
int subtopologiesIndex = sortedSubtopologies.length - 1;
int globalStoresIndex = sortedGlobalStores.length - 1;
while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
sb.append(" ");
-final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
+final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex];
final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
if (subtopology.id() == expectedId) {
sb.append(subtopology);
@@ -1940,7 +1951,7 @@ public String toString() {
expectedId++;
}
while (subtopologiesIndex != -1) {
-final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
+final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex];
sb.append(" ");
sb.append(subtopology);
subtopologiesIndex--;
@@ -2030,7 +2041,6 @@ private void updateSubscribedTopics(final Set<String> topics, final String logPr
setRegexMatchedTopicToStateStore();
}


public synchronized List<String> fullSourceTopicNames() {
return maybeDecorateInternalSourceTopics(sourceTopicNames);
}
@@ -21,6 +21,8 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -48,11 +50,11 @@ public class PartitionGrouper {
* @param metadata metadata of the consuming cluster
* @return The map from generated task ids to the assigned partitions
*/
-public Map<TaskId, Set<TopicPartition>> partitionGroups(final Map<Integer, Set<String>> topicGroups, final Cluster metadata) {
+public Map<TaskId, Set<TopicPartition>> partitionGroups(final Map<Subtopology, Set<String>> topicGroups, final Cluster metadata) {
final Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();

-for (final Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
-    final Integer topicGroupId = entry.getKey();
+for (final Map.Entry<Subtopology, Set<String>> entry : topicGroups.entrySet()) {
+    final Subtopology subtopology = entry.getKey();
final Set<String> topicGroup = entry.getValue();

final int maxNumPartitions = maxNumPartitions(metadata, topicGroup);
@@ -66,7 +68,7 @@ public Map<TaskId, Set<TopicPartition>> partitionGroups(final Map<Integer, Set<S
group.add(new TopicPartition(topic, partitionId));
}
}
-groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group));
+groups.put(new TaskId(subtopology.nodeGroupId, partitionId, subtopology.namedTopology), Collections.unmodifiableSet(group));
}
}
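The partitionGroups loop above creates one task per partition id, up to the largest partition count of any topic in the group, and a topic only contributes to a task if it actually has that partition. A stripped-down sketch of that logic, with plain strings standing in for TopicPartition and a map standing in for Cluster metadata (all names here are invented for illustration):

```java
import java.util.*;

// Sketch of the partition-grouping step: one task per partition id,
// up to the maximum partition count of any topic in the group.
public class PartitionGrouping {
    static Map<Integer, Set<String>> partitionGroups(final Set<String> topicGroup,
                                                     final Map<String, Integer> partitionCounts) {
        int maxNumPartitions = 0;
        for (final String topic : topicGroup) {
            maxNumPartitions = Math.max(maxNumPartitions, partitionCounts.getOrDefault(topic, 0));
        }
        final Map<Integer, Set<String>> groups = new HashMap<>();
        for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) {
            final Set<String> group = new HashSet<>();
            for (final String topic : topicGroup) {
                // A topic only contributes to task ids below its own partition count.
                if (partitionId < partitionCounts.getOrDefault(topic, 0)) {
                    group.add(topic + "-" + partitionId);
                }
            }
            groups.put(partitionId, Collections.unmodifiableSet(group));
        }
        return groups;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> counts = new HashMap<>();
        counts.put("A", 3);
        counts.put("B", 2);
        final Map<Integer, Set<String>> groups =
            partitionGroups(new HashSet<>(Arrays.asList("A", "B")), counts);
        System.out.println(groups.size()); // 3 tasks: partitions 0, 1, 2
        System.out.println(groups.get(2)); // [A-2] -- topic B has no partition 2
    }
}
```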

@@ -24,6 +24,7 @@
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.slf4j.Logger;

@@ -59,7 +60,7 @@ public RepartitionTopics(final InternalTopologyBuilder internalTopologyBuilder,
}

public void setup() {
-final Map<Integer, TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
+final Map<Subtopology, TopicsInfo> topicGroups = internalTopologyBuilder.topicGroups();
final Map<String, InternalTopicConfig> repartitionTopicMetadata = computeRepartitionTopicConfig(topicGroups, clusterMetadata);

// ensure the co-partitioning topics within the group have the same number of partitions,
@@ -90,7 +91,7 @@ public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
return Collections.unmodifiableMap(topicPartitionInfos);
}

-private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Integer, TopicsInfo> topicGroups,
+private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
final Cluster clusterMetadata) {

final Map<String, InternalTopicConfig> repartitionTopicConfigs = new HashMap<>();
@@ -129,7 +130,7 @@ private void checkIfExternalSourceTopicsExist(final TopicsInfo topicsInfo,
* Computes the number of partitions and sets it for each repartition topic in repartitionTopicMetadata
*/
private void setRepartitionSourceTopicPartitionCount(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
-final Map<Integer, TopicsInfo> topicGroups,
+final Map<Subtopology, TopicsInfo> topicGroups,
final Cluster clusterMetadata) {
boolean partitionCountNeeded;
do {
@@ -167,7 +168,7 @@ }
}

private Integer computePartitionCount(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
-final Map<Integer, TopicsInfo> topicGroups,
+final Map<Subtopology, TopicsInfo> topicGroups,
final Cluster clusterMetadata,
final String repartitionSourceTopic) {
Integer partitionCount = null;
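setRepartitionSourceTopicPartitionCount resolves repartition-topic partition counts with a do/while fixpoint: a repartition topic inherits the maximum count of its upstream topics, and the loop repeats while any count is still unknown. A hedged, self-contained sketch of that iteration (the topic names and the upstream map are invented; it also assumes the dependency graph is acyclic, where the real code walks TopicsInfo and Cluster metadata instead):

```java
import java.util.*;

// Sketch of the partition-count fixpoint: iterate until every
// repartition topic's count is derived from its upstream topics.
public class RepartitionCounts {
    static Map<String, Integer> resolve(final Map<String, List<String>> upstream,
                                        final Map<String, Integer> sourceCounts) {
        final Map<String, Integer> counts = new HashMap<>(sourceCounts);
        boolean partitionCountNeeded;
        do {
            partitionCountNeeded = false;
            for (final Map.Entry<String, List<String>> entry : upstream.entrySet()) {
                if (counts.containsKey(entry.getKey())) {
                    continue; // already resolved
                }
                int max = 0;
                boolean allKnown = true;
                for (final String up : entry.getValue()) {
                    final Integer c = counts.get(up);
                    if (c == null) {
                        allKnown = false; // depends on an unresolved repartition topic
                    } else {
                        max = Math.max(max, c);
                    }
                }
                if (allKnown) {
                    counts.put(entry.getKey(), max);
                } else {
                    partitionCountNeeded = true; // retry on the next pass
                }
            }
        } while (partitionCountNeeded);
        return counts;
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> upstream = new HashMap<>();
        upstream.put("repart-1", Arrays.asList("source-a"));
        upstream.put("repart-2", Arrays.asList("repart-1", "source-b"));
        final Map<String, Integer> sources = new HashMap<>();
        sources.put("source-a", 4);
        sources.put("source-b", 6);
        final Map<String, Integer> counts = resolve(upstream, sources);
        System.out.println(counts.get("repart-1")); // 4
        System.out.println(counts.get("repart-2")); // 6
    }
}
```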