Skip to content

Commit

Permalink
[improve][broker] Improve exception for topic does not have schema to…
Browse files Browse the repository at this point in the history
… check (apache#22974)

(cherry picked from commit 4c84788)
  • Loading branch information
shibd committed Jul 1, 2024
1 parent 7b2e724 commit bbb37b3
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -1176,7 +1178,16 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
|| (!producers.isEmpty())
|| (numActiveConsumersWithoutAutoSchema != 0)
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
return checkSchemaCompatibleForConsumer(schema)
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof NotExistSchemaException) {
throw FutureUtil.wrapToCompletionException(
new IncompatibleSchemaException("Failed to add schema to an active topic"
+ " with empty(BYTES) schema: new schema type " + schema.getType()));
}
throw FutureUtil.wrapToCompletionException(realCause);
});
} else {
return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
Expand Down Expand Up @@ -3705,7 +3707,16 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
|| (userCreatedProducerCount > 0)
|| (numActiveConsumersWithoutAutoSchema != 0)
|| (ledger.getTotalSize() != 0)) {
return checkSchemaCompatibleForConsumer(schema);
return checkSchemaCompatibleForConsumer(schema)
.exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof NotExistSchemaException) {
throw FutureUtil.wrapToCompletionException(
new IncompatibleSchemaException("Failed to add schema to an active topic"
+ " with empty(BYTES) schema: new schema type " + schema.getType()));
}
throw FutureUtil.wrapToCompletionException(realCause);
});
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
Expand Down Expand Up @@ -393,7 +394,7 @@ public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, Schem
return checkCompatibilityWithAll(schemaId, schemaData, strategy);
}
} else {
return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
return FutureUtil.failedFuture(new NotExistSchemaException("Topic does not have schema to check"));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema.exceptions;

/**
* Exception is thrown when an schema not exist.
*/
public class NotExistSchemaException extends SchemaException {

private static final long serialVersionUID = -8342983749283749283L;

public NotExistSchemaException() {
super("The schema does not exist");
}

public NotExistSchemaException(String message) {
super(message);
}

public NotExistSchemaException(String message, Throwable e) {
super(message, e);
}

public NotExistSchemaException(Throwable e) {
super(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -119,6 +120,11 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "topicDomain")
public static Object[] topicDomain() {
return new Object[] { "persistent://", "non-persistent://" };
}

@Test
public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
final String tenant = PUBLIC_TENANT;
Expand Down Expand Up @@ -1330,19 +1336,19 @@ private void testIncompatibleSchema() throws Exception {
* the new consumer to register new schema. But before we can solve this problem, we need to modify
* "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES".
*/
@Test
public void testAutoProduceAndSpecifiedConsumer() throws Exception {
@Test(dataProvider = "topicDomain")
public void testAutoProduceAndSpecifiedConsumer(String domain) throws Exception {
final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
final String topicName = "persistent://" + namespace + "/tp_" + randomName(16);
final String topicName = domain + namespace + "/tp_" + randomName(16);
admin.topics().createNonPartitionedTopic(topicName);

Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
try {
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
fail("Should throw ex: Topic does not have schema to check");
fail("Should throw ex: Failed to add schema to an active topic with empty(BYTES) schema");
} catch (Exception ex){
assertTrue(ex.getMessage().contains("Topic does not have schema to check"));
assertTrue(ex.getMessage().contains("Failed to add schema to an active topic with empty(BYTES) schema"));
}

// Cleanup.
Expand Down

0 comments on commit bbb37b3

Please sign in to comment.