Skip to content

Commit

Permalink
[fix][broker] Allow broker to handle non-recoverable schema error onl…
Browse files Browse the repository at this point in the history
…y if SchemaLedgerForceRecovery flag is enabled (#23428)
  • Loading branch information
rdhabalia authored Oct 10, 2024
1 parent 5506f50 commit bc3e7f6
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,8 @@ public CompletableFuture<Boolean> hasSchema() {
return brokerService.pulsar().getSchemaRegistryService().getSchema(getSchemaId()).thenApply(Objects::nonNull)
.exceptionally(e -> {
Throwable ex = e.getCause();
if (ex instanceof SchemaException || !((SchemaException) ex).isRecoverable()) {
if (brokerService.pulsar().getConfig().isSchemaLedgerForceRecovery()
&& (ex instanceof SchemaException && !((SchemaException) ex).isRecoverable())) {
return false;
}
throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void testAddProducerOnDeletedSchemaLedgerTopic() throws Exception {
final String topicOne = "test-deleted-schema-ledger";
final String fqtnOne = TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();

//pulsar.getConfig().setManagedLedgerForceRecovery(true);
pulsar.getConfig().setSchemaLedgerForceRecovery(true);
admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet("test"));

// (1) create topic with schema
Expand Down

0 comments on commit bc3e7f6

Please sign in to comment.