Skip to content

Commit

Permalink
[Broker] Check allowAutoSubscriptionCreation when creating init sub (
Browse files Browse the repository at this point in the history
…apache#14458)

Master Issue: apache#13408 

### Motivation

In apache#13355, we added support for creating initial subscription when creating the producer. But the broker didn't check if the subscription can be created automatically. The initial subscription will be created even if the `allowAutoSubscriptionCreation` is disabled.
  • Loading branch information
RobertIndie authored and nicklixinyang committed Apr 20, 2022
1 parent ca36bfa commit f60b531
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,20 @@ protected void handleProducer(final CommandProducer cmdProducer) {
if (!Strings.isNullOrEmpty(initialSubscriptionName)
&& topic.isPersistent()
&& !topic.getSubscriptions().containsKey(initialSubscriptionName)) {
if (!this.getBrokerService().isAllowAutoSubscriptionCreation(topicName)) {
String msg =
"Could not create the initial subscription due to the auto subscription "
+ "creation is not allowed.";
if (producerFuture.completeExceptionally(
new BrokerServiceException.NotAllowedException(msg))) {
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, msg, initialSubscriptionName, topicName);
commandSender.sendErrorResponse(requestId,
ServerError.NotAllowedError, msg);
}
producers.remove(producerId, producerFuture);
return;
}
createInitSubFuture =
topic.createSubscription(initialSubscriptionName, InitialPosition.Earliest,
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.fail;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
Expand Down Expand Up @@ -167,4 +168,27 @@ public void testCreateInitialSubscriptionWhenExisting() throws PulsarClientExcep

Assert.assertTrue(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
}

@Test
public void testInitialSubscriptionCreationWithAutoCreationDisable()
throws PulsarAdminException, PulsarClientException {
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final TopicName topic =
TopicName.get("persistent", "public", "default",
"testInitialSubscriptionCreationWithAutoCreationDisable");
final String initialSubscriptionName = "init-sub";
admin.topics().createNonPartitionedTopic(topic.toString());
try {
Producer<byte[]> producer = ((ProducerBuilderImpl<byte[]>) pulsarClient.newProducer())
.initialSubscriptionName(initialSubscriptionName)
.topic(topic.toString())
.create();
fail("Should not pass");
} catch (PulsarClientException.NotAllowedException exception) {
// ok
}

Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class DeadLetterPolicy {
/**
* Name of the initial subscription name of the dead letter topic.
* If this field is not set, the initial subscription for the dead letter topic will not be created.
* If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer will fail
* to be created.
*/
private String initialSubscriptionName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,9 @@ public ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartP
/**
* Use this config to automatically create an initial subscription when creating the topic.
* If this field is not set, the initial subscription will not be created.
* This method is limited to internal use
* If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the producer will fail to
* be created.
* This method is limited to internal use. This method will only be used when the consumer creates the dlq producer.
*
* @param initialSubscriptionName Name of the initial subscription of the topic.
* @return the producer builder implementation instance
Expand Down
2 changes: 2 additions & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ message CommandProducer {

// Name of the initial subscription of the topic.
// If this field is not set, the initial subscription will not be created.
// If this field is set but the broker's `allowAutoSubscriptionCreation`
// is disabled, the producer will fail to be created.
optional string initial_subscription_name = 13;
}

Expand Down
2 changes: 1 addition & 1 deletion site2/docs/concepts-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
```
By default, there is no subscription during a DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages. To automatically create an initial subscription for the DLQ, you can specify the `initialSubscriptionName` parameter.
By default, there is no subscription during a DLQ topic creation. Without a just-in-time subscription to the DLQ topic, you may lose messages. To automatically create an initial subscription for the DLQ, you can specify the `initialSubscriptionName` parameter. If this parameter is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer will fail to be created.
```java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
Expand Down

0 comments on commit f60b531

Please sign in to comment.