Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 11496][C++] Allow partitioned producers to start lazily #11570

Merged
merged 6 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,27 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
HashingScheme getHashingScheme() const;

/**
* This config affects producers of partitioned topics only. It controls whether
* producers register and connect immediately to the owner broker of each partition
* or start lazily on demand. The internal producer of one partition is always
* started eagerly, chosen by the routing policy, but the internal producers of
* any additional partitions are started on demand, upon receiving their first
* message.
* Using this mode can reduce the strain on brokers for topics with large numbers of
* partitions and when the SinglePartition routing policy is used without keyed messages.
* Because producer connection can be on demand, this can produce extra send latency
* for the first messages of a given partition.
* @param true/false as to whether to start partition producers lazily
* @return
*/
ProducerConfiguration& setLazyStartPartitionedProducers(bool);

/**
* The getter associated with setLazyStartPartitionedProducers()
*/
bool getLazyStartPartitionedProducers() const;

/**
* The setter associated with getBlockIfQueueFull()
*/
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ
PULSAR_PUBLIC pulsar_hashing_scheme
pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers);

PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
pulsar_producer_configuration_t *conf);

PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full(
pulsar_producer_configuration_t *conf, int blockIfQueueFull);

Expand Down
14 changes: 12 additions & 2 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
mutex_(),
creationTimestamp_(TimeUtils::now()),
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
state_(NotStarted),
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()) {}

HandlerBase::~HandlerBase() { timer_->cancel(); }

void HandlerBase::start() { grabCnx(); }
void HandlerBase::start() {
Lock lock(mutex_);
// guard against concurrent state changes such as closing
if (state_ == NotStarted) {
state_ = Pending;
lock.unlock();

grabCnx();
}
}

void HandlerBase::grabCnx() {
Lock lock(mutex_);
Expand Down Expand Up @@ -106,6 +115,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
scheduleReconnection(handler);
break;

case NotStarted:
case Closing:
case Closed:
case Failed:
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class HandlerBase {

enum State
{
NotStarted,
Pending,
Ready,
Closing,
Expand Down
91 changes: 76 additions & 15 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
return getNumPartitions();
}

ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
using namespace std::placeholders;
std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));

if (lazy) {
createLazyPartitionProducer(partition);
} else {
producer->getProducerCreatedFuture().addListener(
std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
}

LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
return producer;
Expand All @@ -104,12 +109,28 @@ void PartitionedProducerImpl::start() {
// create producer per partition
// Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
// when `state_` is Ready
for (unsigned int i = 0; i < getNumPartitions(); i++) {
producers_.push_back(newInternalProducer(i));
}

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
if (conf_.getLazyStartPartitionedProducers()) {
// start one producer now, to ensure authz errors occur now
// if the SinglePartition router is used, then this producer will serve
// all non-keyed messages in the future
Message msg = MessageBuilder().setContent("x").build();
short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));

for (unsigned int i = 0; i < getNumPartitions(); i++) {
bool lazy = (short)i != partition;
producers_.push_back(newInternalProducer(i, lazy));
}

producers_[partition]->start();
} else {
for (unsigned int i = 0; i < getNumPartitions(); i++) {
producers_.push_back(newInternalProducer(i, false));
}

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->start();
}
}
}

Expand Down Expand Up @@ -147,8 +168,27 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
}
}

void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
const auto numPartitions = getNumPartitions();
assert(numProducersCreated_ <= numPartitions);
assert(partitionIndex <= numPartitions);
numProducersCreated_++;
if (numProducersCreated_ == numPartitions) {
state_ = Ready;
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
partitionedProducerCreatedPromise_.setValue(shared_from_this());
}
}

// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (!assertState(Ready)) {
callback(ResultAlreadyClosed, msg.getMessageId());
return;
}

// get partition for this message from router policy
Lock producersLock(producersMutex_);
short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
Expand All @@ -161,7 +201,14 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// find a producer for that partition, index should start from 0
ProducerImplPtr producer = producers_[partition];

// if the producer is not started (lazy producer), then kick-off the start process
if (!producer->isStarted()) {
producer->start();
Vanlightly marked this conversation as resolved.
Show resolved Hide resolved
}

producersLock.unlock();

// send message on that partition
producer->sendAsync(msg, callback);
}
Expand All @@ -175,6 +222,11 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
lock.unlock();
}

bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
Lock lock(mutex_);
return state_ == state;
}

const std::string& PartitionedProducerImpl::getProducerName() const {
Lock producersLock(producersMutex_);
return producers_[0]->getProducerName();
Expand Down Expand Up @@ -285,7 +337,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
void PartitionedProducerImpl::triggerFlush() {
Lock producersLock(producersMutex_);
for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->triggerFlush();
if ((*prod)->isStarted()) {
(*prod)->triggerFlush();
}
}
}

Expand Down Expand Up @@ -322,7 +376,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
};

for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
(*prod)->flushAsync(subFlushCallback);
if ((*prod)->isStarted()) {
(*prod)->flushAsync(subFlushCallback);
} else {
subFlushCallback(ResultOk);
}
}
}

Expand Down Expand Up @@ -355,8 +413,11 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));

for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
auto producer = newInternalProducer(i);
producer->start();
auto producer = newInternalProducer(i, conf_.getLazyStartPartitionedProducers());

if (!conf_.getLazyStartPartitionedProducers()) {
producer->start();
}
producers_.push_back(producer);
}
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
Expand All @@ -379,8 +440,8 @@ bool PartitionedProducerImpl::isConnected() const {
Lock producersLock(producersMutex_);
const auto producers = producers_;
producersLock.unlock();
for (const auto& producer : producers_) {
if (!producer->isConnected()) {
for (const auto& producer : producers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the producer has been created, but no messages have been sent, this will return true, even though there has been no connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The question is what people do with this method. Is it an indicator of a problem? Without lazy producers, false means we were once connected but now aren't and so can be considered a transient problem. But with lazy producers, false may mean we've not even tried, so false does not mean anything. So I decided to return true to signal absence of a problem.

Definitely interested to hear opinions on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, I have no strong opinion here. "connected" is a weird concept in a distributed system like this.

if (producer->isStarted() && !producer->isConnected()) {
return false;
}
}
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
uint64_t getNumberOfConnectedProducer() override;
void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
const unsigned int partitionIndex);

void createLazyPartitionProducer(const unsigned int partitionIndex);
void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
CloseCallback callback);

Expand Down Expand Up @@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;

ProducerImplPtr newInternalProducer(unsigned int partition) const;
ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);

MessageRoutingPolicyPtr routerPolicy_;

Expand All @@ -129,6 +129,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
bool assertState(const PartitionedProducerState state);
};

} // namespace pulsar
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void Producer::flushAsync(FlushCallback callback) {
void Producer::producerFailMessages(Result result) {
if (impl_) {
ProducerImpl* producerImpl = static_cast<ProducerImpl*>(impl_.get());
producerImpl->failPendingMessages(result);
producerImpl->failPendingMessages(result, true);
}
}

Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}

ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers(
bool useLazyStartPartitionedProducers) {
impl_->useLazyStartPartitionedProducers = useLazyStartPartitionedProducers;
return *this;
}

bool ProducerConfiguration::getLazyStartPartitionedProducers() const {
return impl_->useLazyStartPartitionedProducers;
}

ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
impl_->schemaInfo = schemaInfo;
return *this;
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct ProducerConfigurationImpl {
ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
MessageRoutingPolicyPtr messageRouter;
ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
bool useLazyStartPartitionedProducers{false};
bool blockIfQueueFull{false};
bool batchingEnabled{true};
unsigned int batchingMaxMessages{1000};
Expand Down
Loading