Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HOTFIX: undo renaming of public part of Subtopology API #10713

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* A meta representation of a {@link Topology topology}.
* <p>
* The nodes of a topology are grouped into {@link SubtopologyDescription sub-topologies} if they are connected.
* The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected.
* In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
* sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
* {@link Topology#addSource(String, String...) reads} from the same topic.
Expand All @@ -43,7 +43,7 @@ public interface TopologyDescription {
* {@link Topology#connectProcessorAndStateStores(String, String...) state stores}
* (i.e., if multiple processors share the same state).
*/
interface SubtopologyDescription {
interface Subtopology {
/**
* Internally assigned unique ID.
* @return the ID of the sub-topology
Expand All @@ -62,10 +62,10 @@ interface SubtopologyDescription {
* org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
* String, org.apache.kafka.streams.processor.api.ProcessorSupplier) global store}.
* Adding a global store results in adding a source node and one stateful processor node.
* Note, that all added global stores form a single unit (similar to a {@link SubtopologyDescription}) even if different
* Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
* global stores are not connected to each other.
* Furthermore, global stores are available to all processors without connecting them explicitly, and thus global
* stores will never be part of any {@link SubtopologyDescription}.
* stores will never be part of any {@link Subtopology}.
*/
interface GlobalStore {
/**
Expand Down Expand Up @@ -168,7 +168,7 @@ interface Sink extends Node {
* All sub-topologies of the represented topology.
* @return set of all sub-topologies
*/
Set<SubtopologyDescription> subtopologies();
Set<Subtopology> subtopologies();

/**
* All global stores of the represented topology.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1754,7 +1754,7 @@ public int hashCode() {
}
}

public final static class SubtopologyDescription implements org.apache.kafka.streams.TopologyDescription.SubtopologyDescription {
public final static class SubtopologyDescription implements TopologyDescription.Subtopology {
private final int id;
private final Set<TopologyDescription.Node> nodes;

Expand Down Expand Up @@ -1891,10 +1891,10 @@ public int compare(final TopologyDescription.GlobalStore globalStore1,

private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator();

private static class SubtopologyComparator implements Comparator<org.apache.kafka.streams.TopologyDescription.SubtopologyDescription>, Serializable {
private static class SubtopologyComparator implements Comparator<TopologyDescription.Subtopology>, Serializable {
@Override
public int compare(final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology1,
final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology2) {
public int compare(final TopologyDescription.Subtopology subtopology1,
final TopologyDescription.Subtopology subtopology2) {
if (subtopology1.equals(subtopology2)) {
return 0;
}
Expand All @@ -1905,10 +1905,10 @@ public int compare(final org.apache.kafka.streams.TopologyDescription.Subtopolog
private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator();

public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
private final TreeSet<SubtopologyDescription> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
private final TreeSet<TopologyDescription.Subtopology> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
private final TreeSet<TopologyDescription.GlobalStore> globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR);

public void addSubtopology(final SubtopologyDescription subtopology) {
public void addSubtopology(final TopologyDescription.Subtopology subtopology) {
subtopologies.add(subtopology);
}

Expand All @@ -1917,7 +1917,7 @@ public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) {
}

@Override
public Set<SubtopologyDescription> subtopologies() {
public Set<TopologyDescription.Subtopology> subtopologies() {
return Collections.unmodifiableSet(subtopologies);
}

Expand All @@ -1930,16 +1930,16 @@ public Set<TopologyDescription.GlobalStore> globalStores() {
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Topologies:\n ");
final SubtopologyDescription[] sortedSubtopologies =
subtopologies.descendingSet().toArray(new SubtopologyDescription[0]);
final TopologyDescription.Subtopology[] sortedSubtopologies =
subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]);
final TopologyDescription.GlobalStore[] sortedGlobalStores =
globalStores.descendingSet().toArray(new GlobalStore[0]);
int expectedId = 0;
int subtopologiesIndex = sortedSubtopologies.length - 1;
int globalStoresIndex = sortedGlobalStores.length - 1;
while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
sb.append(" ");
final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex];
final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
if (subtopology.id() == expectedId) {
sb.append(subtopology);
Expand All @@ -1951,7 +1951,7 @@ public String toString() {
expectedId++;
}
while (subtopologiesIndex != -1) {
final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex];
final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
sb.append(" ");
sb.append(subtopology);
subtopologiesIndex--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyDescription.SubtopologyDescription;
import org.apache.kafka.streams.TopologyDescription.Subtopology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.kstream.Consumed;
Expand Down Expand Up @@ -406,7 +406,7 @@ public void shouldEnableSendingOldValuesIfNotMaterializedAlreadyButForcedToMater
}

private void assertTopologyContainsProcessor(final Topology topology, final String processorName) {
for (final SubtopologyDescription subtopology: topology.describe().subtopologies()) {
for (final Subtopology subtopology: topology.describe().subtopologies()) {
for (final TopologyDescription.Node node: subtopology.nodes()) {
if (node.name().equals(processorName)) {
return;
Expand Down