Skip to content

Commit

Permalink
fix(pubsub): faster shutdowns for Publisher (#9991)
Browse files Browse the repository at this point in the history
Cancel the timer (if any) before shutting down. An uncancelled timer can
delay shutdown, which is a problem for applications that have very long
batch hold times (say multiple seconds or minutes).
  • Loading branch information
coryan authored Oct 7, 2022
1 parent fb40389 commit 5792587
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 9 deletions.
26 changes: 17 additions & 9 deletions google/cloud/pubsub/internal/batching_publisher_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ namespace cloud {
namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

// A helper callable to handle a response, it is a bit large for a lambda, and
// we need move-capture anyways.
// A helper callable to handle a response, it is a bit large for a lambda.
struct Batch {
std::vector<promise<StatusOr<std::string>>> waiters;
std::weak_ptr<BatchingPublisherConnection> weak;
Expand Down Expand Up @@ -49,6 +48,10 @@ struct Batch {
}
};

// Cancels the batch timer stored in `timer_`, if any, when the connection is
// destroyed.
BatchingPublisherConnection::~BatchingPublisherConnection() {
  // Nothing to cancel when `timer_` does not hold a valid future.
  if (!timer_.valid()) return;
  timer_.cancel();
}

future<StatusOr<std::string>> BatchingPublisherConnection::Publish(
PublishParams p) {
auto const bytes = MessageSize(p.message);
Expand Down Expand Up @@ -134,8 +137,8 @@ void BatchingPublisherConnection::MaybeFlush(std::unique_lock<std::mutex> lk) {
return;
}
// If the batch is empty obviously we do not need a timer, and if it has more
// than one element then we have setup a timer previously and there is no need
// to set it again.
// than one element then we have set up a timer previously and there is no
// need to set it again.
if (pending_.messages_size() != 1) return;
auto const expiration = batch_expiration_ =
std::chrono::system_clock::now() + opts_.get<pubsub::MaxHoldTimeOption>();
Expand All @@ -146,11 +149,16 @@ void BatchingPublisherConnection::MaybeFlush(std::unique_lock<std::mutex> lk) {
// `weak_from_this()`.
auto weak = std::weak_ptr<BatchingPublisherConnection>(shared_from_this());
// Note that at this point the lock is released, so whether the timer
// schedules later or schedules in this thread has no effect.
cq_.MakeDeadlineTimer(expiration)
.then([weak](future<StatusOr<std::chrono::system_clock::time_point>>) {
if (auto self = weak.lock()) self->OnTimer();
});
// schedules later or schedules in this thread has no effect. In addition,
// the assignment to `timer_` is safe. It is only used from the destructor,
// and if the destructor and this function are running at the same time, then
// we had a massive problem already.
timer_ =
cq_.MakeDeadlineTimer(expiration)
.then(
[weak](future<StatusOr<std::chrono::system_clock::time_point>>) {
if (auto self = weak.lock()) self->OnTimer();
});
}

void BatchingPublisherConnection::OnTimer() {
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/pubsub/internal/batching_publisher_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class BatchingPublisherConnection
: public pubsub::PublisherConnection,
public std::enable_shared_from_this<BatchingPublisherConnection> {
public:
~BatchingPublisherConnection() override;

static std::shared_ptr<BatchingPublisherConnection> Create(
pubsub::Topic topic, Options opts, std::string ordering_key,
std::shared_ptr<BatchSink> sink, CompletionQueue cq) {
Expand Down Expand Up @@ -79,6 +81,7 @@ class BatchingPublisherConnection
google::pubsub::v1::PublishRequest pending_;
std::size_t current_bytes_ = 0;
std::chrono::system_clock::time_point batch_expiration_;
future<void> timer_;

Status corked_on_status_;
};
Expand Down
34 changes: 34 additions & 0 deletions google/cloud/pubsub/internal/batching_publisher_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,40 @@ std::vector<std::string> MessagesData(
return data;
}

// Checks that releasing a publisher with a pending batch timer completes
// quickly, instead of taking as long as the (very large) maximum hold time.
TEST(BatchingPublisherConnectionTest, FastDestructor) {
  auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
  pubsub::Topic const topic("test-project", "test-topic");

  // This test will never get a chance to flush its message.
  EXPECT_CALL(*mock, AsyncPublish).Times(0);

  google::cloud::internal::AutomaticallyCreatedBackgroundThreads background;
  // Make this so large that the test times out before the message hold expires.
  // This ensures that the message is still being held, with its batch timer
  // pending, when the publisher is released below.
  auto constexpr kMaxHoldTime = std::chrono::hours(24);
  auto const ordering_key = std::string{};
  auto publisher = BatchingPublisherConnection::Create(
      topic,
      DefaultPublisherOptions(
          Options{}
              .set<pubsub::MaxBatchMessagesOption>(4)
              .set<pubsub::MaxHoldTimeOption>(kMaxHoldTime)),
      ordering_key, mock, background.cq());

  // Publishing a message starts the batch timer.
  auto pending = publisher->Publish(
      {pubsub::MessageBuilder{}.SetData("test-data-0").Build()});

  // Only the time taken to release the publisher is measured.
  auto const start = std::chrono::steady_clock::now();
  publisher.reset();
  auto const elapsed = std::chrono::steady_clock::now() - start;
  // Considering that the timer is configured to wait 24 hours, shutting down in
  // 30s is good enough. It also avoids flakiness introduced by more precise
  // measurements.
  EXPECT_LE(elapsed, std::chrono::seconds(30));
}

TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) {
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
pubsub::Topic const topic("test-project", "test-topic");
Expand Down

0 comments on commit 5792587

Please sign in to comment.