Hi,

We are using the Pub/Sub client library to send messages from a C++ application to Pub/Sub topics in GCP, and the functionality works fine most of the time. We are aware that the Publish() function is synchronous in nature and blocks until there is a final result. It blocks and finally invokes the callback function (via the future object returned by Publish()) in a separate thread. Most of the time it returns immediately because the message is dispatched successfully. Sometimes it takes up to a few seconds (4 seconds at most, as configured) because of retries after failures. But on one occasion Publish() did not return and blocked forever. As a result, the application thread responsible for publishing messages was deadlocked, and we ultimately had to restart the application, which caused a massive service failure.

I am curious to know whether anyone else has come across this situation. Is there a solution to this problem? Is there a setting to return after a few seconds in case of failure? We are already using the

Regards,
I assume you are referring to:

It should not be synchronous. Normally control returns to the application as soon as the message is queued. The callback function is invoked when the service acknowledges the batch of messages that contains it. The operation may block if the queue is full (or becomes full). In that case this option may be useful: https://cloud.google.com/cpp/docs/reference/pubsub/latest/structgoogle_1_1cloud_1_1pubsub_1_1FullPublisherActionOption The default is to block until there is room in the queue for this message.
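To make the distinction between queuing a message and waiting for its result concrete, here is a toy model that uses only the standard library. `FakePublish` and `WaitWithDeadline` are hypothetical helpers, not part of the Pub/Sub client: `FakePublish` returns a future immediately, and in this toy the caller only blocks when it waits for the result. Bounding that wait with `std::future::wait_for` gives a "return after a few seconds" behaviour, at the cost of not knowing the final outcome when the deadline passes. This sketch uses `std::future`, not `google::cloud::future`; check the library reference for the equivalent members before relying on them.

```cpp
#include <chrono>
#include <future>
#include <optional>
#include <string>
#include <thread>
#include <utility>

// Hypothetical stand-in for an asynchronous publish: it returns a future
// immediately, and a separate thread produces the "message id" after
// `ack_delay`. This models the timing only; it is not the Pub/Sub client.
// Note: a future obtained from std::async waits for its task when it is
// destroyed, so the simulated publish always runs to completion.
std::future<std::string> FakePublish(std::string data,
                                     std::chrono::milliseconds ack_delay) {
  return std::async(std::launch::async, [data = std::move(data), ack_delay] {
    std::this_thread::sleep_for(ack_delay);
    return "id-for-" + data;
  });
}

// Waits at most `timeout` for the result. Returns the id if it arrived in
// time, or std::nullopt if the deadline passed first. Only the wait is
// bounded; the operation itself keeps running in the background.
std::optional<std::string> WaitWithDeadline(std::future<std::string>& result,
                                            std::chrono::milliseconds timeout) {
  if (result.wait_for(timeout) != std::future_status::ready) {
    return std::nullopt;  // give up waiting, e.g. log and move on
  }
  return result.get();  // the result is ready, so get() does not block here
}
```

In the real client the closer analogue is to let a continuation attached to the returned future handle the result, so the application thread never waits at all.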
Ugh, I am sorry to hear that.
See above. You can configure the client to reject messages that would overflow the queue, or to simply ignore full queues, though this is only suitable if you don't care about message ordering. There is always the possibility of a bug in the library, but we would need more details to troubleshoot this, such as the batch settings, the retry settings, the size of the message that causes the problem, etc.
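The three behaviours described above (block by default, reject, or ignore the limit) can be illustrated with a small standard-library queue. This is a toy under stated assumptions: `PendingQueue`, `FullQueuePolicy`, and `PushResult` are hypothetical names that only loosely mirror the choices behind `FullPublisherActionOption`; the real client is configured through that option (see the linked reference for its exact values), not through a class like this.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical policies for a full pending-message queue.
enum class FullQueuePolicy { kBlock, kReject, kIgnore };
enum class PushResult { kQueued, kRejected };

class PendingQueue {
 public:
  PendingQueue(std::size_t capacity, FullQueuePolicy policy)
      : capacity_(capacity), policy_(policy) {
    assert(capacity_ > 0);
  }

  // Called by the application thread (the analogue of Publish()).
  PushResult Push(std::string msg) {
    std::unique_lock<std::mutex> lk(mu_);
    if (queue_.size() >= capacity_) {
      switch (policy_) {
        case FullQueuePolicy::kBlock:
          // Waits until Pop() frees a slot; if nothing ever drains the
          // queue, this waits forever.
          room_.wait(lk, [this] { return queue_.size() < capacity_; });
          break;
        case FullQueuePolicy::kReject:
          return PushResult::kRejected;  // fail fast; the caller decides
        case FullQueuePolicy::kIgnore:
          break;  // exceed the limit; queue growth is unbounded
      }
    }
    queue_.push_back(std::move(msg));
    return PushResult::kQueued;
  }

  // Called by the sender (the analogue of a batch being acknowledged).
  bool Pop(std::string& out) {
    std::lock_guard<std::mutex> lk(mu_);
    if (queue_.empty()) return false;
    out = std::move(queue_.front());
    queue_.pop_front();
    room_.notify_one();  // wake one blocked Push(), if any
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lk(mu_);
    return queue_.size();
  }

 private:
  mutable std::mutex mu_;
  std::condition_variable room_;
  std::deque<std::string> queue_;
  std::size_t capacity_;
  FullQueuePolicy policy_;
};
```

With `kBlock`, a producer stays inside `Push()` for as long as nothing calls `Pop()`, which resembles the indefinite wait described in the question; `kReject` turns the same situation into an immediate failure that the application can log and handle.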
Dear Coryan,

Thanks for your help on this topic! The code you suggested did indeed get rid of the blocking behaviour.

I introduced a map and a mutex:

```cpp
#include <memory>
#include <shared_mutex>
#include <string>
#include <unordered_map>

#include "google/cloud/pubsub/publisher.h"

namespace pubsub = ::google::cloud::pubsub;

// One cached publisher connection per topic name.
static std::unordered_map<std::string, std::shared_ptr<pubsub::PublisherConnection>> pubsubConnections;
// Guards pubsubConnections; the callbacks run on a separate thread.
static std::shared_mutex pubsubConnectionsMutex;
```

and the callback I currently have is:

```cpp
void logResult(google::cloud::StatusOr<std::string> id, std::string const& topic, std::string const& eventName) {
  if (!id) {
    // In case of error, retrieve the error status and log it.
    auto error = std::move(id).status();
    LOG_ERROR("pubsub_send_message: Failed to publish message. event: %s, error_message: %s, error_code: %s, error_info: %s\n",
              eventName.c_str(), error.message().c_str(), StatusCodeToString(error.code()).c_str(),
              error.error_info().reason().c_str());
    // Delete the existing connection so that the next publish operation establishes a new one.
    // Take the lock before the lookup so another thread cannot invalidate the iterator.
    std::unique_lock<std::shared_mutex> uniqueLock(pubsubConnectionsMutex);
    auto it = pubsubConnections.find(topic);
    if (it != pubsubConnections.end()) {
      LOG_INFO("pubsub_send_message: Deleting client connection to the pubsub service. topicName: %s\n", topic.c_str());
      it->second.reset();
      pubsubConnections.erase(it);
    }
    return;
  }
  // Success.
  LOG_INFO("pubsub_send_message: Successfully published message. event: %s, topicName: %s, messageId: %s\n",
           eventName.c_str(), topic.c_str(), id->c_str());
}
```

The message itself is published with:

```cpp
(void)publisher.Publish(std::move(message)).then([topic, name = std::string(eventName)](auto f) {
  logResult(f.get(), topic, name);
});
```

As far as I understand, the error message indicates that a resource deadlock was detected and avoided during the shutdown of automatically created background threads in the Google Cloud C++ client library. Is there any way to circumvent this problem?

Thanks in advance and best regards,
Moritz
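One plausible explanation for this error, offered as an assumption that the thread above does not confirm: `logResult` runs on one of the library's background threads, and erasing the map entry there may drop the last reference to the `PublisherConnection`. If the connection owns those threads, its destructor has to join them, including the very thread that is running the callback. The C++ standard specifies that `std::thread::join()` fails with `std::errc::resource_deadlock_would_occur` when a thread tries to join itself. The standard-library sketch below reproduces only that last step; `SelfJoinReportsDeadlock` is a hypothetical helper, not part of any library.

```cpp
#include <future>
#include <system_error>
#include <thread>

// Returns true if a thread that tries to join itself receives
// std::errc::resource_deadlock_would_occur, as the standard specifies.
bool SelfJoinReportsDeadlock() {
  std::promise<void> ready;    // signals that `worker` has been assigned
  std::promise<bool> outcome;  // reports what the self-join did
  auto ready_future = ready.get_future();
  auto outcome_future = outcome.get_future();
  std::thread worker;
  worker = std::thread([&] {
    ready_future.wait();  // ensure `worker` now refers to this thread
    try {
      worker.join();  // joining ourselves can never succeed
      outcome.set_value(false);
    } catch (std::system_error const& e) {
      outcome.set_value(e.code() == std::errc::resource_deadlock_would_occur);
    }
  });
  ready.set_value();
  bool deadlock_detected = outcome_future.get();
  worker.join();  // safe: called from a different thread
  return deadlock_detected;
}
```

If this is indeed the cause, a common workaround is to avoid releasing the connection from inside the callback, for example by only marking the topic as stale there and letting the publishing thread erase and recreate the connection on its next call.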
The .then() call is non-blocking too (mostly, see below); the call that is definitely blocking is .get(). Do you need to call .get() here? It does not look like you need to. I would write something like: