Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): implement per-call options for Subscriber #10043

Merged
merged 4 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google/cloud/pubsub/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ add_library(
snapshot.h
snapshot_builder.cc
snapshot_builder.h
subscriber.cc
subscriber.h
subscriber_connection.cc
subscriber_connection.h
Expand Down
1 change: 1 addition & 0 deletions google/cloud/pubsub/google_cloud_cpp_pubsub.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ google_cloud_cpp_pubsub_srcs = [
"schema_admin_connection.cc",
"snapshot.cc",
"snapshot_builder.cc",
"subscriber.cc",
"subscriber_connection.cc",
"subscriber_options.cc",
"subscription.cc",
Expand Down
1 change: 1 addition & 0 deletions google/cloud/pubsub/mocks/mock_subscriber_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class MockSubscriberConnection : public pubsub::SubscriberConnection {
MOCK_METHOD(future<Status>, ExactlyOnceSubscribe,
(pubsub::SubscriberConnection::ExactlyOnceSubscribeParams),
(override));
MOCK_METHOD(google::cloud::Options, options, (), (override));
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
42 changes: 42 additions & 0 deletions google/cloud/pubsub/subscriber.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/pubsub/subscriber.h"

namespace google {
namespace cloud {
namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

Subscriber::Subscriber(std::shared_ptr<SubscriberConnection> connection,
Options opts)
: connection_(std::move(connection)),
options_(
internal::MergeOptions(std::move(opts), connection_->options())) {}

future<Status> Subscriber::Subscribe(ApplicationCallback cb, Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
return connection_->Subscribe({std::move(cb)});
}

future<Status> Subscriber::Subscribe(ExactlyOnceApplicationCallback cb,
Options opts) {
internal::OptionsSpan span(internal::MergeOptions(std::move(opts), options_));
return connection_->ExactlyOnceSubscribe({std::move(cb)});
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
} // namespace cloud
} // namespace google
20 changes: 12 additions & 8 deletions google/cloud/pubsub/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
*/
class Subscriber {
public:
explicit Subscriber(std::shared_ptr<SubscriberConnection> connection)
: connection_(std::move(connection)) {}
explicit Subscriber(std::shared_ptr<SubscriberConnection> connection,
Options opts = {});

/**
* Creates a new session to receive messages from @p subscription.
Expand All @@ -114,14 +114,15 @@ class Subscriber {
* @snippet samples.cc subscribe
*
* @param cb the callable invoked when messages are received.
* @param opts any option overrides to use in this call. These options take
* precedence over the options passed in the constructor, and over any
* options provided in the `PublisherConnection` initialization.
* @return a future that is satisfied when the session will no longer receive
* messages. For example, because there was an unrecoverable error trying
* to receive data. Calling `.cancel()` in this object will (eventually)
* terminate the session and satisfy the future.
*/
future<Status> Subscribe(ApplicationCallback cb) {
return connection_->Subscribe({std::move(cb)});
}
future<Status> Subscribe(ApplicationCallback cb, Options opts = {});

/**
* Creates a new session to receive messages from @p subscription using
Expand All @@ -145,17 +146,20 @@ class Subscriber {
* @snippet samples.cc exactly-once-subscribe
*
* @param cb the callable invoked when messages are received.
* @param opts any option overrides to use in this call. These options take
* precedence over the options passed in the constructor, and over any
* options provided in the `PublisherConnection` initialization.
* @return a future that is satisfied when the session will no longer receive
* messages. For example, because there was an unrecoverable error trying
* to receive data. Calling `.cancel()` in this object will (eventually)
* terminate the session and satisfy the future.
*/
future<Status> Subscribe(ExactlyOnceApplicationCallback cb) {
return connection_->ExactlyOnceSubscribe({std::move(cb)});
}
future<Status> Subscribe(ExactlyOnceApplicationCallback cb,
Options opts = {});

private:
std::shared_ptr<SubscriberConnection> connection_;
google::cloud::Options options_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
14 changes: 8 additions & 6 deletions google/cloud/pubsub/subscriber_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,19 @@ class SubscriberConnectionImpl : public pubsub::SubscriberConnection {
~SubscriberConnectionImpl() override = default;

future<Status> Subscribe(SubscribeParams p) override {
return CreateSubscriptionSession(subscription_, opts_, stub_,
background_->cq(), MakeClientId(),
std::move(p.callback));
return CreateSubscriptionSession(
subscription_, google::cloud::internal::CurrentOptions(), stub_,
background_->cq(), MakeClientId(), std::move(p.callback));
}

future<Status> ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p) override {
return CreateSubscriptionSession(subscription_, opts_, stub_,
background_->cq(), MakeClientId(),
std::move(p.callback));
return CreateSubscriptionSession(
subscription_, google::cloud::internal::CurrentOptions(), stub_,
background_->cq(), MakeClientId(), std::move(p.callback));
}

Options options() override { return opts_; }

private:
std::string MakeClientId() {
std::lock_guard<std::mutex> lk(mu_);
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/pubsub/subscriber_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class SubscriberConnection {
* simplify the use of mocks.
*/
virtual future<Status> ExactlyOnceSubscribe(ExactlyOnceSubscribeParams p);

/// Returns the configuration parameters for this object
virtual Options options() { return Options{}; }
};

/**
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/pubsub/subscriber_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ TEST(SubscriberConnectionTest, Basic) {
waiter.set_value();
};
std::thread t([&cq] { cq.Run(); });
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Subscribe({handler});
waiter.get_future().wait();
response.cancel();
Expand Down Expand Up @@ -132,6 +133,7 @@ TEST(SubscriberConnectionTest, ExactlyOnce) {
waiter.set_value();
};
std::thread t([&cq] { cq.Run(); });
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->ExactlyOnceSubscribe({callback});
waiter.get_future().wait();
response.cancel();
Expand Down Expand Up @@ -187,6 +189,7 @@ TEST(SubscriberConnectionTest, PullFailure) {

auto subscriber = MakeTestSubscriberConnection(subscription, mock);
auto handler = [&](Message const&, AckHandler const&) {};
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Subscribe({handler});
EXPECT_THAT(response.get(),
StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
Expand Down Expand Up @@ -227,6 +230,7 @@ TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsLogging) {
waiter.set_value();
};
std::thread t([&cq] { cq.Run(); });
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Subscribe({handler});
waiter.get_future().wait();
response.cancel();
Expand Down Expand Up @@ -283,6 +287,7 @@ TEST(SubscriberConnectionTest, MakeSubscriberConnectionSetupsMetadata) {
if (received_one.test_and_set()) return;
waiter.set_value();
};
google::cloud::internal::OptionsSpan span(subscriber->options());
auto response = subscriber->Subscribe({handler});
waiter.get_future().wait();
response.cancel();
Expand Down
117 changes: 117 additions & 0 deletions google/cloud/pubsub/subscriber_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,25 @@ namespace pubsub {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

using ::testing::Return;

struct TestOptionA {
using Type = std::string;
};

struct TestOptionB {
using Type = std::string;
};

struct TestOptionC {
using Type = std::string;
};

/// @test Verify Subscriber::Subscribe() works, including mocks.
TEST(SubscriberTest, SubscribeSimple) {
Subscription const subscription("test-project", "test-subscription");
auto mock = std::make_shared<pubsub_mocks::MockSubscriberConnection>();
EXPECT_CALL(*mock, options);
EXPECT_CALL(*mock, Subscribe)
.WillOnce([&](SubscriberConnection::SubscribeParams const& p) {
{
Expand Down Expand Up @@ -65,6 +80,7 @@ TEST(SubscriberTest, SubscribeSimple) {
TEST(SubscriberTest, SubscribeWithOptions) {
Subscription const subscription("test-project", "test-subscription");
auto mock = std::make_shared<pubsub_mocks::MockSubscriberConnection>();
EXPECT_CALL(*mock, options);
EXPECT_CALL(*mock, Subscribe)
.WillOnce([&](SubscriberConnection::SubscribeParams const&) {
return make_ready_future(Status{});
Expand All @@ -76,6 +92,107 @@ TEST(SubscriberTest, SubscribeWithOptions) {
ASSERT_STATUS_OK(status);
}

TEST(SubscriberTest, OptionsNoOverrides) {
Subscription const subscription("test-project", "test-subscription");
auto mock = std::make_shared<pubsub_mocks::MockSubscriberConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(Return(Options{}
.set<TestOptionA>("test-a")
.set<TestOptionB>("test-b")
.set<TestOptionC>("test-c")));
EXPECT_CALL(*mock, Subscribe).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "test-a");
EXPECT_EQ(current.get<TestOptionB>(), "test-b");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return make_ready_future(Status{});
});
EXPECT_CALL(*mock, ExactlyOnceSubscribe).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "test-a");
EXPECT_EQ(current.get<TestOptionB>(), "test-b");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return make_ready_future(Status{});
});

Subscriber subscriber(mock);
ASSERT_STATUS_OK(
subscriber.Subscribe([](Message const&, AckHandler const&) {}).get());
ASSERT_STATUS_OK(
subscriber.Subscribe([](Message const&, ExactlyOnceAckHandler const&) {})
.get());
}

TEST(SubscriberTest, OptionsClientOverrides) {
Subscription const subscription("test-project", "test-subscription");
auto mock = std::make_shared<pubsub_mocks::MockSubscriberConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(Return(Options{}
.set<TestOptionA>("test-a")
.set<TestOptionB>("test-b")
.set<TestOptionC>("test-c")));
EXPECT_CALL(*mock, Subscribe).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "override-a");
EXPECT_EQ(current.get<TestOptionB>(), "test-b");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return make_ready_future(Status{});
});
EXPECT_CALL(*mock, ExactlyOnceSubscribe).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "override-a");
EXPECT_EQ(current.get<TestOptionB>(), "test-b");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return make_ready_future(Status{});
});

Subscriber subscriber(mock, Options{}.set<TestOptionA>("override-a"));
ASSERT_STATUS_OK(
subscriber.Subscribe([](Message const&, AckHandler const&) {}).get());
ASSERT_STATUS_OK(
subscriber.Subscribe([](Message const&, ExactlyOnceAckHandler const&) {})
.get());
}

TEST(SubscriberTest, OptionsFunctionOverrides) {
Subscription const subscription("test-project", "test-subscription");
auto mock = std::make_shared<pubsub_mocks::MockSubscriberConnection>();
EXPECT_CALL(*mock, options)
.WillRepeatedly(Return(Options{}
.set<TestOptionA>("test-a")
.set<TestOptionB>("test-b")
.set<TestOptionC>("test-c")));
EXPECT_CALL(*mock, Subscribe).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "override-a1");
EXPECT_EQ(current.get<TestOptionB>(), "override-b1");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return make_ready_future(Status{});
});
EXPECT_CALL(*mock, ExactlyOnceSubscribe).WillOnce([](auto const&) {
auto const& current = google::cloud::internal::CurrentOptions();
EXPECT_EQ(current.get<TestOptionA>(), "override-a2");
EXPECT_EQ(current.get<TestOptionB>(), "override-b2");
EXPECT_EQ(current.get<TestOptionC>(), "test-c");
return make_ready_future(Status{});
});

Subscriber subscriber(mock, Options{}.set<TestOptionA>("override-a"));
ASSERT_STATUS_OK(subscriber
.Subscribe([](Message const&, AckHandler const&) {},
Options{}
.set<TestOptionA>("override-a1")
.set<TestOptionB>("override-b1"))
.get());
ASSERT_STATUS_OK(
subscriber
.Subscribe([](Message const&, ExactlyOnceAckHandler const&) {},
Options{}
.set<TestOptionA>("override-a2")
.set<TestOptionB>("override-b2"))
.get());
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace pubsub
Expand Down