Skip to content

Commit

Permalink
Migrate tests to use Subtopology
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed May 13, 2021
1 parent 66d81f7 commit 6245684
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.Objects;

public class TopologyMetadata {
//TODO KAFKA-12648: the TopologyMetadata class is filled in by Pt. 2 (PR #10683)

Expand All @@ -27,5 +29,23 @@ public Subtopology(final int nodeGroupId, final String namedTopology) {
this.nodeGroupId = nodeGroupId;
this.namedTopology = namedTopology;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Subtopology that = (Subtopology) o;
return nodeGroupId == that.nodeGroupId &&
Objects.equals(namedTopology, that.namedTopology);
}

@Override
public int hashCode() {
return Objects.hash(nodeGroupId, namedTopology);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import java.util.Set;

import static java.time.Duration.ofMillis;

import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
Expand Down Expand Up @@ -210,8 +212,8 @@ public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {

assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));
assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.size(), equalTo(2));
for (final InternalTopicConfig config : internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.values()) {
assertThat(internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2));
for (final InternalTopicConfig config : internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) {
assertThat(
config.getProperties(Collections.emptyMap(), 0).get("test"),
equalTo("property")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;

import org.junit.Test;

import java.util.Collections;
Expand All @@ -28,6 +30,8 @@
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;

import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
Expand Down Expand Up @@ -80,8 +84,8 @@ public class ChangelogTopicsTest {
@Test
public void shouldNotContainChangelogsForStatelessTasks() {
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
final Map<Integer, TopicsInfo> topicGroups = mkMap(mkEntry(0, TOPICS_INFO2));
final Map<Integer, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(0, mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO2));
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, mkSet(TASK_0_0, TASK_0_1, TASK_0_2)));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
Expand All @@ -100,9 +104,9 @@ public void shouldNotContainChangelogsForStatelessTasks() {
public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated() {
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.andStubReturn(mkSet(CHANGELOG_TOPIC_NAME1));
final Map<Integer, TopicsInfo> topicGroups = mkMap(mkEntry(0, TOPICS_INFO1));
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Integer, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(0, tasks));
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
Expand All @@ -122,9 +126,9 @@ public void shouldNotContainAnyPreExistingChangelogsIfChangelogIsNewlyCreated()
public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.andStubReturn(Collections.emptySet());
final Map<Integer, TopicsInfo> topicGroups = mkMap(mkEntry(0, TOPICS_INFO1));
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Integer, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(0, tasks));
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
Expand All @@ -149,9 +153,9 @@ public void shouldOnlyContainPreExistingNonSourceBasedChangelogs() {
@Test
public void shouldOnlyContainPreExistingSourceBasedChangelogs() {
expect(internalTopicManager.makeReady(Collections.emptyMap())).andStubReturn(Collections.emptySet());
final Map<Integer, TopicsInfo> topicGroups = mkMap(mkEntry(0, TOPICS_INFO3));
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO3));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Integer, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(0, tasks));
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
Expand All @@ -176,9 +180,9 @@ public void shouldOnlyContainPreExistingSourceBasedChangelogs() {
public void shouldContainBothTypesOfPreExistingChangelogs() {
expect(internalTopicManager.makeReady(mkMap(mkEntry(CHANGELOG_TOPIC_NAME1, CHANGELOG_TOPIC_CONFIG))))
.andStubReturn(Collections.emptySet());
final Map<Integer, TopicsInfo> topicGroups = mkMap(mkEntry(0, TOPICS_INFO4));
final Map<Subtopology, TopicsInfo> topicGroups = mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO4));
final Set<TaskId> tasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
final Map<Integer, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(0, tasks));
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup = mkMap(mkEntry(SUBTOPOLOGY_0, tasks));
replay(internalTopicManager);

final ChangelogTopics changelogTopics =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.SubtopologyDescription;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
Expand Down Expand Up @@ -54,6 +55,10 @@
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_2;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -571,12 +576,12 @@ public void testTopicGroups() {

builder.addProcessor("processor-3", new MockApiProcessorSupplier<>(), "source-3", "source-4");

final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();

final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap()));
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(SUBTOPOLOGY_1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(SUBTOPOLOGY_2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap()));

assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
Expand Down Expand Up @@ -608,21 +613,21 @@ public void testTopicGroupsByStateStore() {
builder.connectProcessorAndStateStores("processor-5", "store-3");
builder.buildTopology();

final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();

final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1");
final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
expectedTopicGroups.put(SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo(
Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
Collections.emptyMap(),
Collections.singletonMap(store1, new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap()))));
expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
expectedTopicGroups.put(SUBTOPOLOGY_1, new InternalTopologyBuilder.TopicsInfo(
Collections.emptySet(), mkSet("topic-3", "topic-4"),
Collections.emptyMap(),
Collections.singletonMap(store2, new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap()))));
expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
expectedTopicGroups.put(SUBTOPOLOGY_2, new InternalTopologyBuilder.TopicsInfo(
Collections.emptySet(), mkSet("topic-5"),
Collections.emptyMap(),
Collections.singletonMap(store3, new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap()))));
Expand Down Expand Up @@ -838,7 +843,7 @@ public void shouldAddInternalTopicConfigForWindowStores() {
"processor"
);
builder.buildTopology();
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000);
Expand All @@ -863,7 +868,7 @@ public void shouldAddInternalTopicConfigForNonWindowStores() {
builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
builder.addStateStore(storeBuilder, "processor");
builder.buildTopology();
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog");
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
Expand Down Expand Up @@ -904,11 +909,11 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() {
builder.addSubscribedTopicsFromMetadata(updatedTopics, null);
builder.setApplicationId("test-id");

final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo"));
assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A"));
assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B"));
assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3"));
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
assertTrue(topicGroups.get(SUBTOPOLOGY_0).sourceTopics.contains("topic-foo"));
assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-A"));
assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-B"));
assertTrue(topicGroups.get(SUBTOPOLOGY_2).sourceTopics.contains("topic-3"));
}

@Test
Expand Down Expand Up @@ -1094,9 +1099,9 @@ public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesArePr
builder.addInternalTopic("topic-1z", new InternalTopicProperties(numberOfPartitions));
builder.addSource(null, "source-1", null, null, null, "topic-1z");

final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();

final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(0).repartitionSourceTopics;
final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;

assertEquals(
repartitionSourceTopics.get("Z-topic-1z"),
Expand All @@ -1115,9 +1120,9 @@ public void shouldHandleWhenTopicPropertiesNumberOfPartitionsIsNull() {
builder.addInternalTopic("topic-1t", InternalTopicProperties.empty());
builder.addSource(null, "source-1", null, null, null, "topic-1t");

final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();

final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(0).repartitionSourceTopics;
final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;

assertEquals(
repartitionSourceTopics.get("T-topic-1t"),
Expand All @@ -1134,9 +1139,9 @@ public void shouldHaveCorrectInternalTopicConfigWhenInternalTopicPropertiesAreNo
builder.addInternalTopic("topic-1y", InternalTopicProperties.empty());
builder.addSource(null, "source-1", null, null, null, "topic-1y");

final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();

final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(0).repartitionSourceTopics;
final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;

assertEquals(
repartitionSourceTopics.get("Y-topic-1y"),
Expand Down
Loading

0 comments on commit 6245684

Please sign in to comment.