Skip to content

Commit

Permalink
[fix][branch-2.10] Fix duplicated deleting topics (#20685)
Browse files Browse the repository at this point in the history
  • Loading branch information
liangyepianzhou authored Jul 2, 2023
1 parent 3a2e593 commit f10797a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth
return;
}
}
noPartitionSystemTopic.removeAll(partitionSystemTopic);
noPartitionedTopicPolicySystemTopic.removeAll(partitionedTopicPolicySystemTopic);
deleteSystemTopicFuture = internalDeleteTopicsAsync(noPartitionSystemTopic)
.thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionSystemTopic))
.thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
Expand All @@ -348,8 +350,6 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth
}

deleteSystemTopicFuture
.thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
.thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
.thenCompose(__ -> {
List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
Expand Down Expand Up @@ -523,7 +523,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
partitionedTopics.add(partitionedTopic);
if (!partitionedTopics.contains(partitionedTopic)
&& !nonPartitionedTopics.contains(partitionedTopic)) {
partitionedTopics.add(partitionedTopic);
} else {
continue;
}
}
} else {
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
Expand All @@ -534,7 +539,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}
continue;
}
if (!partitionedTopics.contains(topic)
&& !nonPartitionedTopics.contains(topic)) {
nonPartitionedTopics.add(topic);
} else {
continue;
}
}
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
Expand All @@ -550,17 +560,15 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}
}

for (String partitionedTopic : partitionedTopics) {
topicFutures.add(namespaceResources().getPartitionedTopicResources()
.deletePartitionedTopicAsync(TopicName.get(partitionedTopic)));
}

if (log.isDebugEnabled()) {
log.debug("Successfully send deletion command of partitioned-topics:{} "
+ "and non-partitioned-topics:{} in namespace:{}.",
partitionedTopics, nonPartitionedTopics, namespaceName);
}

allPartitionedSystemTopics.removeAll(allSystemTopics);
partitionedTopicPolicySystemTopic.removeAll(noPartitionedTopicPolicySystemTopic);
final CompletableFuture<Throwable> topicFutureEx =
FutureUtil.waitForAll(topicFutures)
.thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,8 @@ public Object[][] namespaceAttributes(){
return new Object[][]{
{new NamespaceAttr(false, "non-partitioned", 0, false)},
{new NamespaceAttr(true, "non-partitioned", 0, false)},
{new NamespaceAttr(true, "partitioned", 3, false)}
{new NamespaceAttr(true, "partitioned", 3, false)},
{new NamespaceAttr(true, "partitioned", 3, true)}
};
}

Expand Down Expand Up @@ -1472,11 +1473,12 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception {
// Expected: cannot delete non-empty tenant
}

// delete topic
admin.topics().deletePartitionedTopic(topic);

if (!conf.isForceDeleteNamespaceAllowed()) {
// delete topic
admin.topics().deletePartitionedTopic(topic);
}
// delete namespace
admin.namespaces().deleteNamespace(namespace, false);
admin.namespaces().deleteNamespace(namespace, conf.isForceDeleteNamespaceAllowed());
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());

Expand Down

0 comments on commit f10797a

Please sign in to comment.