Skip to content

Commit

Permalink
KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
Browse files Browse the repository at this point in the history
Trigger task reconfiguration when:
- topic-partitions are created or deleted on source cluster
- topic-partitions are missing on target cluster

Authors: Mickael Maison <mickael.maison@gmail.com>, Edoardo Comar <ecomar@uk.ibm.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
  • Loading branch information
mimaison authored Oct 19, 2020
1 parent d71fd88 commit 270881c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public MirrorSourceConnector() {
this.replicationPolicy = replicationPolicy;
this.topicFilter = topicFilter;
this.configPropertyFilter = configPropertyFilter;
}
}

@Override
public void start(Map<String, String> props) {
Expand Down Expand Up @@ -202,6 +202,7 @@ List<TopicPartition> findTargetTopicPartitions()
throws InterruptedException, ExecutionException {
Set<String> topics = listTopics(targetAdminClient).stream()
.filter(t -> sourceAndTarget.source().equals(replicationPolicy.topicSource(t)))
.filter(t -> !t.equals(config.checkpointsTopic()))
.collect(Collectors.toSet());
return describeTopics(targetAdminClient, topics).stream()
.flatMap(MirrorSourceConnector::expandTopicDescription)
Expand All @@ -211,21 +212,44 @@ List<TopicPartition> findTargetTopicPartitions()
// visible for testing
void refreshTopicPartitions()
throws InterruptedException, ExecutionException {
knownSourceTopicPartitions = findSourceTopicPartitions();
knownTargetTopicPartitions = findTargetTopicPartitions();
List<TopicPartition> upstreamTargetTopicPartitions = knownTargetTopicPartitions.stream()

List<TopicPartition> sourceTopicPartitions = findSourceTopicPartitions();
List<TopicPartition> targetTopicPartitions = findTargetTopicPartitions();

Set<TopicPartition> sourceTopicPartitionsSet = new HashSet<>(sourceTopicPartitions);
Set<TopicPartition> knownSourceTopicPartitionsSet = new HashSet<>(knownSourceTopicPartitions);

Set<TopicPartition> upstreamTargetTopicPartitions = targetTopicPartitions.stream()
.map(x -> new TopicPartition(replicationPolicy.upstreamTopic(x.topic()), x.partition()))
.collect(Collectors.toList());
.collect(Collectors.toSet());

Set<TopicPartition> missingInTarget = new HashSet<>(sourceTopicPartitions);
missingInTarget.removeAll(upstreamTargetTopicPartitions);

knownTargetTopicPartitions = targetTopicPartitions;

// Detect if topic-partitions were added or deleted from the source cluster
// or if topic-partitions are missing from the target cluster
if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || !missingInTarget.isEmpty()) {

Set<TopicPartition> newTopicPartitions = sourceTopicPartitionsSet;
newTopicPartitions.removeAll(knownSourceTopicPartitions);

Set<TopicPartition> deletedTopicPartitions = knownSourceTopicPartitionsSet;
deletedTopicPartitions.removeAll(sourceTopicPartitions);

log.info("Found {} new topic-partitions on {}. " +
"Found {} deleted topic-partitions on {}. " +
"Found {} topic-partitions missing on {}.",
newTopicPartitions.size(), sourceAndTarget.source(),
deletedTopicPartitions.size(), sourceAndTarget.source(),
missingInTarget.size(), sourceAndTarget.target());

log.trace("Found new topic-partitions on {}: {}", sourceAndTarget.source(), newTopicPartitions);
log.trace("Found deleted topic-partitions on {}: {}", sourceAndTarget.source(), deletedTopicPartitions);
log.trace("Found missing topic-partitions on {}: {}", sourceAndTarget.target(), missingInTarget);

Set<TopicPartition> newTopicPartitions = new HashSet<>(knownSourceTopicPartitions);
newTopicPartitions.removeAll(upstreamTargetTopicPartitions);
Set<TopicPartition> deadTopicPartitions = new HashSet<>(upstreamTargetTopicPartitions);
deadTopicPartitions.removeAll(knownSourceTopicPartitions);
if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) {
log.info("Found {} topic-partitions on {}. {} are new. {} were removed. Previously had {}.",
knownSourceTopicPartitions.size(), sourceAndTarget.source(), newTopicPartitions.size(),
deadTopicPartitions.size(), knownSourceTopicPartitions.size());
log.trace("Found new topic-partitions: {}", newTopicPartitions);
knownSourceTopicPartitions = sourceTopicPartitions;
computeAndCreateTopicPartitions();
context.requestTaskReconfiguration();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void testRefreshTopicPartitions() throws Exception {
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);

List<TopicPartition> sourceTopicPartitions = Arrays.asList(new TopicPartition("topic", 0));
List<TopicPartition> sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(Collections.emptyList()).when(connector).findTargetTopicPartitions();
doNothing().when(connector).createTopicPartitions(any(), any(), any());
Expand All @@ -205,11 +205,38 @@ public void testRefreshTopicPartitions() throws Exception {
eq(expectedNewTopics),
eq(Collections.emptyMap()));

List<TopicPartition> targetTopicPartitions = Arrays.asList(new TopicPartition("source.topic", 0));
List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
connector.refreshTopicPartitions();

// once target topic is created, refreshTopicPartitions() will NOT call computeAndCreateTopicPartitions() again
verify(connector, times(2)).computeAndCreateTopicPartitions();
}

@Test
public void testRefreshTopicPartitionsTopicOnTargetFirst() throws Exception {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
connector.initialize(mock(ConnectorContext.class));
connector = spy(connector);

List<TopicPartition> sourceTopicPartitions = Collections.emptyList();
List<TopicPartition> targetTopicPartitions = Collections.singletonList(new TopicPartition("source.topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();
doReturn(targetTopicPartitions).when(connector).findTargetTopicPartitions();
doNothing().when(connector).createTopicPartitions(any(), any(), any());

// partitions appearing on the target cluster should not cause reconfiguration
connector.refreshTopicPartitions();
connector.refreshTopicPartitions();
verify(connector, times(0)).computeAndCreateTopicPartitions();

sourceTopicPartitions = Collections.singletonList(new TopicPartition("topic", 0));
doReturn(sourceTopicPartitions).when(connector).findSourceTopicPartitions();

// when partitions are added to the source cluster, reconfiguration is triggered
connector.refreshTopicPartitions();
verify(connector, times(1)).computeAndCreateTopicPartitions();

}
}

0 comments on commit 270881c

Please sign in to comment.