Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): limit ModifyAckDeadlineRequest size #10032

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,38 @@
#include "google/cloud/pubsub/internal/extend_leases_with_retry.h"
#include "google/cloud/internal/async_retry_loop.h"
#include "google/cloud/log.h"
#include <iterator>
#include <ostream>

namespace google {
namespace cloud {
namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {
// Collect the results of all the futures in `v` into a single future. The
// statuses in the output appear in the same order as the futures in `v`.
future<std::vector<Status>> WaitAll(std::vector<future<Status>> v) {
  // Start from an already satisfied, empty list. Then chain one continuation
  // per input: each continuation waits for the list accumulated so far, then
  // for its own future, and appends that result to the list.
  auto all = make_ready_future(std::vector<Status>{});
  for (auto& pending : v) {
    all = all.then([p = std::move(pending)](auto done) mutable {
      return p.then([statuses = done.get()](auto s) mutable {
        statuses.push_back(s.get());
        return statuses;
      });
    });
  }
  return all;
}

// Wait for all the futures in `v` and summarize them as a single status: the
// first (in input order) non-OK status, or OK if every future succeeded.
future<Status> Reduce(std::vector<future<Status>> v) {
  auto first_error = [](std::vector<Status> statuses) -> Status {
    for (auto& status : statuses) {
      if (status.ok()) continue;
      return std::move(status);
    }
    return Status{};
  };
  return WaitAll(std::move(v)).then([first_error](auto done) {
    return first_error(done.get());
  });
}

} // namespace

StreamingSubscriptionBatchSource::StreamingSubscriptionBatchSource(
CompletionQueue cq,
Expand Down Expand Up @@ -119,8 +145,21 @@ future<Status> StreamingSubscriptionBatchSource::BulkNack(
request.set_subscription(subscription_full_name_);
for (auto& a : ack_ids) *request.add_ack_ids() = std::move(a);
request.set_ack_deadline_seconds(0);
return stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), request);

auto requests =
SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage);
if (requests.size() == 1) {
return stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), requests.front());
}

std::vector<future<Status>> pending(requests.size());
std::transform(requests.begin(), requests.end(), pending.begin(),
[this](auto const& request) {
return stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), request);
});
return Reduce(std::move(pending));
}

void StreamingSubscriptionBatchSource::ExtendLeases(
Expand All @@ -133,14 +172,17 @@ void StreamingSubscriptionBatchSource::ExtendLeases(
request.add_ack_ids(std::move(a));
}
std::unique_lock<std::mutex> lk(mu_);
auto split = SplitModifyAckDeadline(std::move(request), kMaxAckIdsPerMessage);
if (exactly_once_delivery_enabled_.value_or(false)) {
lk.unlock();
(void)ExtendLeasesWithRetry(stub_, cq_, std::move(request));
for (auto& r : split) (void)ExtendLeasesWithRetry(stub_, cq_, std::move(r));
return;
}
lk.unlock();
(void)stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), request);
for (auto& r : split) {
(void)stub_->AsyncModifyAckDeadline(
cq_, absl::make_unique<grpc::ClientContext>(), r);
}
}

void StreamingSubscriptionBatchSource::StartStream(
Expand Down Expand Up @@ -370,7 +412,7 @@ void StreamingSubscriptionBatchSource::ShutdownStream(
lk.unlock();
auto weak = WeakFromThis();
// There are no pending reads or writes, and something (probably a read or
// write error) recommends we shutdown the stream
// write error) recommends we shut down the stream
stream->Finish().then([weak, stream](future<Status> f) {
if (auto self = weak.lock()) self->OnFinish(f.get());
});
Expand Down Expand Up @@ -454,6 +496,30 @@ void StreamingSubscriptionBatchSource::ChangeState(
stream_state_ = s;
}

// Split `request` into one or more requests, each with at most `max_ack_ids`
// ack ids.
//
// Every returned request carries the subscription and ack deadline from
// `request`. The ack ids keep their original order: the first `max_ack_ids`
// go in the first returned request, the next `max_ack_ids` in the second, and
// so on. A request that already fits is returned, unchanged, as the only
// element. Values of `max_ack_ids` smaller than 1 are treated as 1.
std::vector<google::pubsub::v1::ModifyAckDeadlineRequest>
SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request,
                       int max_ack_ids) {
  using Request = google::pubsub::v1::ModifyAckDeadlineRequest;

  // A non-positive limit would never make progress in the loop below (and a
  // negative one is not a valid distance for `std::next()`), so clamp it.
  auto const chunk_size = max_ack_ids < 1 ? 1 : max_ack_ids;

  std::vector<Request> result;
  // We expect this to be the common case. Move the request into the vector;
  // a braced `return {std::move(request)};` copies it, because the elements
  // of a `std::initializer_list` are const.
  if (request.ack_ids_size() <= chunk_size) {
    result.push_back(std::move(request));
    return result;
  }

  auto& source = *request.mutable_ack_ids();
  auto remaining = source.size();
  result.reserve(static_cast<std::size_t>(remaining / chunk_size) + 1);

  // Walk the ack ids once, moving each full chunk into a new request. The
  // moved-from strings stay in `source` until the single `erase()` below, so
  // the elements are not shifted once per chunk.
  auto next = source.begin();
  while (remaining > chunk_size) {
    Request r;
    r.set_subscription(request.subscription());
    r.set_ack_deadline_seconds(request.ack_deadline_seconds());
    r.mutable_ack_ids()->Reserve(chunk_size);

    auto const end = std::next(next, chunk_size);
    for (; next != end; ++next) r.add_ack_ids(std::move(*next));
    remaining -= chunk_size;
    result.push_back(std::move(r));
  }
  source.erase(source.begin(), next);

  // Whatever is left (at most `chunk_size` ids) travels in the original
  // request, which becomes the last element.
  if (!request.ack_ids().empty()) result.push_back(std::move(request));
  return result;
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
} // namespace cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ class StreamingSubscriptionBatchSource
kFinishing,
};

