Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Do not try to open ML when the topic meta does not exist and do not expect to create a new one. #21995 #22004

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -967,43 +967,49 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
getTopicPoliciesBypassSystemTopic(topicName);
return topicPoliciesFuture.exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName, rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil
.failedFuture(new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
.thenCompose(exists -> {
if (!exists && !createIfMissing) {
return CompletableFuture.completedFuture(Optional.empty());
}
return getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
final String errorInfo = String.format("Topic creation encountered an exception by initialize"
+ " topic policies service. topic_name=%s error_message=%s", topicName,
rc.getMessage());
log.error(errorInfo, rc);
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
}).thenCompose(optionalTopicPolicies -> {
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
if (topicName.isPartitioned()) {
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
.thenCompose((metadata) -> {
// Allow crate non-partitioned persistent topic that name includes
// `partition`
if (metadata.partitions == 0
|| topicName.getPartitionIndex() < metadata.partitions) {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
final String errorMsg =
String.format("Illegal topic partition name %s with max allowed "
+ "%d partitions", topicName, metadata.partitions);
log.warn(errorMsg);
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
}).thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent() && createIfMissing) {
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ "but the returned topic is empty", topicName);
return getTopic(topicName, createIfMissing, properties);
}
return CompletableFuture.completedFuture(optionalTopic);
});
});
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testEvents(String topicTypePersistence, String topicTypePartitioned,
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

events.clear();
if (topicTypePartitioned.equals("partitioned")) {
Expand All @@ -150,7 +150,7 @@ public void testEventsWithUnload(String topicTypePersistence, String topicTypePa
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

events.clear();
admin.topics().unload(topicName);
Expand Down Expand Up @@ -182,7 +182,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Expand Down Expand Up @@ -238,7 +238,7 @@ public void testEventsActiveSub(String topicTypePersistence, String topicTypePar
public void testTopicAutoGC(String topicTypePersistence, String topicTypePartitioned) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" + "topic-" + UUID.randomUUID();

createTopicAndVerifyEvents(topicTypePartitioned, topicName);
createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, topicName);

admin.namespaces().setInactiveTopicPolicies(namespace,
new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
Expand All @@ -262,33 +262,36 @@ public void testTopicAutoGC(String topicTypePersistence, String topicTypePartiti
);
}

private void createTopicAndVerifyEvents(String topicTypePartitioned, String topicName) throws Exception {
private void createTopicAndVerifyEvents(String topicDomain, String topicTypePartitioned, String topicName) throws Exception {
final String[] expectedEvents;
if (topicTypePartitioned.equals("partitioned")) {
topicNameToWatch = topicName + "-partition-1";
admin.topics().createPartitionedTopic(topicName, 2);
triggerPartitionsCreation(topicName);

if (topicDomain.equalsIgnoreCase("persistent") || topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};

} else {
topicNameToWatch = topicName;
admin.topics().createNonPartitionedTopic(topicName);

expectedEvents = new String[]{
// Before https://github.com/apache/pulsar/pull/21995, Pulsar will skip create topic if the topic
// was already exists, and the action "check topic exists" will try to load Managed ledger,
// the check triggers two exrtra events: [LOAD__BEFORE, LOAD__FAILURE].
// #21995 fixed this wrong behavior, so remove these two events.
"LOAD__BEFORE",
"LOAD__FAILURE",
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};

}
if (topicTypePartitioned.equals("partitioned")) {
topicNameToWatch = topicName + "-partition-1";
admin.topics().createPartitionedTopic(topicName, 2);
triggerPartitionsCreation(topicName);
} else {
topicNameToWatch = topicName;
admin.topics().createNonPartitionedTopic(topicName);
}

Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3418,4 +3418,32 @@ private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
producer.close();
admin.topics().delete(topic);
}

@Test
public void testGetStatsIfPartitionNotExists() throws Exception {
// create topic.
final String partitionedTp = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
admin.topics().createPartitionedTopic(partitionedTp, 1);
TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
boolean topicExists1 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertTrue(topicExists1);
// Verify topics-stats works.
TopicStats topicStats = admin.topics().getStats(partition0.toString());
assertNotNull(topicStats);

// Delete partition and call topic-stats again.
admin.topics().delete(partition0.toString());
boolean topicExists2 = pulsar.getBrokerService().getTopic(partition0.toString(), false).join().isPresent();
assertFalse(topicExists2);
// Verify: respond 404.
try {
admin.topics().getStats(partition0.toString());
fail("Should respond 404 after the partition was deleted");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("Topic partitions were not yet created"));
}

// cleanup.
admin.topics().deletePartitionedTopic(partitionedTp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,23 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
.create()) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

try (Consumer<byte[]> ignored = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe()) {
} catch (PulsarClientException.LookupException expected) {
String msg = "Namespace bundle for topic (%s) not served by this instance";
} catch (PulsarClientException.TopicDoesNotExistException expected) {
// Since the "policies.deleted" is "true", the value of "isAllowAutoTopicCreationAsync" will be false,
// so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
assertTrue(expected.getMessage().contains(String.format(msg, topic))
assertTrue(expected.getMessage().contains(topic)
|| expected.getMessage().contains(topicPoliciesServiceInitException));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ public void testPersistentPartitionedTopicUnload() throws Exception {

assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
pulsar.getBrokerService().getTopicIfExists(topicName).get();
assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName));
// The map topics should only contain partitions, does not contain partitioned topic.
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));

// ref of partitioned-topic name should be empty
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
Expand Down
Loading