From 90045ea8dd06faa601d85c5f8b635e6e8f0eb276 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 17 May 2021 09:19:54 -0700 Subject: [PATCH 1/2] undo renaming --- .../kafka/streams/TopologyDescription.java | 10 +++---- .../internals/InternalTopologyBuilder.java | 30 +++++++++---------- .../kstream/internals/KTableImplTest.java | 4 +-- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index abeb385573370..5d9e1cb61d07e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -26,7 +26,7 @@ /** * A meta representation of a {@link Topology topology}. *

- * The nodes of a topology are grouped into {@link SubtopologyDescription sub-topologies} if they are connected. + * The nodes of a topology are grouped into {@link Subtopology sub-topologies} if they are connected. * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology * {@link Topology#addSource(String, String...) reads} from the same topic. @@ -43,7 +43,7 @@ public interface TopologyDescription { * {@link Topology#connectProcessorAndStateStores(String, String...) state stores} * (i.e., if multiple processors share the same state). */ - interface SubtopologyDescription { + interface Subtopology { /** * Internally assigned unique ID. * @return the ID of the sub-topology @@ -62,10 +62,10 @@ interface SubtopologyDescription { * org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String, * String, org.apache.kafka.streams.processor.api.ProcessorSupplier) global store}. * Adding a global store results in adding a source node and one stateful processor node. - * Note, that all added global stores form a single unit (similar to a {@link SubtopologyDescription}) even if different + * Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different * global stores are not connected to each other. * Furthermore, global stores are available to all processors without connecting them explicitly, and thus global - * stores will never be part of any {@link SubtopologyDescription}. + * stores will never be part of any {@link Subtopology}. */ interface GlobalStore { /** @@ -168,7 +168,7 @@ interface Sink extends Node { * All sub-topologies of the represented topology. * @return set of all sub-topologies */ - Set subtopologies(); + Set subtopologies(); /** * All global stores of the represented topology. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 60ca59dd5134e..9b67c035daf6f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyDescription.Subtopology; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; @@ -28,7 +29,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; @@ -1076,8 +1076,8 @@ public Set allStateStoreName() { * * @return groups of topic names */ - public synchronized Map topicGroups() { - final Map topicGroups = new LinkedHashMap<>(); + public synchronized Map topicGroups() { + final Map topicGroups = new LinkedHashMap<>(); if (nodeGroups == null) { nodeGroups = makeNodeGroups(); @@ -1140,7 +1140,7 @@ public synchronized Map topicGroups() { } } if (!sourceTopics.isEmpty()) { - topicGroups.put(new Subtopology(entry.getKey(), namedTopology), new TopicsInfo( + topicGroups.put(new TopologyMetadata.Subtopology(entry.getKey(), namedTopology), new TopicsInfo( Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), Collections.unmodifiableMap(repartitionTopics), @@ -1754,7 +1754,7 @@ public int hashCode() { } } - public final static class SubtopologyDescription implements org.apache.kafka.streams.TopologyDescription.SubtopologyDescription { + public final static class SubtopologyDescription implements Subtopology { private final int id; private final Set nodes; @@ -1891,10 +1891,10 @@ public int compare(final TopologyDescription.GlobalStore globalStore1, private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator(); - private static class SubtopologyComparator implements Comparator, Serializable { + private static class SubtopologyComparator implements Comparator, Serializable { @Override - public int compare(final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology1, - final org.apache.kafka.streams.TopologyDescription.SubtopologyDescription subtopology2) { + public int compare(final Subtopology subtopology1, + final Subtopology subtopology2) { if (subtopology1.equals(subtopology2)) { return 0; } @@ -1905,10 +1905,10 @@ public int compare(final org.apache.kafka.streams.TopologyDescription.Subtopolog private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator(); public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { - private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); + private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); - public void addSubtopology(final SubtopologyDescription subtopology) { + public void addSubtopology(final Subtopology subtopology) { subtopologies.add(subtopology); } @@ -1917,7 +1917,7 @@ public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) { } @Override - public Set subtopologies() { + public Set subtopologies() { return Collections.unmodifiableSet(subtopologies); } @@ -1930,8 +1930,8 @@ public Set globalStores() { public String toString() { final StringBuilder sb = new StringBuilder(); sb.append("Topologies:\n "); - final SubtopologyDescription[] sortedSubtopologies = - subtopologies.descendingSet().toArray(new SubtopologyDescription[0]); + final Subtopology[] sortedSubtopologies = + subtopologies.descendingSet().toArray(new Subtopology[0]); final TopologyDescription.GlobalStore[] sortedGlobalStores = globalStores.descendingSet().toArray(new GlobalStore[0]); int expectedId = 0; @@ -1939,7 +1939,7 @@ public String toString() { int globalStoresIndex = sortedGlobalStores.length - 1; while (subtopologiesIndex != -1 && globalStoresIndex != -1) { sb.append(" "); - final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex]; + final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; if (subtopology.id() == expectedId) { sb.append(subtopology); @@ -1951,7 +1951,7 @@ public String toString() { expectedId++; } while (subtopologiesIndex != -1) { - final SubtopologyDescription subtopology = sortedSubtopologies[subtopologiesIndex]; + final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; sb.append(" "); sb.append(subtopology); subtopologiesIndex--; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index c1814241db6c2..b979d3ecd7734 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -26,7 +26,7 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; -import org.apache.kafka.streams.TopologyDescription.SubtopologyDescription; +import org.apache.kafka.streams.TopologyDescription.Subtopology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyTestDriverWrapper; import org.apache.kafka.streams.kstream.Consumed; @@ -406,7 +406,7 @@ public void shouldEnableSendingOldValuesIfNotMaterializedAlreadyButForcedToMater } private void assertTopologyContainsProcessor(final Topology topology, final String processorName) { - for (final SubtopologyDescription subtopology: topology.describe().subtopologies()) { + for (final Subtopology subtopology: topology.describe().subtopologies()) { for (final TopologyDescription.Node node: subtopology.nodes()) { if (node.name().equals(processorName)) { return; From c79a60fd413c29e2a2a228c4421c2a76d67c3d32 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 17 May 2021 09:34:50 -0700 Subject: [PATCH 2/2] qualify public interface --- .../internals/InternalTopologyBuilder.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 9b67c035daf6f..37d43b41aaa63 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyDescription.Subtopology; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.processor.StateStore; @@ -29,6 +28,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; @@ -1076,8 +1076,8 @@ public Set allStateStoreName() { * * @return groups of topic names */ - public synchronized Map topicGroups() { - final Map topicGroups = new LinkedHashMap<>(); + public synchronized Map topicGroups() { + final Map topicGroups = new LinkedHashMap<>(); if (nodeGroups == null) { nodeGroups = makeNodeGroups(); @@ -1140,7 +1140,7 @@ public synchronized Map topicGroups() } } if (!sourceTopics.isEmpty()) { - topicGroups.put(new TopologyMetadata.Subtopology(entry.getKey(), namedTopology), new TopicsInfo( + topicGroups.put(new Subtopology(entry.getKey(), namedTopology), new TopicsInfo( Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), Collections.unmodifiableMap(repartitionTopics), @@ -1754,7 +1754,7 @@ public int hashCode() { } } - public final static class SubtopologyDescription implements Subtopology { + public final static class SubtopologyDescription implements TopologyDescription.Subtopology { private final int id; private final Set nodes; @@ -1891,10 +1891,10 @@ public int compare(final TopologyDescription.GlobalStore globalStore1, private final static GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator(); - private static class SubtopologyComparator implements Comparator, Serializable { + private static class SubtopologyComparator implements Comparator, Serializable { @Override - public int compare(final Subtopology subtopology1, - final Subtopology subtopology2) { + public int compare(final TopologyDescription.Subtopology subtopology1, + final TopologyDescription.Subtopology subtopology2) { if (subtopology1.equals(subtopology2)) { return 0; } @@ -1905,10 +1905,10 @@ public int compare(final Subtopology subtopology1, private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator(); public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { - private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); + private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); - public void addSubtopology(final Subtopology subtopology) { + public void addSubtopology(final TopologyDescription.Subtopology subtopology) { subtopologies.add(subtopology); } @@ -1917,7 +1917,7 @@ public void addGlobalStore(final TopologyDescription.GlobalStore globalStore) { } @Override - public Set subtopologies() { + public Set subtopologies() { return Collections.unmodifiableSet(subtopologies); } @@ -1930,8 +1930,8 @@ public Set globalStores() { public String toString() { final StringBuilder sb = new StringBuilder(); sb.append("Topologies:\n "); - final Subtopology[] sortedSubtopologies = - subtopologies.descendingSet().toArray(new Subtopology[0]); + final TopologyDescription.Subtopology[] sortedSubtopologies = + subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]); final TopologyDescription.GlobalStore[] sortedGlobalStores = globalStores.descendingSet().toArray(new GlobalStore[0]); int expectedId = 0; @@ -1939,7 +1939,7 @@ public String toString() { int globalStoresIndex = sortedGlobalStores.length - 1; while (subtopologiesIndex != -1 && globalStoresIndex != -1) { sb.append(" "); - final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex]; if (subtopology.id() == expectedId) { sb.append(subtopology); @@ -1951,7 +1951,7 @@ public String toString() { expectedId++; } while (subtopologiesIndex != -1) { - final Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; + final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex]; sb.append(" "); sb.append(subtopology); subtopologiesIndex--;