// The maximum size for `ModifyAckDeadlineRequest` is 512 KB:
// https://cloud.google.com/pubsub/quotas#resource_limits
// Typical ack ids are less than 200 bytes, so 2,048 of them fit in about
// 400 KB. This value is safe, but there is no need to over-optimize it:
// - Google does not charge for these messages
// - The value is reached rarely
// - The CPU costs saved between 2,048 ids per message vs. the theoretical
// maximum are minimal
static int constexpr kMaxAckIdsPerMessage = 2048;

private:
// C++17 adds weak_from_this(), we cannot use the same name as (1) some
// versions of the standard library include `weak_from_this()` even with
Expand Down Expand Up @@ -136,6 +146,11 @@ class StreamingSubscriptionBatchSource
std::ostream& operator<<(std::ostream& os,
StreamingSubscriptionBatchSource::StreamState s);

/// Split @p request such that each request has at most @p max_ack_ids.
///
/// Every returned request carries the subscription and the ack deadline of
/// @p request. The ack ids keep their original order across the returned
/// requests. A request that already has at most @p max_ack_ids ack ids is
/// returned as the only element.
std::vector<google::pubsub::v1::ModifyAckDeadlineRequest>
SplitModifyAckDeadline(google::pubsub::v1::ModifyAckDeadlineRequest request,
int max_ack_ids);

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
} // namespace cloud
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
#include "google/cloud/pubsub/subscription.h"
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/credentials.h"
#include "google/cloud/internal/background_threads_impl.h"
#include "google/cloud/log.h"
#include "google/cloud/testing_util/async_sequencer.h"
#include "google/cloud/testing_util/is_proto_equal.h"
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
#include "google/cloud/testing_util/status_matchers.h"
#include <gmock/gmock.h>
Expand All @@ -36,6 +38,7 @@ using ::google::cloud::internal::AutomaticallyCreatedBackgroundThreads;
using ::google::cloud::internal::RunAsyncBase;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::IsOk;
using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::MockCompletionQueueImpl;
using ::google::cloud::testing_util::StatusIs;
using ::testing::_;
Expand All @@ -44,6 +47,7 @@ using ::testing::AtLeast;
using ::testing::AtMost;
using ::testing::ByMove;
using ::testing::ElementsAre;
using ::testing::ElementsAreArray;
using ::testing::HasSubstr;
using ::testing::Property;
using ::testing::Return;
Expand Down Expand Up @@ -115,6 +119,7 @@ std::shared_ptr<StreamingSubscriptionBatchSource> MakeTestBatchSource(
auto subscription = pubsub::Subscription("test-project", "test-subscription");
auto opts = DefaultSubscriberOptions(pubsub_testing::MakeTestOptions(
Options{}
.set<UnifiedCredentialsOption>(MakeInsecureCredentials())
.set<pubsub::MaxOutstandingMessagesOption>(100)
.set<pubsub::MaxOutstandingBytesOption>(100 * 1024 * 1024L)
.set<pubsub::MaxHoldTimeOption>(std::chrono::seconds(300))));
Expand Down Expand Up @@ -1120,6 +1125,208 @@ TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesWithRetry) {
EXPECT_THAT(done.get(), IsOk());
}

