From 0fa7215c5628fa4c3058d01620901b2081d8d4a6 Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Tue, 13 Jul 2021 18:28:13 +0200 Subject: [PATCH 1/6] Allow partitioned producers to start lazily Avoids performing lookups and producer registration on all partitions of a partitioned topic when using SinglePartition routing without keyed messages --- .../include/pulsar/ProducerConfiguration.h | 19 +++ .../include/pulsar/c/producer_configuration.h | 6 + pulsar-client-cpp/lib/HandlerBase.cc | 14 ++- pulsar-client-cpp/lib/HandlerBase.h | 1 + .../lib/PartitionedProducerImpl.cc | 69 +++++++++-- .../lib/PartitionedProducerImpl.h | 5 +- .../lib/ProducerConfiguration.cc | 10 ++ .../lib/ProducerConfigurationImpl.h | 1 + pulsar-client-cpp/lib/ProducerImpl.cc | 71 +++++++++-- pulsar-client-cpp/lib/ProducerImpl.h | 1 + .../lib/c/c_ProducerConfiguration.cc | 10 ++ pulsar-client-cpp/python/pulsar/__init__.py | 12 ++ pulsar-client-cpp/python/src/config.cc | 2 + pulsar-client-cpp/tests/BasicEndToEndTest.cc | 101 ++++++++++++++-- .../tests/PartitionsUpdateTest.cc | 44 ++++--- pulsar-client-cpp/tests/ProducerTest.cc | 113 ++++++++++++++++++ pulsar-client-cpp/tests/PulsarFriend.h | 5 + 17 files changed, 431 insertions(+), 53 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 3306b271d8c21..9148c2c8476b3 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -262,6 +262,25 @@ class PULSAR_PUBLIC ProducerConfiguration { */ HashingScheme getHashingScheme() const; + /** + * This config affects producers of partitioned topics only. It controls whether + * producers register and connect immediately to the owner broker of each partition + * or start lazily on demand. Lazy starts occur when a message needs to be routed + * to a partition that the producer has not yet registered and connected to. + * Using this mode can reduce the strain on brokers for topics with large numbers of + * partitions and when the SinglePartition routing policy is used without keyed messages. + * Because producer registration and connection is on demand, this can produce extra + * latency while the registration is being carried out. 
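+     *
+     * A minimal usage sketch (illustrative only; assumes an existing Client named client):
+     * @code
+     * ProducerConfiguration conf;
+     * conf.setLazyStartPartitionedProducers(true);
+     * Producer producer;
+     * client.createProducer("persistent://public/default/my-partitioned-topic", conf, producer);
+     * @endcode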
+ * @param true/false as to whether to start partition producers lazily + * @return + */ + ProducerConfiguration& setLazyStartPartitionedProducers(bool); + + /** + * The getter associated with setLazyStartPartitionedProducers() + */ + bool getLazyStartPartitionedProducers() const; + /** * The setter associated with getBlockIfQueueFull() */ diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h index 17653d42807d5..cf62baafe1ff3 100644 --- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h @@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ PULSAR_PUBLIC pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf); +PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers( + pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers); + +PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers( + pulsar_producer_configuration_t *conf); + PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full( pulsar_producer_configuration_t *conf, int blockIfQueueFull); diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc index de9929d6a73b5..d7025ad004b15 100644 --- a/pulsar-client-cpp/lib/HandlerBase.cc +++ b/pulsar-client-cpp/lib/HandlerBase.cc @@ -35,14 +35,23 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, mutex_(), creationTimestamp_(TimeUtils::now()), operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())), - state_(Pending), + state_(NotStarted), backoff_(backoff), epoch_(0), timer_(executor_->createDeadlineTimer()) {} HandlerBase::~HandlerBase() { timer_->cancel(); } -void HandlerBase::start() { grabCnx(); } +void HandlerBase::start() { + Lock lock(mutex_); + // guard against concurrent state changes such as closing + if (state_ == NotStarted) { + state_ = Pending; + lock.unlock(); + + grabCnx(); + } +} void HandlerBase::grabCnx() { Lock lock(mutex_); @@ -106,6 +115,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con scheduleReconnection(handler); break; + case NotStarted: case Closing: case Closed: case Failed: diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index 93abba944eb88..eeb8ebe1c5e8d 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -97,6 +97,7 @@ class HandlerBase { enum State { + NotStarted, Pending, Ready, Closing, diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 4e01263d6aac1..b2d9ce6bd7b43 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const { return getNumPartitions(); } -ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const { +ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) { using namespace std::placeholders; std::string topicPartitionName = topicName_->getTopicPartitionName(partition); auto producer = std::make_shared(client_, topicPartitionName, conf_, partition); - producer->getProducerCreatedFuture().addListener( - 
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated, - const_cast(this)->shared_from_this(), _1, _2, partition)); + + if (conf_.getLazyStartPartitionedProducers()) { + createLazyPartitionProducer(partition); + } else { + producer->getProducerCreatedFuture().addListener( + std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated, + const_cast(this)->shared_from_this(), _1, _2, partition)); + } LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName); return producer; @@ -108,8 +113,10 @@ void PartitionedProducerImpl::start() { producers_.push_back(newInternalProducer(i)); } - for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) { - (*prod)->start(); + if (!conf_.getLazyStartPartitionedProducers()) { + for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) { + (*prod)->start(); + } } } @@ -147,6 +154,20 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result } } +void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) { + const auto numPartitions = getNumPartitions(); + assert(numProducersCreated_ <= numPartitions); + assert(partitionIndex <= numPartitions); + numProducersCreated_++; + if (numProducersCreated_ == numPartitions) { + state_ = Ready; + if (partitionsUpdateTimer_) { + runPartitionUpdateTask(); + } + partitionedProducerCreatedPromise_.setValue(shared_from_this()); + } +} + // override void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) { // get partition for this message from router policy @@ -161,7 +182,19 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac } // find a producer for that partition, index should start from 0 ProducerImplPtr producer = producers_[partition]; + + // if the producer is not started (lazy producer), then kick-off the start process + // but only if we're still in the Ready state (i.e not closing or closed) + if (!producer->isStarted()) { + if (assertState(Ready)) { + producer->start(); + } else { + callback(ResultAlreadyClosed, msg.getMessageId()); + } + } + producersLock.unlock(); + // send message on that partition producer->sendAsync(msg, callback); } @@ -175,6 +208,11 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) { lock.unlock(); } +bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) { + Lock lock(mutex_); + return state_ == state; +} + const std::string& PartitionedProducerImpl::getProducerName() const { Lock producersLock(producersMutex_); return producers_[0]->getProducerName(); @@ -285,7 +323,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; } void PartitionedProducerImpl::triggerFlush() { Lock producersLock(producersMutex_); for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) { - (*prod)->triggerFlush(); + if ((*prod)->isStarted()) { + (*prod)->triggerFlush(); + } } } @@ -322,7 +362,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) { }; for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) { - (*prod)->flushAsync(subFlushCallback); + if ((*prod)->isStarted()) { + (*prod)->flushAsync(subFlushCallback); + } else { + subFlushCallback(ResultOk); + } } } @@ -356,7 +400,10 @@ void PartitionedProducerImpl::handleGetPartitions(Result result, for (unsigned int i = currentNumPartitions; i < newNumPartitions; 
i++) { auto producer = newInternalProducer(i); - producer->start(); + + if (!conf_.getLazyStartPartitionedProducers()) { + producer->start(); + } producers_.push_back(producer); } // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()` @@ -379,8 +426,8 @@ bool PartitionedProducerImpl::isConnected() const { Lock producersLock(producersMutex_); const auto producers = producers_; producersLock.unlock(); - for (const auto& producer : producers_) { - if (!producer->isConnected()) { + for (const auto& producer : producers) { + if (producer->isStarted() && !producer->isConnected()) { return false; } } diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h index c097190fca333..de8144b7220e8 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h @@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase, uint64_t getNumberOfConnectedProducer() override; void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr, const unsigned int partitionIndex); - + void createLazyPartitionProducer(const unsigned int partitionIndex); void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex, CloseCallback callback); @@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase, unsigned int getNumPartitions() const; unsigned int getNumPartitionsWithLock() const; - ProducerImplPtr newInternalProducer(unsigned int partition) const; + ProducerImplPtr newInternalProducer(unsigned int partition); MessageRoutingPolicyPtr routerPolicy_; @@ -129,6 +129,7 @@ class PartitionedProducerImpl : public ProducerImplBase, void runPartitionUpdateTask(); void getPartitionMetadata(); void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata); + bool assertState(const PartitionedProducerState state); }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc index 61217f5f47966..3e027eeb1975f 100644 --- a/pulsar-client-cpp/lib/ProducerConfiguration.cc +++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc @@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key) return *this; } +ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers( + bool useLazyStartPartitionedProducers) { + impl_->useLazyStartPartitionedProducers = useLazyStartPartitionedProducers; + return *this; +} + +bool ProducerConfiguration::getLazyStartPartitionedProducers() const { + return impl_->useLazyStartPartitionedProducers; +} + ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) { impl_->schemaInfo = schemaInfo; return *this; diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h index fa6b755822c63..a41b2507ea43c 100644 --- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h @@ -37,6 +37,7 @@ struct ProducerConfigurationImpl { ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition}; MessageRoutingPolicyPtr messageRouter; ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash}; + bool useLazyStartPartitionedProducers{false}; bool blockIfQueueFull{false}; bool batchingEnabled{true}; unsigned int batchingMaxMessages{1000}; diff 
--git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index c7a45513ab2f7..67046173e3d7d 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -109,7 +109,7 @@ ProducerImpl::~ProducerImpl() { LOG_DEBUG(getName() << "~ProducerImpl"); cancelTimers(); printStats(); - if (state_ == Ready) { + if (state_ == Ready || state_ == Pending) { LOG_WARN(getName() << "Destroyed producer which was not properly closed"); } } @@ -159,7 +159,11 @@ void ProducerImpl::connectionFailed(Result result) { // Keep a reference to ensure object is kept alive ProducerImplPtr ptr = shared_from_this(); - if (producerCreatedPromise_.setFailed(result)) { + if (conf_.getLazyStartPartitionedProducers()) { + // if producers are lazy, then they should always try to restart + // so don't change the state and allow reconnections + return; + } else if (producerCreatedPromise_.setFailed(result)) { Lock lock(mutex_); state_ = Failed; } @@ -169,6 +173,17 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r const ResponseData& responseData) { LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result)); + // make sure we're still in the Pending/Ready state, closeAsync could have been invoked + // while waiting for this response if using lazy producers + Lock lock(mutex_); + if (state_ != Ready && state_ != Pending) { + lock.unlock(); + LOG_DEBUG("Producer created response received but producer already closed"); + failPendingMessages(ResultAlreadyClosed); + return; + } + lock.unlock(); + if (result == ResultOk) { // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately @@ -320,9 +335,14 @@ void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, Send void ProducerImpl::flushAsync(FlushCallback callback) { if (batchMessageContainer_) { Lock lock(mutex_); - auto failures = batchMessageAndSend(callback); - lock.unlock(); - failures.complete(); + + if (state_ == Ready) { + auto failures = batchMessageAndSend(callback); + lock.unlock(); + failures.complete(); + } else { + callback(ResultAlreadyClosed); + } } else { callback(ResultOk); } @@ -331,9 +351,11 @@ void ProducerImpl::flushAsync(FlushCallback callback) { void ProducerImpl::triggerFlush() { if (batchMessageContainer_) { Lock lock(mutex_); - auto failures = batchMessageAndSend(); - lock.unlock(); - failures.complete(); + if (state_ == Ready) { + auto failures = batchMessageAndSend(); + lock.unlock(); + failures.complete(); + } } } @@ -389,7 +411,8 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { } Lock lock(mutex_); - if (state_ != Ready) { + // producers may be lazily starting and be in the pending state + if (state_ != Ready && state_ != Pending) { lock.unlock(); releaseSemaphore(payloadSize); cb(ResultAlreadyClosed, msg.getMessageId()); @@ -550,10 +573,14 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e return; } LOG_DEBUG(getName() << " - Batch Message Timer expired"); + + // ignore if the producer is already closing/closed Lock lock(mutex_); - auto failures = batchMessageAndSend(); - lock.unlock(); - failures.complete(); + if (state_ == Pending || state_ == Ready) { + auto failures = batchMessageAndSend(); + lock.unlock(); + failures.complete(); + } } void ProducerImpl::printStats() { @@ -568,16 +595,25 @@ void ProducerImpl::printStats() { void 
ProducerImpl::closeAsync(CloseCallback callback) { Lock lock(mutex_); + // if the producer was never started then there is nothing to clean up + if (state_ == NotStarted) { + state_ = Closed; + callback(ResultOk); + return; + } + // Keep a reference to ensure object is kept alive ProducerImplPtr ptr = shared_from_this(); cancelTimers(); - if (state_ != Ready) { + if (state_ != Ready && state_ != Pending) { + state_ = Closed; lock.unlock(); if (callback) { callback(ResultAlreadyClosed); } + return; } LOG_INFO(getName() << "Closing producer for topic " << topic_); @@ -631,6 +667,10 @@ void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerI } else { LOG_ERROR(getName() << "Failed to close producer: " << strResult(result)); } + + // ensure any remaining send callbacks are called before calling the close callback + failPendingMessages(ResultAlreadyClosed); + if (callback) { callback(result); } @@ -828,5 +868,10 @@ bool ProducerImpl::isConnected() const { uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; } +bool ProducerImpl::isStarted() const { + Lock lock(mutex_); + return state_ != NotStarted; +} + } // namespace pulsar /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index 2c51d41f12788..a819edc6c09a0 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -71,6 +71,7 @@ class ProducerImpl : public HandlerBase, void flushAsync(FlushCallback callback) override; bool isConnected() const override; uint64_t getNumberOfConnectedProducer() override; + bool isStarted() const; bool removeCorruptMessage(uint64_t sequenceId); diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc index 7bc7915c035e0..f26f63a593b08 100644 --- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc +++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc @@ -135,6 +135,16 @@ void pulsar_producer_configuration_set_message_router(pulsar_producer_configurat conf->conf.setMessageRouter(std::make_shared(router, ctx)); } +void pulsar_producer_configuration_set_lazy_start_partitioned_producers( + pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers) { + conf->conf.setLazyStartPartitionedProducers(useLazyStartPartitionedProducers); +} + +int pulsar_producer_configuration_get_lazy_start_partitioned_producers( + pulsar_producer_configuration_t *conf) { + return conf->conf.getLazyStartPartitionedProducers(); +} + void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf, int blockIfQueueFull) { conf->conf.setBlockIfQueueFull(blockIfQueueFull); diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 514ca11dc7d0d..8e26adf8e6314 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -464,6 +464,7 @@ def create_producer(self, topic, batching_max_allowed_size_in_bytes=128*1024, batching_max_publish_delay_ms=10, message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, + lazy_start_partitioned_producers=False, properties=None, batching_type=BatchingType.Default, encryption_key=None, @@ -518,6 +519,15 @@ def create_producer(self, topic, * `message_routing_mode`: Set the message routing mode for the partitioned producer. 
Default is `PartitionsRoutingMode.RoundRobinDistribution`, other option is `PartitionsRoutingMode.UseSinglePartition` + * `lazy_start_partitioned_producers`: + This config affects producers of partitioned topics only. It controls whether + producers register and connect immediately to the owner broker of each partition + or start lazily on demand. Lazy starts occur when a message needs to be routed + to a partition that the producer has not yet registered and connected to. + Using this mode can reduce the strain on brokers for topics with large numbers of + partitions and when the SinglePartition routing policy is used without keyed messages. + Because producer registration and connection is on demand, this can produce extra + latency while the registration is being carried out. * `properties`: Sets the properties for the producer. The properties associated with a producer can be used for identify a producer at broker side. @@ -558,6 +568,7 @@ def create_producer(self, topic, _check_type(BatchingType, batching_type, 'batching_type') _check_type_or_none(str, encryption_key, 'encryption_key') _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') + _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers') conf = _pulsar.ProducerConfiguration() conf.send_timeout_millis(send_timeout_millis) @@ -571,6 +582,7 @@ def create_producer(self, topic, conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) conf.partitions_routing_mode(message_routing_mode) conf.batching_type(batching_type) + conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) if producer_name: conf.producer_name(producer_name) if initial_sequence_id: diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 0b30713f64883..ec945590301d9 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -233,6 +233,8 @@ void export_config() { .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>()) .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode) .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>()) + .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers) + .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, return_self<>()) .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy()) .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>()) .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, return_value_policy()) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 8e7eb6b3300df..a3ff1c50c0526 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -521,20 +521,21 @@ TEST(BasicEndToEndTest, testInvalidUrlPassed) { ASSERT_EQ(ResultConnectError, result); } -TEST(BasicEndToEndTest, testPartitionedProducerConsumer) { +void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, std::string topicName) { Client client(lookupUrl); - std::string topicName = "testPartitionedProducerConsumer"; // call admin api to make it partitioned - std::string url = - adminUrl + "admin/v2/persistent/public/default/testPartitionedProducerConsumer/partitions"; + std::string url = adminUrl + 
"admin/v2/persistent/public/default/" + topicName + "/partitions"; + makeDeleteRequest(url); int res = makePutRequest(url, "3"); LOG_INFO("res = " << res); ASSERT_FALSE(res != 204 && res != 409); + ProducerConfiguration conf; + conf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers); Producer producer; - Result result = client.createProducer(topicName, producer); + Result result = client.createProducer(topicName, conf, producer); ASSERT_EQ(ResultOk, result); Consumer consumer; @@ -561,6 +562,14 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) { client.shutdown(); } +TEST(BasicEndToEndTest, testPartitionedProducerConsumer) { + testPartitionedProducerConsumer(false, "testPartitionedProducerConsumer"); +} + +TEST(BasicEndToEndTest, testPartitionedLazyProducerConsumer) { + testPartitionedProducerConsumer(true, "testPartitionedProducerConsumerLazy"); +} + TEST(BasicEndToEndTest, testPartitionedProducerConsumerSubscriptionName) { Client client(lookupUrl); std::string topicName = "testPartitionedProducerConsumerSubscriptionName" + unique_str(); @@ -1247,6 +1256,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { oldConnections.push_back(clientConnectionPtr); clientConnectionPtr->close(); } + LOG_INFO("checking message " << i); ASSERT_EQ(producer.send(msg), ResultOk); } @@ -1280,6 +1290,61 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { } } +void testHandlerReconnectionPartitionProducers(bool lazyStartPartitionedProducers, bool batchingEnabled) { + Client client(adminUrl); + std::string uniqueChunk = unique_str(); + std::string topicName = "testHandlerReconnectionLogicLazyProducers" + uniqueChunk; + + std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions"; + int res = makePutRequest(url, "1"); + + ProducerConfiguration producerConf; + producerConf.setSendTimeout(10000); + producerConf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers); + producerConf.setBatchingEnabled(batchingEnabled); + Producer producer; + + ASSERT_EQ(client.createProducer(topicName, producerConf, producer), ResultOk); + + std::vector oldConnections; + + int numOfMessages = 10; + std::string propertyName = "msgIndex"; + for (int i = 0; i < numOfMessages; i++) { + std::string messageContent = "msg-" + std::to_string(i); + Message msg = + MessageBuilder().setContent(messageContent).setProperty(propertyName, std::to_string(i)).build(); + if (i % 3 == 1) { + ProducerImpl &pImpl = PulsarFriend::getInternalProducerImpl(producer, 0); + ClientConnectionPtr clientConnectionPtr; + do { + ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(pImpl); + clientConnectionPtr = clientConnectionWeakPtr.lock(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } while (!clientConnectionPtr); + oldConnections.push_back(clientConnectionPtr); + clientConnectionPtr->close(); + } + ASSERT_EQ(producer.send(msg), ResultOk); + } +} + +TEST(BasicEndToEndTest, testHandlerReconnectionPartitionedProducersWithoutBatching) { + testHandlerReconnectionPartitionProducers(false, false); +} + +TEST(BasicEndToEndTest, testHandlerReconnectionPartitionedProducersWithBatching) { + testHandlerReconnectionPartitionProducers(false, true); +} + +TEST(BasicEndToEndTest, testHandlerReconnectionLazyPartitionedProducersWithoutBatching) { + testHandlerReconnectionPartitionProducers(true, false); +} + +TEST(BasicEndToEndTest, testHandlerReconnectionLazyPartitionedProducersWithBatching) { + testHandlerReconnectionPartitionProducers(true, true); 
+} + TEST(BasicEndToEndTest, testRSAEncryption) { ClientConfiguration config; Client client(lookupUrl); @@ -2027,12 +2092,16 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) { std::string url4 = adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsNotMatchPubSub4/partitions"; + makeDeleteRequest(url1); int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); + makeDeleteRequest(url2); res = makePutRequest(url2, "3"); ASSERT_FALSE(res != 204 && res != 409); + makeDeleteRequest(url3); res = makePutRequest(url3, "4"); ASSERT_FALSE(res != 204 && res != 409); + makeDeleteRequest(url4); res = makePutRequest(url4, "4"); ASSERT_FALSE(res != 204 && res != 409); @@ -2136,10 +2205,13 @@ TEST(BasicEndToEndTest, testpatternMultiTopicsHttpConsumerPubSub) { std::string url3 = adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub3/partitions"; + makeDeleteRequest(url1); int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); + makeDeleteRequest(url2); res = makePutRequest(url2, "3"); ASSERT_FALSE(res != 204 && res != 409); + makeDeleteRequest(url3); res = makePutRequest(url3, "4"); ASSERT_FALSE(res != 204 && res != 409); @@ -2262,6 +2334,7 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { auto createProducer = [&client](Producer &producer, const std::string &topic, int numPartitions) { if (numPartitions > 0) { const std::string url = adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions"; + makeDeleteRequest(url); int res = makePutRequest(url, std::to_string(numPartitions)); ASSERT_TRUE(res == 204 || res == 409); } @@ -2446,7 +2519,7 @@ static void simpleCallback(Result code, const MessageId &msgId) { LOG_INFO("Received code: " << code << " -- MsgID: " << msgId); } -TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) { +void testSyncFlushBatchMessagesPartitionedTopic(bool lazyStartPartitionedProducers) { Client client(lookupUrl); std::string uniqueChunk = unique_str(); std::string topicName = "persistent://public/default/partition-testSyncFlushBatchMessages" + uniqueChunk; @@ -2468,6 +2541,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) { // set batch message number numOfMessages, and max delay 60s producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions); producerConfiguration.setBatchingMaxPublishDelayMs(60000); + producerConfiguration.setLazyStartPartitionedProducers(lazyStartPartitionedProducers); Result result = client.createProducer(topicName, producerConfiguration, producer); ASSERT_EQ(ResultOk, result); @@ -2543,6 +2617,14 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) { client.shutdown(); } +TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) { + testSyncFlushBatchMessagesPartitionedTopic(false); +} + +TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopicLazyProducers) { + testSyncFlushBatchMessagesPartitionedTopic(true); +} + TEST(BasicEndToEndTest, testGetTopicPartitions) { Client client(lookupUrl); std::string topicName = "persistent://public/default/testGetPartitions"; @@ -2660,7 +2742,7 @@ TEST(BasicEndToEndTest, testFlushInProducer) { client.shutdown(); } -TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { +void testFlushInPartitionedProducer(bool lazyStartPartitionedProducers) { Client client(lookupUrl); std::string uniqueChunk = unique_str(); std::string topicName = @@ -2685,6 +2767,7 @@ TEST(BasicEndToEndTest, 
testFlushInPartitionedProducer) { producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions); producerConfiguration.setBatchingMaxPublishDelayMs(60000); producerConfiguration.setMessageRouter(std::make_shared()); + producerConfiguration.setLazyStartPartitionedProducers(lazyStartPartitionedProducers); Result result = client.createProducer(topicName, producerConfiguration, producer); ASSERT_EQ(ResultOk, result); @@ -2763,6 +2846,10 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { client.shutdown(); } +TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { testFlushInPartitionedProducer(false); } + +TEST(BasicEndToEndTest, testFlushInLazyPartitionedProducer) { testFlushInPartitionedProducer(true); } + TEST(BasicEndToEndTest, testReceiveAsync) { ClientConfiguration config; Client client(lookupUrl); diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc index af473a2c36cd2..845e44771bd71 100644 --- a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc +++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc @@ -32,11 +32,6 @@ using namespace pulsar; static const std::string serviceUrl = "pulsar://localhost:6650"; static const std::string adminUrl = "http://localhost:8080/"; -static const std::string topicNameSuffix = "public/default/partitions-update-test-topic"; -static const std::string topicName = "persistent://" + topicNameSuffix; -static const std::string topicOperateUrl = - adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions"; - static ClientConfiguration newClientConfig(bool enablePartitionsUpdate) { ClientConfiguration clientConfig; if (enablePartitionsUpdate) { @@ -55,14 +50,16 @@ class PartitionsSet { public: size_t size() const { return names_.size(); } - Result initProducer(bool enablePartitionsUpdate) { + Result initProducer(std::string topicName, bool enablePartitionsUpdate, + bool lazyStartPartitionedProducers) { clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate))); - const auto producerConfig = - ProducerConfiguration().setMessageRouter(std::make_shared()); + const auto producerConfig = ProducerConfiguration() + .setMessageRouter(std::make_shared()) + .setLazyStartPartitionedProducers(lazyStartPartitionedProducers); return clientForProducer_->createProducer(topicName, producerConfig, producer_); } - Result initConsumer(bool enablePartitionsUpdate) { + Result initConsumer(std::string topicName, bool enablePartitionsUpdate) { clientForConsumer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate))); return clientForConsumer_->subscribe(topicName, "SubscriptionName", consumer_); } @@ -118,7 +115,10 @@ TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) { ASSERT_EQ(static_cast(-1), clientConfig.getPartitionsUpdateInterval()); } -TEST(PartitionsUpdateTest, testPartitionsUpdate) { +void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicNameSuffix) { + std::string topicName = "persistent://" + topicNameSuffix; + std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions"; + // Ensure `topicName` doesn't exist before created makeDeleteRequest(topicOperateUrl); // Create a 2 partitions topic @@ -128,8 +128,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) { PartitionsSet partitionsSet; // 1. 
Both producer and consumer enable partitions update - ASSERT_EQ(ResultOk, partitionsSet.initProducer(true)); - ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true)); + ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, lazyStartPartitionedProducers)); + ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true)); res = makePostRequest(topicOperateUrl, "3"); // update partitions to 3 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; @@ -140,8 +140,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) { partitionsSet.close(); // 2. Only producer enables partitions update - ASSERT_EQ(ResultOk, partitionsSet.initProducer(true)); - ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false)); + ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false)); + ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false)); res = makePostRequest(topicOperateUrl, "5"); // update partitions to 5 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; @@ -152,8 +152,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) { partitionsSet.close(); // 3. Only consumer enables partitions update - ASSERT_EQ(ResultOk, partitionsSet.initProducer(false)); - ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true)); + ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false)); + ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true)); res = makePostRequest(topicOperateUrl, "7"); // update partitions to 7 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; @@ -164,8 +164,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) { partitionsSet.close(); // 4. Both producer and consumer disables partitions update - ASSERT_EQ(ResultOk, partitionsSet.initProducer(false)); - ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false)); + ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false)); + ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false)); res = makePostRequest(topicOperateUrl, "10"); // update partitions to 10 ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; @@ -175,3 +175,11 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) { ASSERT_EQ(7, partitionsSet.size()); partitionsSet.close(); } + +TEST(PartitionsUpdateTest, testPartitionsUpdate) { + testPartitionsUpdate(false, "public/default/partitions-update-test-topic"); +} + +TEST(PartitionsUpdateTest, testPartitionsUpdateWithLazyProducers) { + testPartitionsUpdate(true, "public/default/partitions-update-test-topic-lazy"); +} diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 61cac57fa59b1..9b5a8ab53ccf7 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -18,11 +18,13 @@ */ #include #include +#include #include "HttpHelper.h" #include "lib/Future.h" #include "lib/Utils.h" +#include "lib/Latch.h" #include "lib/LogUtils.h" DECLARE_LOG_OBJECT() @@ -126,3 +128,114 @@ TEST(ProducerTest, testIsConnected) { client.close(); } + +TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) { + Client client(serviceUrl); + const std::string partitionedTopic = + "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr)); + + int res = makePutRequest( + adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + ProducerConfiguration producerConfiguration; + producerConfiguration.setLazyStartPartitionedProducers(true); + Producer producer; + ASSERT_EQ(ResultOk, 
client.createProducer(partitionedTopic, producerConfiguration, producer)); + + Message msg = MessageBuilder().setContent("test").build(); + + Promise promiseClose; + producer.closeAsync(WaitForCallback(promiseClose)); + + Promise promise; + producer.sendAsync(msg, WaitForCallbackValue(promise)); + + MessageId mi; + ASSERT_EQ(ResultAlreadyClosed, promise.getFuture().get(mi)); + + Result result; + promiseClose.getFuture().get(result); + ASSERT_EQ(ResultOk, result); +} + +TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) { + // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called + // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed. + for (int run = 0; run < 20; run++) { + Client client(serviceUrl); + const std::string partitionedTopic = + "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr)); + + int res = makePutRequest( + adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + ProducerConfiguration producerConfiguration; + producerConfiguration.setLazyStartPartitionedProducers(true); + producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); + producerConfiguration.setBatchingEnabled(true); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer)); + + int sendCount = 100; + std::vector> promises(sendCount); + Promise promiseClose; + + // only call closeAsync once at least 10 messages have been sent + Latch sendStartLatch(10); + Latch closeLatch(1); + int closedAt = 0; + + std::thread t1([&]() { + for (int i = 0; i < sendCount; i++) { + sendStartLatch.countdown(); + Message msg = MessageBuilder().setContent("test").build(); + + if (closeLatch.getCount() == 0 && closedAt == 0) { + closedAt = i; + LOG_INFO("closedAt set to " << closedAt) + } + + producer.sendAsync(msg, WaitForCallbackValue(promises[i])); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + }); + + std::thread t2([&]() { + sendStartLatch.wait(std::chrono::milliseconds(1000)); + LOG_INFO("Closing"); + producer.closeAsync(WaitForCallback(promiseClose)); + LOG_INFO("Close called"); + closeLatch.countdown(); + Result result; + promiseClose.getFuture().get(result); + ASSERT_EQ(ResultOk, result); + LOG_INFO("Closed"); + }); + + t1.join(); + t2.join(); + + // make sure that all messages after the moment when closeAsync was invoked + // return AlreadyClosed + for (int i = 0; i < sendCount; i++) { + LOG_DEBUG("Checking " << i) + + // whether a message was sent successfully or not, it's callback + // must have been invoked + ASSERT_EQ(true, promises[i].isComplete()); + MessageId mi; + Result res = promises[i].getFuture().get(mi); + LOG_DEBUG("Result is " << res); + + // for the messages sent after closeAsync was invoked, they + // should all return ResultAlreadyClosed + if (i >= closedAt) { + ASSERT_EQ(ResultAlreadyClosed, res); + } + } + + client.close(); + } +} \ No newline at end of file diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index ab507ac88f154..aed7096366ad8 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -61,6 +61,11 @@ class PulsarFriend { return *producerImpl; } + static ProducerImpl& getInternalProducerImpl(Producer producer, int index) { + PartitionedProducerImpl* producerImpl = 
static_cast(producer.impl_.get()); + return *(producerImpl->producers_[index]); + } + static void producerFailMessages(Producer producer, Result result) { producer.producerFailMessages(result); } From 1f54174feea0f944801774a217bd5d49b4274449 Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Tue, 10 Aug 2021 15:27:24 +0200 Subject: [PATCH 2/6] Lazy producers start send timeout timer on first message Lazy producers connect on demand on their first message. The send timeout timer must be started on the first message as there is no guarantee the connection will complete --- pulsar-client-cpp/lib/ProducerImpl.cc | 34 +++++++++++++++++++-------- pulsar-client-cpp/lib/ProducerImpl.h | 2 ++ 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 67046173e3d7d..c5c11a5671f33 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -212,14 +212,9 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r shared_from_this(), std::placeholders::_1)); } - // Initialize the sendTimer only once per producer and only when producer timeout is - // configured. Set the timeout as configured value and asynchronously wait for the - // timeout to happen. - if (!sendTimer_ && conf_.getSendTimeout() > 0) { - sendTimer_ = executor_->createDeadlineTimer(); - sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout())); - sendTimer_->async_wait( - std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1)); + // if the producer is lazy the send timeout timer is already running + if (!conf_.getLazyStartPartitionedProducers()) { + startSendTimeoutTimer(); } producerCreatedPromise_.setValue(shared_from_this()); @@ -684,7 +679,7 @@ uint64_t ProducerImpl::getProducerId() const { return producerId_; } void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) { Lock lock(mutex_); - if (state_ != Ready) { + if (state_ != Pending && state_ != Ready) { return; } @@ -826,7 +821,15 @@ void ProducerImpl::disconnectProducer() { scheduleReconnection(shared_from_this()); } -void ProducerImpl::start() { HandlerBase::start(); } +void ProducerImpl::start() { + HandlerBase::start(); + + if (conf_.getLazyStartPartitionedProducers()) { + // we need to kick it off now as it is possible that the connection may take + // longer than sendTimeout to connect + startSendTimeoutTimer(); + } +} void ProducerImpl::shutdown() { Lock lock(mutex_); @@ -872,6 +875,17 @@ bool ProducerImpl::isStarted() const { Lock lock(mutex_); return state_ != NotStarted; } +void ProducerImpl::startSendTimeoutTimer() { + // Initialize the sendTimer only once per producer and only when producer timeout is + // configured. Set the timeout as configured value and asynchronously wait for the + // timeout to happen. 
+ if (!sendTimer_ && conf_.getSendTimeout() > 0) { + sendTimer_ = executor_->createDeadlineTimer(); + sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout())); + sendTimer_->async_wait( + std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1)); + } +} } // namespace pulsar /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index a819edc6c09a0..fdf2cf17cea37 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -94,6 +94,8 @@ class ProducerImpl : public HandlerBase, void batchMessageTimeoutHandler(const boost::system::error_code& ec); + void startSendTimeoutTimer(); + friend class PulsarFriend; friend class Producer; From adf13f6c50e9e436f1b4923112b6cd6183401c22 Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Tue, 10 Aug 2021 20:09:31 +0200 Subject: [PATCH 3/6] Added documentation for cpp client lazy producers --- site2/docs/client-libraries-cpp.md | 229 +++++++++++++++++++++++++---- 1 file changed, 202 insertions(+), 27 deletions(-) diff --git a/site2/docs/client-libraries-cpp.md b/site2/docs/client-libraries-cpp.md index 7184e76637952..8473029628066 100644 --- a/site2/docs/client-libraries-cpp.md +++ b/site2/docs/client-libraries-cpp.md @@ -253,52 +253,227 @@ pulsar+ssl://pulsar.us-west.example.com:6651 ## Create a consumer -To use Pulsar as a consumer, you need to create a consumer on the C++ client. The following is an example. +To use Pulsar as a consumer, you need to create a consumer on the C++ client. There are two main ways of using the consumer: +- Blocking style: synchronously calling `receive(msg)`. +- Non-blocking (event based) style: using a message listener. + +### Blocking example + +The benefit of this approach is that it is the simplest code. Simply keeps calling `receive(msg)` which blocks until a message is received. + +This example starts a subscription at the earliest offset and consumes 100 messages. ```c++ -Client client("pulsar://localhost:6650"); +#include + +using namespace pulsar; + +int main() { + Client client("pulsar://localhost:6650"); -Consumer consumer; -Result result = client.subscribe("my-topic", "my-subscription-name", consumer); -if (result != ResultOk) { - LOG_ERROR("Failed to subscribe: " << result); - return -1; + Consumer consumer; + ConsumerConfiguration config; + config.setSubscriptionInitialPosition(InitialPositionEarliest); + Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer); + if (result != ResultOk) { + std::cout << "Failed to subscribe: " << result << std::endl; + return -1; + } + + Message msg; + int ctr = 0; + // consume 100 messages + while (ctr < 100) { + consumer.receive(msg); + std::cout << "Received: " << msg + << " with payload '" << msg.getDataAsString() << "'" << std::endl; + + consumer.acknowledge(msg); + ctr++; + } + + std::cout << "Finished consuming synchronously!" << std::endl; + + client.close(); + return 0; } +``` + +### Consumer with a message listener + +We can avoid the need to run a loop with blocking calls with an event based style by using a message listener which is invoked for each message that is received. + +This example starts a subscription at the earliest offset and consumes 100 messages. 
-Message msg;
+```c++
+#include <pulsar/Client.h>
+#include <atomic>
+#include <thread>
+
+using namespace pulsar;
-while (true) {
-    consumer.receive(msg);
-    LOG_INFO("Received: " << msg
-          << " with payload '" << msg.getDataAsString() << "'");
+std::atomic<int> messagesReceived;
-    consumer.acknowledge(msg);
+void handleAckComplete(Result res) {
+    std::cout << "Ack res: " << res << std::endl;
 }
-client.close();
+void listener(Consumer consumer, const Message& msg) {
+    std::cout << "Got message " << msg << " with content '" << msg.getDataAsString() << "'" << std::endl;
+    messagesReceived++;
+    consumer.acknowledgeAsync(msg.getMessageId(), handleAckComplete);
+}
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    Consumer consumer;
+    ConsumerConfiguration config;
+    config.setMessageListener(listener);
+    config.setSubscriptionInitialPosition(InitialPositionEarliest);
+    Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
+    if (result != ResultOk) {
+        std::cout << "Failed to subscribe: " << result << std::endl;
+        return -1;
+    }
+
+    // wait for 100 messages to be consumed
+    while (messagesReceived < 100) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    std::cout << "Finished consuming asynchronously!" << std::endl;
+
+    client.close();
+    return 0;
+}
 ```
 
 ## Create a producer
 
-To use Pulsar as a producer, you need to create a producer on the C++ client. The following is an example.
+To use Pulsar as a producer, you need to create a producer on the C++ client. There are two main ways of using a producer:
+- Blocking style, where each call to `send` waits for an ack from the broker.
+- Non-blocking asynchronous style, where `sendAsync` is called instead of `send` and a callback is supplied that is invoked when the ack is received from the broker.
+
+### Simple blocking example
+
+This example sends 100 messages using the blocking style. While simple, it does not produce high throughput because it waits for each ack to come back before sending the next message.
+
+```c++
+#include <pulsar/Client.h>
+#include <thread>
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    Producer producer;
+    Result result = client.createProducer("persistent://public/default/my-topic", producer);
+    if (result != ResultOk) {
+        std::cout << "Error creating producer: " << result << std::endl;
+        return -1;
+    }
+
+    // Send 100 messages synchronously
+    int ctr = 0;
+    while (ctr < 100) {
+        std::string content = "msg" + std::to_string(ctr);
+        Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
+        Result result = producer.send(msg);
+        if (result != ResultOk) {
+            std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;
+        } else {
+            std::cout << "The message " << content << " sent successfully" << std::endl;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        ctr++;
+    }
+
+    std::cout << "Finished producing synchronously!" << std::endl;
+
+    client.close();
+    return 0;
+}
+```
+
+### Non-blocking example
+
+This example sends 100 messages using the non-blocking style, calling `sendAsync` instead of `send`. This allows the producer to have multiple messages in flight at a time, which increases throughput.
+
+The producer configuration `blockIfQueueFull` is useful here to avoid `ResultProducerQueueIsFull` errors when the internal queue for outgoing send requests becomes full. Once the internal queue is full, `sendAsync` blocks, which can make your code simpler.
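+
+For reference, here is a minimal configuration sketch (the queue size shown is illustrative, not a value taken from the example below):
+
+```c++
+ProducerConfiguration producerConf;
+producerConf.setMaxPendingMessages(1000);  // illustrative size for the internal queue of outgoing send requests
+producerConf.setBlockIfQueueFull(true);    // sendAsync blocks instead of failing when that queue is full
+```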
+
+Without this configuration, the result code `ResultProducerQueueIsFull` is passed to the callback. You must decide how to deal with that (retry, discard, etc.).
 ```c++
-Client client("pulsar://localhost:6650");
+#include <pulsar/Client.h>
+#include <atomic>
+#include <functional>
+#include <thread>
+
+using namespace pulsar;
-Producer producer;
-Result result = client.createProducer("my-topic", producer);
-if (result != ResultOk) {
-    LOG_ERROR("Error creating producer: " << result);
-    return -1;
+std::atomic<int> acksReceived;
+
+void callback(Result code, const MessageId& msgId, std::string msgContent) {
+    // message processing logic here
+    std::cout << "Received ack for msg: " << msgContent << " with code: "
+              << code << " -- MsgID: " << msgId << std::endl;
+    acksReceived++;
 }
-// Publish 10 messages to the topic
-for (int i = 0; i < 10; i++){
-    Message msg = MessageBuilder().setContent("my-message").build();
-    Result res = producer.send(msg);
-    LOG_INFO("Message sent: " << res);
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    ProducerConfiguration producerConf;
+    producerConf.setBlockIfQueueFull(true);
+    Producer producer;
+    Result result = client.createProducer("persistent://public/default/my-topic",
+                                          producerConf, producer);
+    if (result != ResultOk) {
+        std::cout << "Error creating producer: " << result << std::endl;
+        return -1;
+    }
+
+    // Send 100 messages asynchronously
+    int ctr = 0;
+    while (ctr < 100) {
+        std::string content = "msg" + std::to_string(ctr);
+        Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
+        producer.sendAsync(msg, std::bind(callback,
+                                          std::placeholders::_1, std::placeholders::_2, content));
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        ctr++;
+    }
+
+    // wait for 100 messages to be acked
+    while (acksReceived < 100) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    std::cout << "Finished producing asynchronously!" << std::endl;
+
+    client.close();
+    return 0;
 }
-client.close();
+```
+
+### Partitioned topics and lazy producers
+
+When scaling out a Pulsar topic, you may configure the topic to have hundreds of partitions. Likewise, you may have scaled out your producers so that there are hundreds or even thousands of them. This can put some strain on the Pulsar brokers because, when you create a producer on a partitioned topic, the client internally creates one producer per partition, and each of those requires communication with the brokers. So for a topic with 1000 partitions and 1000 producers, it ends up creating 1,000,000 internal producers across the producer applications, each of which has to communicate with a broker to find out which broker it should connect to and then perform the connection handshake.
+
+You can reduce the load caused by this combination of a large number of partitions and many producers by doing the following:
+- use the SinglePartition routing mode (this ensures that all messages from a given producer are sent to a single, randomly selected partition)
+- use non-keyed messages (when messages are keyed, routing is based on the hash of the key and so messages end up being sent to multiple partitions)
+- use lazy producers (this ensures that an internal producer is only created on demand, when a message needs to be routed to its partition)
+
+With the example above, that reduces the number of internal producers spread out over the 1000 producer apps from 1,000,000 to just 1000.
+
+Note that there can be extra latency for the first message sent.
If you set a low send timeout, this timeout could be reached if the initial connection handshake is slow to complete. + +```c++ +ProducerConfiguration producerConf; +producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition); +producerConf.setLazyStartPartitionedProducers(true); ``` ## Enable authentication in connection URLs From eb79fb0689c2b57a1c199632c81460b34615798a Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Wed, 11 Aug 2021 16:25:30 +0200 Subject: [PATCH 4/6] Fixed additional race condition in lazy producers Ensure that the state is checked always in PartitionedProducer::sendAsync to avoid buffering of messages after closeAsync called. Removed sequential locking in ProducerImpl:: handleCreateProducer that allowed for state to go back to Ready after closeAsync called. --- .../lib/PartitionedProducerImpl.cc | 12 +++++----- pulsar-client-cpp/lib/Producer.cc | 2 +- pulsar-client-cpp/lib/ProducerImpl.cc | 23 +++++++++++-------- pulsar-client-cpp/lib/ProducerImpl.h | 2 +- pulsar-client-cpp/tests/ProducerTest.cc | 2 ++ 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index b2d9ce6bd7b43..70ace08b1aa60 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -170,6 +170,11 @@ void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partition // override void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) { + if (!assertState(Ready)) { + callback(ResultAlreadyClosed, msg.getMessageId()); + return; + } + // get partition for this message from router policy Lock producersLock(producersMutex_); short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_)); @@ -184,13 +189,8 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac ProducerImplPtr producer = producers_[partition]; // if the producer is not started (lazy producer), then kick-off the start process - // but only if we're still in the Ready state (i.e not closing or closed) if (!producer->isStarted()) { - if (assertState(Ready)) { - producer->start(); - } else { - callback(ResultAlreadyClosed, msg.getMessageId()); - } + producer->start(); } producersLock.unlock(); diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index 26f6ee84d59e6..acd021b1165ee 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -114,7 +114,7 @@ void Producer::flushAsync(FlushCallback callback) { void Producer::producerFailMessages(Result result) { if (impl_) { ProducerImpl* producerImpl = static_cast(impl_.get()); - producerImpl->failPendingMessages(result); + producerImpl->failPendingMessages(result, true); } } diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index c5c11a5671f33..529cce19920da 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -177,19 +177,16 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r // while waiting for this response if using lazy producers Lock lock(mutex_); if (state_ != Ready && state_ != Pending) { - lock.unlock(); LOG_DEBUG("Producer created response received but producer already closed"); - failPendingMessages(ResultAlreadyClosed); + failPendingMessages(ResultAlreadyClosed, false); return; } - lock.unlock(); if (result == ResultOk) { // We are now 
reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString()); - Lock lock(mutex_); cnx->registerProducer(producerId_, shared_from_this()); producerName_ = responseData.producerName; schemaVersion_ = responseData.schemaVersion; @@ -220,6 +217,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r producerCreatedPromise_.setValue(shared_from_this()); } else { + lock.unlock(); + // Producer creation failed if (result == ResultTimeout) { // Creating the producer has timed out. We need to ensure the broker closes the producer @@ -232,7 +231,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r if (producerCreatedPromise_.isComplete()) { if (result == ResultProducerBlockedQuotaExceededException) { LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer"); - failPendingMessages(ResultProducerBlockedQuotaExceededException); + failPendingMessages(ResultProducerBlockedQuotaExceededException, true); } else if (result == ResultProducerBlockedQuotaExceededError) { LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic"); } @@ -286,8 +285,12 @@ std::shared_ptr ProducerImpl::getPendingCallback return getPendingCallbacksWhenFailed(); } -void ProducerImpl::failPendingMessages(Result result) { - getPendingCallbacksWhenFailedWithLock()->complete(result); +void ProducerImpl::failPendingMessages(Result result, bool withLock) { + if (withLock) { + getPendingCallbacksWhenFailedWithLock()->complete(result); + } else { + getPendingCallbacksWhenFailed()->complete(result); + } } void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { @@ -602,6 +605,9 @@ void ProducerImpl::closeAsync(CloseCallback callback) { cancelTimers(); + // ensure any remaining send callbacks are called before calling the close callback + failPendingMessages(ResultAlreadyClosed, false); + if (state_ != Ready && state_ != Pending) { state_ = Closed; lock.unlock(); @@ -663,9 +669,6 @@ void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerI LOG_ERROR(getName() << "Failed to close producer: " << strResult(result)); } - // ensure any remaining send callbacks are called before calling the close callback - failPendingMessages(ResultAlreadyClosed); - if (callback) { callback(result); } diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index fdf2cf17cea37..d29efed1a13ae 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -162,7 +162,7 @@ class ProducerImpl : public HandlerBase, std::shared_ptr getPendingCallbacksWhenFailed(); std::shared_ptr getPendingCallbacksWhenFailedWithLock(); - void failPendingMessages(Result result); + void failPendingMessages(Result result, bool withLock); MessageCryptoPtr msgCrypto_; DeadlineTimerPtr dataKeyGenTImer_; diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc index 9b5a8ab53ccf7..210f01345d4af 100644 --- a/pulsar-client-cpp/tests/ProducerTest.cc +++ b/pulsar-client-cpp/tests/ProducerTest.cc @@ -163,6 +163,7 @@ TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) { // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed. 
for (int run = 0; run < 20; run++) { + LOG_INFO("Start of run " << run); Client client(serviceUrl); const std::string partitionedTopic = "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr)); @@ -237,5 +238,6 @@ TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) { } client.close(); + LOG_INFO("End of run " << run); } } \ No newline at end of file From 87224b1eea76e6b0e547afe0d3c206b3056f25f5 Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Fri, 13 Aug 2021 13:23:37 +0200 Subject: [PATCH 5/6] Ensure one producer starts when lazy In order to ensure than authz errors are returned during producer creation, start one producer eagerly when using lazy partitioned producers. When UseSinglePartition mode is used, the eagerly created producer will be the only producer created (when non-keyed messages are sent). --- .../include/pulsar/ProducerConfiguration.h | 10 ++++--- .../lib/PartitionedProducerImpl.cc | 28 ++++++++++++++----- .../lib/PartitionedProducerImpl.h | 2 +- pulsar-client-cpp/python/pulsar/__init__.py | 10 ++++--- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 9 ++++-- 5 files changed, 40 insertions(+), 19 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 9148c2c8476b3..5c2792aadeaa3 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -265,12 +265,14 @@ class PULSAR_PUBLIC ProducerConfiguration { /** * This config affects producers of partitioned topics only. It controls whether * producers register and connect immediately to the owner broker of each partition - * or start lazily on demand. Lazy starts occur when a message needs to be routed - * to a partition that the producer has not yet registered and connected to. + * or start lazily on demand. The internal producer of one partition is always + * started eagerly, chosen by the routing policy, but the internal producers of + * any additional partitions are started on demand, upon receiving their first + * message. * Using this mode can reduce the strain on brokers for topics with large numbers of * partitions and when the SinglePartition routing policy is used without keyed messages. - * Because producer registration and connection is on demand, this can produce extra - * latency while the registration is being carried out. + * Because producer connection can be on demand, this can produce extra send latency + * for the first messages of a given partition. 
* @param true/false as to whether to start partition producers lazily * @return */ diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 70ace08b1aa60..94bf3534f9bda 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -87,12 +87,12 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const { return getNumPartitions(); } -ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) { +ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) { using namespace std::placeholders; std::string topicPartitionName = topicName_->getTopicPartitionName(partition); auto producer = std::make_shared(client_, topicPartitionName, conf_, partition); - if (conf_.getLazyStartPartitionedProducers()) { + if (lazy) { createLazyPartitionProducer(partition); } else { producer->getProducerCreatedFuture().addListener( @@ -109,11 +109,25 @@ void PartitionedProducerImpl::start() { // create producer per partition // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased // when `state_` is Ready - for (unsigned int i = 0; i < getNumPartitions(); i++) { - producers_.push_back(newInternalProducer(i)); - } - if (!conf_.getLazyStartPartitionedProducers()) { + if (conf_.getLazyStartPartitionedProducers()) { + // start one producer now, to ensure authz errors occur now + // if the SinglePartition router is used, then this producer will serve + // all non-keyed messages in the future + Message msg = MessageBuilder().setContent("x").build(); + short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_)); + + for (unsigned int i = 0; i < getNumPartitions(); i++) { + bool lazy = (short)i != partition; + producers_.push_back(newInternalProducer(i, lazy)); + } + + producers_[partition]->start(); + } else { + for (unsigned int i = 0; i < getNumPartitions(); i++) { + producers_.push_back(newInternalProducer(i, false)); + } + for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) { (*prod)->start(); } @@ -399,7 +413,7 @@ void PartitionedProducerImpl::handleGetPartitions(Result result, topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions)); for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) { - auto producer = newInternalProducer(i); + auto producer = newInternalProducer(i, conf_.getLazyStartPartitionedProducers()); if (!conf_.getLazyStartPartitionedProducers()) { producer->start(); diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h index de8144b7220e8..60881f2ccc7eb 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h @@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase, unsigned int getNumPartitions() const; unsigned int getNumPartitionsWithLock() const; - ProducerImplPtr newInternalProducer(unsigned int partition); + ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy); MessageRoutingPolicyPtr routerPolicy_; diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 8e26adf8e6314..385d7ca914948 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -522,12 +522,14 @@ def create_producer(self, topic, * 
`lazy_start_partitioned_producers`: This config affects producers of partitioned topics only. It controls whether producers register and connect immediately to the owner broker of each partition - or start lazily on demand. Lazy starts occur when a message needs to be routed - to a partition that the producer has not yet registered and connected to. + or start lazily on demand. The internal producer of one partition is always + started eagerly, chosen by the routing policy, but the internal producers of + any additional partitions are started on demand, upon receiving their first + message. Using this mode can reduce the strain on brokers for topics with large numbers of partitions and when the SinglePartition routing policy is used without keyed messages. - Because producer registration and connection is on demand, this can produce extra - latency while the registration is being carried out. + Because producer connection can be on demand, this can produce extra send latency + for the first messages of a given partition. * `properties`: Sets the properties for the producer. The properties associated with a producer can be used for identify a producer at broker side. diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index a3ff1c50c0526..f3ff78af3dd3c 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -2534,6 +2534,8 @@ void testSyncFlushBatchMessagesPartitionedTopic(bool lazyStartPartitionedProduce Producer producer; int numOfMessages = 20; + // lazy partitioned producers make a single call to the message router during createProducer + int initPart = lazyStartPartitionedProducers ? 1 : 0; ProducerConfiguration tempProducerConfiguration; tempProducerConfiguration.setMessageRouter(std::make_shared()); ProducerConfiguration producerConfiguration = tempProducerConfiguration; @@ -2583,7 +2585,7 @@ void testSyncFlushBatchMessagesPartitionedTopic(bool lazyStartPartitionedProduce LOG_INFO("sending first part messages in async, should timeout to receive"); Message m; - ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 5000)); + ASSERT_EQ(ResultTimeout, consumer[initPart].receive(m, 5000)); for (int i = numOfMessages / numberOfPartitions / 2; i < numOfMessages; i++) { std::string messageContent = prefix + std::to_string(i); @@ -2607,11 +2609,12 @@ void testSyncFlushBatchMessagesPartitionedTopic(bool lazyStartPartitionedProduce std::string messageContent = prefix + std::to_string(i); Message msg = MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build(); - producer.send(msg); + ASSERT_EQ(ResultOk, producer.send(msg)); LOG_DEBUG("sync sending message " << messageContent); } + LOG_INFO("sending first part messages in sync, should not timeout to receive"); - ASSERT_EQ(ResultOk, consumer[0].receive(m, 5000)); + ASSERT_EQ(ResultOk, consumer[initPart].receive(m, 10000)); producer.close(); client.shutdown(); From 59f989aebd50cb27335c03f64a662810fc60515e Mon Sep 17 00:00:00 2001 From: Jack Vanlightly Date: Fri, 13 Aug 2021 13:54:34 +0200 Subject: [PATCH 6/6] Fail pending messages if producer creation fails In the case that an internal producer is created lazily and it fails, then fail any pending requests immediately. 
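
For illustration only (not part of this patch series): a minimal sketch of what an application sees when a lazily started internal producer fails to be created, for example due to an authorization error. The service URL, topic name and partition key below are assumptions.

```c++
#include <pulsar/Client.h>
#include <chrono>
#include <iostream>
#include <thread>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");

    ProducerConfiguration conf;
    conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
    conf.setLazyStartPartitionedProducers(true);

    // One partition producer is still started eagerly, so errors such as
    // authorization failures can already surface here.
    Producer producer;
    Result res = client.createProducer("persistent://public/default/partitioned-topic", conf, producer);
    if (res != ResultOk) {
        std::cerr << "createProducer failed: " << res << std::endl;
        return 1;
    }

    // A keyed message may route to a partition whose producer is created lazily.
    // If that creation fails, the callback is completed promptly with the failure
    // result instead of the message waiting for the send timeout.
    producer.sendAsync(MessageBuilder().setContent("payload").setPartitionKey("k1").build(),
                       [](Result code, const MessageId&) {
                           if (code != ResultOk) {
                               std::cerr << "send failed: " << code << std::endl;
                           }
                       });

    std::this_thread::sleep_for(std::chrono::seconds(1));  // crude wait for the callback in this sketch
    client.close();
    return 0;
}
```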
---
 pulsar-client-cpp/lib/ProducerImpl.cc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 529cce19920da..f81e205475de4 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -246,6 +246,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 scheduleReconnection(shared_from_this());
             } else {
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
+                failPendingMessages(result, true);
                 producerCreatedPromise_.setFailed(result);
                 Lock lock(mutex_);
                 state_ = Failed;
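
Taken together, patches 4-6 converge on one guarantee for lazy partitioned producers: every `sendAsync` callback is completed exactly once, whether the message is sent, races with `closeAsync`, or hits a failed lazy producer creation. The sketch below illustrates that behaviour from an application's point of view, mirroring the race exercised by `testSendAsyncCloseAsyncConcurrentlyWithLazyProducers`; it is not part of the patch series, and the service URL, topic name and message count are assumptions.

```c++
#include <pulsar/Client.h>
#include <atomic>
#include <chrono>
#include <string>
#include <thread>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");

    ProducerConfiguration conf;
    conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
    conf.setLazyStartPartitionedProducers(true);

    Producer producer;
    if (client.createProducer("persistent://public/default/lazy-demo", conf, producer) != ResultOk) {
        return 1;
    }

    std::atomic<int> callbacks{0};
    const int total = 100;

    // Send from one thread while the producer is closed from another.
    std::thread sender([&] {
        for (int i = 0; i < total; i++) {
            producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(),
                               [&callbacks](Result, const MessageId&) {
                                   // ResultOk before the close wins the race, ResultAlreadyClosed after.
                                   callbacks++;
                               });
        }
    });

    producer.close();
    sender.join();

    // Every callback fires exactly once, so this loop terminates.
    while (callbacks.load() < total) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    client.close();
    return 0;
}
```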