From f10797abcdbd9e9896e221db84990a9aa0971284 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Sun, 2 Jul 2023 11:10:43 +0800 Subject: [PATCH] [fix][branch-2.10] Fix duplicated deleting topics (#20685) ### Motivation 1. The topics have been deleted twice. https://github.com/apache/pulsar/blob/90369a0da639d43c841974d08ab77faeb7ba5cdd/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L342-L355 2. Similar to https://github.com/apache/pulsar/pull/20683 We need to do Deduplication. --- .../broker/admin/impl/NamespacesBase.java | 22 +++++++++++++------ .../pulsar/broker/admin/AdminApi2Test.java | 12 +++++----- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ae7f36ad19129..f325fd26a9499 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -339,6 +339,8 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth return; } } + noPartitionSystemTopic.removeAll(partitionSystemTopic); + noPartitionedTopicPolicySystemTopic.removeAll(partitionedTopicPolicySystemTopic); deleteSystemTopicFuture = internalDeleteTopicsAsync(noPartitionSystemTopic) .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionSystemTopic)) .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) @@ -348,8 +350,6 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth } deleteSystemTopicFuture - .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic)) - .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic)) .thenCompose(__ -> { List> deleteBundleFutures = Lists.newArrayList(); NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() @@ -523,7 +523,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } String partitionedTopic = topicName.getPartitionedTopicName(); if (!partitionedTopics.contains(partitionedTopic)) { - partitionedTopics.add(partitionedTopic); + if (!partitionedTopics.contains(partitionedTopic) + && !nonPartitionedTopics.contains(partitionedTopic)) { + partitionedTopics.add(partitionedTopic); + } else { + continue; + } } } else { if (pulsar().getBrokerService().isSystemTopic(topicName)) { @@ -534,7 +539,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } continue; } + if (!partitionedTopics.contains(topic) + && !nonPartitionedTopics.contains(topic)) { nonPartitionedTopics.add(topic); + } else { + continue; + } } topicFutures.add(pulsar().getAdminClient().topics().deleteAsync( topic, true, true)); @@ -550,10 +560,6 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } } - for (String partitionedTopic : partitionedTopics) { - topicFutures.add(namespaceResources().getPartitionedTopicResources() - .deletePartitionedTopicAsync(TopicName.get(partitionedTopic))); - } if (log.isDebugEnabled()) { log.debug("Successfully send deletion command of partitioned-topics:{} " @@ -561,6 +567,8 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo partitionedTopics, nonPartitionedTopics, namespaceName); } + allPartitionedSystemTopics.removeAll(allSystemTopics); + partitionedTopicPolicySystemTopic.removeAll(noPartitionedTopicPolicySystemTopic); final CompletableFuture topicFutureEx = FutureUtil.waitForAll(topicFutures) .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics)) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index a928101c460a8..247c95f8418f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1419,7 +1419,8 @@ public Object[][] namespaceAttributes(){ return new Object[][]{ {new NamespaceAttr(false, "non-partitioned", 0, false)}, {new NamespaceAttr(true, "non-partitioned", 0, false)}, - {new NamespaceAttr(true, "partitioned", 3, false)} + {new NamespaceAttr(true, "partitioned", 3, false)}, + {new NamespaceAttr(true, "partitioned", 3, true)} }; } @@ -1472,11 +1473,12 @@ public void testDeleteNamespace(NamespaceAttr namespaceAttr) throws Exception { // Expected: cannot delete non-empty tenant } - // delete topic - admin.topics().deletePartitionedTopic(topic); - + if (!conf.isForceDeleteNamespaceAllowed()) { + // delete topic + admin.topics().deletePartitionedTopic(topic); + } // delete namespace - admin.namespaces().deleteNamespace(namespace, false); + admin.namespaces().deleteNamespace(namespace, conf.isForceDeleteNamespaceAllowed()); assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace)); assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());