Skip to content

Commit

Permalink
[fix][admin] Listen partitioned topic creation event (#23680)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece authored Dec 8, 2024
1 parent 24c337f commit 0a2ffe4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
import org.apache.pulsar.broker.web.PulsarWebResource;
Expand Down Expand Up @@ -166,6 +168,10 @@ public CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {

protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
if (!topicName.isPersistent()) {
for (int i = 0; i < numPartitions; i++) {
pulsar().getBrokerService().getTopicEventsDispatcher()
.notify(topicName.getPartition(i).toString(), TopicEvent.CREATE, EventStage.SUCCESS);
}
return CompletableFuture.completedFuture(null);
}
List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
Expand Down Expand Up @@ -201,6 +207,8 @@ private CompletableFuture<Void> tryCreatePartitionAsync(final int partition) {
}
return null;
});
pulsar().getBrokerService().getTopicEventsDispatcher()
.notifyOnCompletion(result, topicName.getPartition(partition).toString(), TopicEvent.CREATE);
return result;
}

Expand Down Expand Up @@ -594,6 +602,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
throw new RestException(Status.CONFLICT, "This topic already exists");
}
})
.thenRun(() -> {
for (int i = 0; i < numPartitions; i++) {
pulsar().getBrokerService().getTopicEventsDispatcher()
.notify(topicName.getPartition(i).toString(), TopicEvent.CREATE,
EventStage.BEFORE);
}
})
.thenCompose(__ -> provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
.thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
.thenRun(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,23 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti
private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) throws Exception {
final String[] expectedEvents;
if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
if (topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
} else {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
}
} else {
expectedEvents = new String[]{
// Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic
Expand Down

0 comments on commit 0a2ffe4

Please sign in to comment.