// A request holding exactly `kMaxIds` ack ids does not need to be split: the
// result is a single request equal to the input.
TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadlineSmall) {
  auto constexpr kMaxIds = 3;

  ModifyRequest input;
  input.set_subscription(
      "projects/test-project/subscriptions/test-subscription");
  input.set_ack_deadline_seconds(12345);
  for (auto const* ack_id : {"fake-001", "fake-002", "fake-003"}) {
    input.add_ack_ids(ack_id);
  }

  auto const actual = SplitModifyAckDeadline(input, kMaxIds);
  EXPECT_THAT(actual, ElementsAre(IsProtoEqual(input)));
}

// Seven ack ids with a limit of three yield three requests (3 + 3 + 1), each
// with the subscription and deadline of the input, and the ids in order.
TEST(StreamingSubscriptionBatchSourceTest, SplitModifyAckDeadline) {
  auto constexpr kMaxIds = 3;

  // Build a request with the fixed subscription and deadline used by this
  // test and the given ack ids.
  auto make_request = [](std::vector<std::string> ack_ids) {
    ModifyRequest r;
    r.set_subscription(
        "projects/test-project/subscriptions/test-subscription");
    r.set_ack_deadline_seconds(12345);
    for (auto& id : ack_ids) r.add_ack_ids(std::move(id));
    return r;
  };

  auto input = make_request({"fake-001", "fake-002", "fake-003", "fake-004",
                             "fake-005", "fake-006", "fake-007"});
  auto const first = make_request({"fake-001", "fake-002", "fake-003"});
  auto const second = make_request({"fake-004", "fake-005", "fake-006"});
  auto const third = make_request({"fake-007"});

  auto const actual = SplitModifyAckDeadline(std::move(input), kMaxIds);
  EXPECT_THAT(actual, ElementsAre(IsProtoEqual(first), IsProtoEqual(second),
                                  IsProtoEqual(third)));
}

std::unique_ptr<pubsub_testing::MockAsyncPullStream> MakeUnusedStream(
bool enable_exactly_once) {
auto start_response = []() { return make_ready_future(true); };
auto write_response = [](google::pubsub::v1::StreamingPullRequest const&,
grpc::WriteOptions const&) {
return make_ready_future(true);
};
auto read_response = [enable_exactly_once]() {
using Response = ::google::pubsub::v1::StreamingPullResponse;
Response response;
if (enable_exactly_once) {
response.mutable_subscription_properties()
->set_exactly_once_delivery_enabled(true);
}
return make_ready_future(absl::make_optional(std::move(response)));
};
auto finish_response = []() { return make_ready_future(Status{}); };

auto stream = absl::make_unique<pubsub_testing::MockAsyncPullStream>();
EXPECT_CALL(*stream, Start).WillOnce(start_response);
EXPECT_CALL(*stream, Write).WillRepeatedly(write_response);
EXPECT_CALL(*stream, Read).WillRepeatedly(read_response);
EXPECT_CALL(*stream, Cancel).Times(AtMost(1));
EXPECT_CALL(*stream, Finish).Times(AtMost(1)).WillRepeatedly(finish_response);
return stream;
}

// Verify that a `BulkNack()` with more than `kMaxAckIdsPerMessage` ack ids is
// sent as multiple `ModifyAckDeadline` requests, each with the expected ids,
// and that the combined result is reported as a success.
TEST(StreamingSubscriptionBatchSourceTest, BulkNackMultipleRequests) {
  auto constexpr kMaxIds =
      StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage;

  // Generate `count` ids: "<prefix>1", "<prefix>2", ...
  auto make_ids = [](std::string const& prefix, int count) {
    std::vector<std::string> ids(count);
    std::generate(ids.begin(), ids.end(), [&prefix, next = 0]() mutable {
      return prefix + std::to_string(++next);
    });
    return ids;
  };
  // Two full requests and a (small) remainder.
  std::vector<std::vector<std::string>> groups;
  groups.push_back(make_ids("group-1-", kMaxIds));
  groups.push_back(make_ids("group-2-", kMaxIds));
  groups.push_back(make_ids("group-3-", 2));

  // Create a mock action that verifies the ack ids in the request and then
  // returns a successful result.
  auto make_on_modify = [](std::vector<std::string> e) {
    return [expected_ids = std::move(e)](auto, auto, auto const& request) {
      EXPECT_THAT(request.ack_ids(), ElementsAreArray(expected_ids));
      return make_ready_future(Status{});
    };
  };

  AutomaticallyCreatedBackgroundThreads background;
  auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();

  EXPECT_CALL(*mock, AsyncStreamingPull)
      .WillOnce([&](google::cloud::CompletionQueue&,
                    std::unique_ptr<grpc::ClientContext>,
                    google::pubsub::v1::StreamingPullRequest const&) {
        return MakeUnusedStream(false);
      });

  // The requests are expected in the same order as the groups.
  EXPECT_CALL(
      *mock,
      AsyncModifyAckDeadline(
          _, _,
          Property(&ModifyRequest::subscription,
                   "projects/test-project/subscriptions/test-subscription")))
      .WillOnce(make_on_modify(groups[0]))
      .WillOnce(make_on_modify(groups[1]))
      .WillOnce(make_on_modify(groups[2]));

  auto shutdown = std::make_shared<SessionShutdownManager>();
  auto uut = MakeTestBatchSource(background.cq(), shutdown, mock);

  auto done = shutdown->Start({});
  uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});

  std::vector<std::string> nacks;
  for (auto& ids : groups) {
    nacks.insert(nacks.end(), ids.begin(), ids.end());
  }

  // All the mocked requests complete immediately and successfully, so the
  // aggregated status must be available and OK.
  EXPECT_THAT(uut->BulkNack(nacks).get(), IsOk());

  shutdown->MarkAsShutdown("test", {});
}

