diff --git a/index.d.ts b/index.d.ts index 161dabce..bd471469 100644 --- a/index.d.ts +++ b/index.d.ts @@ -96,6 +96,7 @@ export interface ConsumerConfig { autoAckOldestChunkedMessageOnQueueFull?: number; schema?: SchemaInfo; batchIndexAckEnabled?: boolean; + regexSubscriptionMode?: RegexSubscriptionMode; } export class Consumer { @@ -251,6 +252,11 @@ export type ConsumerCryptoFailureAction = 'DISCARD' | 'CONSUME'; +export type RegexSubscriptionMode = + 'PersistentOnly' | + 'NonPersistentOnly' | + 'AllTopics'; + export type SchemaType = 'None' | 'String' | diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc index 69f97d99..be646aee 100644 --- a/src/ConsumerConfig.cc +++ b/src/ConsumerConfig.cc @@ -31,6 +31,7 @@ static const std::string CFG_TOPICS_PATTERN = "topicsPattern"; static const std::string CFG_SUBSCRIPTION = "subscription"; static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType"; static const std::string CFG_INIT_POSITION = "subscriptionInitialPosition"; +static const std::string CFG_REGEX_SUBSCRIPTION_MODE = "regexSubscriptionMode"; static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs"; static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs"; static const std::string CFG_RECV_QUEUE = "receiverQueueSize"; @@ -53,6 +54,11 @@ static const std::map SUBSCRIPTION_TYPE = { {"KeyShared", pulsar_ConsumerKeyShared}, {"Failover", pulsar_ConsumerFailover}}; +static const std::map REGEX_SUBSCRIPTION_MODE = { + {"PersistentOnly", pulsar_consumer_regex_sub_mode_PersistentOnly}, + {"NonPersistentOnly", pulsar_consumer_regex_sub_mode_NonPersistentOnly}, + {"AllTopics", pulsar_consumer_regex_sub_mode_AllTopics}}; + static const std::map INIT_POSITION = { {"Latest", initial_position_latest}, {"Earliest", initial_position_earliest}}; @@ -111,6 +117,16 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag } } + if (consumerConfig.Has(CFG_REGEX_SUBSCRIPTION_MODE) && + consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).IsString()) { + std::string regexSubscriptionMode = + consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).ToString().Utf8Value(); + if (REGEX_SUBSCRIPTION_MODE.count(regexSubscriptionMode)) { + pulsar_consumer_configuration_set_regex_subscription_mode( + this->cConsumerConfig.get(), REGEX_SUBSCRIPTION_MODE.at(regexSubscriptionMode)); + } + } + if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) { std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value(); if (!consumerName.empty()) diff --git a/tests/consumer.test.js b/tests/consumer.test.js index 828cb888..d976cb24 100644 --- a/tests/consumer.test.js +++ b/tests/consumer.test.js @@ -197,6 +197,56 @@ const Pulsar = require('../index.js'); 'Failed to receive message: TimeOut', ); }); + + test('Regex subscription', async () => { + const topicName1 = 'persistent://public/default/regex-sub-1'; + const topicName2 = 'persistent://public/default/regex-sub-2'; + const topicName3 = 'non-persistent://public/default/regex-sub-3'; + const topicName4 = 'persistent://public/default/no-match-regex-sub-2'; + const producer1 = await client.createProducer({ + topic: topicName1, + }); + const producer2 = await client.createProducer({ + topic: topicName2, + }); + const producer3 = await client.createProducer({ + topic: topicName3, + }); + const producer4 = await client.createProducer({ + topic: topicName4, + }); + + const consumer = await client.subscribe({ + topicsPattern: 'persistent://public/default/regex-sub.*', + subscription: 'sub1', + subscriptionType: 'Shared', + regexSubscriptionMode: 'AllTopics', + }); + + const num = 10; + for (let i = 0; i < num; i += 1) { + const msg = `my-message-${i}`; + await producer1.send({ data: Buffer.from(msg) }); + await producer2.send({ data: Buffer.from(msg) }); + await producer3.send({ data: Buffer.from(msg) }); + await producer4.send({ data: Buffer.from(msg) }); + } + const results = []; + for (let i = 0; i < 3 * num; i += 1) { + const msg = await consumer.receive(); + results.push(msg.getData().toString()); + } + expect(results.length).toEqual(3 * num); + // assert no more msgs. + await expect(consumer.receive(1000)).rejects.toThrow( + 'Failed to receive message: TimeOut', + ); + await producer1.close(); + await producer2.close(); + await producer3.close(); + await producer4.close(); + await consumer.close(); + }); }); }); })();