Skip to content

Commit

Permalink
[improve][admin] Remove duplicate topics name when deleteNamespace (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Jul 4, 2023
1 parent 886f535 commit a3bca68
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,14 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
}))
.thenCompose(topics -> {
List<String> allTopics = topics.get(0);
ArrayList<String> allUserCreatedTopics = new ArrayList<>();
Set<String> allUserCreatedTopics = new HashSet<>();
List<String> allPartitionedTopics = topics.get(1);
ArrayList<String> allUserCreatedPartitionTopics = new ArrayList<>();
Set<String> allUserCreatedPartitionTopics = new HashSet<>();
boolean hasNonSystemTopic = false;
List<String> allSystemTopics = new ArrayList<>();
List<String> allPartitionedSystemTopics = new ArrayList<>();
List<String> topicPolicy = new ArrayList<>();
List<String> partitionedTopicPolicy = new ArrayList<>();
Set<String> allSystemTopics = new HashSet<>();
Set<String> allPartitionedSystemTopics = new HashSet<>();
Set<String> topicPolicy = new HashSet<>();
Set<String> partitionedTopicPolicy = new HashSet<>();
for (String topic : allTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
Expand Down Expand Up @@ -279,6 +279,12 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
return old;
});
}
allUserCreatedTopics.removeIf(t ->
allPartitionedTopics.contains(TopicName.get(t).getPartitionedTopicName()));
allSystemTopics.removeIf(t ->
allPartitionedTopics.contains(TopicName.get(t).getPartitionedTopicName()));
topicPolicy.removeIf(t ->
allPartitionedTopics.contains(TopicName.get(t).getPartitionedTopicName()));
return markDeleteFuture.thenCompose(__ ->
internalDeleteTopicsAsync(allUserCreatedTopics))
.thenCompose(ignore ->
Expand Down Expand Up @@ -348,7 +354,7 @@ private boolean isDeletedAlongWithUserCreatedTopic(String topic) {
return topic.endsWith(SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
}

private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String> topicNames) {
private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
if (CollectionUtils.isEmpty(topicNames)) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -362,7 +368,7 @@ private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String
return FutureUtil.waitForAll(futures);
}

private CompletableFuture<Void> internalDeleteTopicsAsync(List<String> topicNames) {
private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
if (CollectionUtils.isEmpty(topicNames)) {
return CompletableFuture.completedFuture(null);
}
Expand Down

0 comments on commit a3bca68

Please sign in to comment.