// Verify that `ExtendLeases()` with more than `kMaxAckIdsPerMessage` ack ids
// is sent as multiple `ModifyAckDeadline` requests, each with the expected
// ids. `enable_exactly_once` controls whether the mocked stream reports
// exactly-once delivery as enabled for the subscription.
void CheckExtendLeasesMultipleRequests(bool enable_exactly_once) {
  auto constexpr kFullBatch =
      StreamingSubscriptionBatchSource::kMaxAckIdsPerMessage;

  // Generate `size` ids: "<prefix>1", "<prefix>2", ...
  auto generate_ids = [](std::string const& prefix, int size) {
    std::vector<std::string> ids(size);
    auto next = 0;
    for (auto& id : ids) id = prefix + std::to_string(++next);
    return ids;
  };
  // Two full requests and a (small) remainder.
  std::vector<std::vector<std::string>> batches;
  batches.push_back(generate_ids("group-1-", kFullBatch));
  batches.push_back(generate_ids("group-2-", kFullBatch));
  batches.push_back(generate_ids("group-3-", 2));

  // Create a mock action that verifies the ack ids in the request and then
  // returns a successful result.
  auto expect_ids = [](std::vector<std::string> ids) {
    return [want = std::move(ids)](auto, auto, auto const& request) {
      EXPECT_THAT(request.ack_ids(), ElementsAreArray(want));
      return make_ready_future(Status{});
    };
  };

  AutomaticallyCreatedBackgroundThreads background;
  auto stub = std::make_shared<pubsub_testing::MockSubscriberStub>();

  EXPECT_CALL(*stub, AsyncStreamingPull)
      .WillOnce([&](google::cloud::CompletionQueue&,
                    std::unique_ptr<grpc::ClientContext>,
                    google::pubsub::v1::StreamingPullRequest const&) {
        return MakeUnusedStream(enable_exactly_once);
      });

  // The requests are expected in the same order as the batches.
  EXPECT_CALL(
      *stub,
      AsyncModifyAckDeadline(
          _, _,
          Property(&ModifyRequest::subscription,
                   "projects/test-project/subscriptions/test-subscription")))
      .WillOnce(expect_ids(batches[0]))
      .WillOnce(expect_ids(batches[1]))
      .WillOnce(expect_ids(batches[2]));

  auto shutdown = std::make_shared<SessionShutdownManager>();
  auto uut = MakeTestBatchSource(background.cq(), shutdown, stub);

  auto done = shutdown->Start({});
  uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});

  // Flatten the batches into the single list passed to `ExtendLeases()`.
  std::vector<std::string> acks;
  for (auto const& batch : batches) {
    for (auto const& id : batch) acks.push_back(id);
  }

  uut->ExtendLeases(acks, std::chrono::seconds(60));

  shutdown->MarkAsShutdown("test", {});
}

// Run the multiple-request lease extension check with a stream that does not
// report exactly-once delivery.
TEST(StreamingSubscriptionBatchSourceTest, ExtendLeasesMultipleRequests) {
CheckExtendLeasesMultipleRequests(false);
}

// Run the multiple-request lease extension check with a stream that reports
// exactly-once delivery as enabled.
TEST(StreamingSubscriptionBatchSourceTest,
ExtendLeasesMultipleRequestsWithExactlyOnce) {
CheckExtendLeasesMultipleRequests(true);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub_internal
Expand Down