Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): implement GUAC for Publisher #7440

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions google/cloud/pubsub/internal/create_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

std::shared_ptr<grpc::Channel> CreateChannel(Options const& opts,
int channel_id) {
auto channel_arguments = internal::MakeChannelArguments(opts);
channel_arguments.SetInt(GRPC_ARG_CHANNEL_ID, channel_id);
return grpc::CreateCustomChannel(opts.get<EndpointOption>(),
opts.get<GrpcCredentialOption>(),
MakeChannelArguments(opts, channel_id));
}

grpc::ChannelArguments MakeChannelArguments(Options const& opts,
int channel_id) {
auto args = internal::MakeChannelArguments(opts);
args.SetInt(GRPC_ARG_CHANNEL_ID, channel_id);
// Pub/Sub messages are often larger than the default gRPC limit (4MiB). How
// much bigger is a bit of a guess. The application-level payload cannot be
// larger than 10MiB, but there is the overhead in the protos, and the gRPC
Expand All @@ -31,12 +38,9 @@ std::shared_ptr<grpc::Channel> CreateChannel(Options const& opts,
// reasonable overhead, and (c) typically applications have dozens of channels
// and rarely more than 100, so even if too generous it is unlikely to be
// material.
channel_arguments.SetMaxSendMessageSize(16 * 1024 * 1024);
channel_arguments.SetMaxReceiveMessageSize(16 * 1024 * 1024);

return grpc::CreateCustomChannel(opts.get<EndpointOption>(),
opts.get<GrpcCredentialOption>(),
std::move(channel_arguments));
args.SetMaxSendMessageSize(16 * 1024 * 1024);
args.SetMaxReceiveMessageSize(16 * 1024 * 1024);
return args;
}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/pubsub/internal/create_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
std::shared_ptr<grpc::Channel> CreateChannel(Options const& opts,
int channel_id);

/// Initialize Channel Arguments configured by @p opts and @p channel_id
grpc::ChannelArguments MakeChannelArguments(Options const& opts,
int channel_id);

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
Expand Down
106 changes: 71 additions & 35 deletions google/cloud/pubsub/publisher_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#include "google/cloud/pubsub/publisher_connection.h"
#include "google/cloud/pubsub/internal/batching_publisher_connection.h"
#include "google/cloud/pubsub/internal/create_channel.h"
#include "google/cloud/pubsub/internal/default_batch_sink.h"
#include "google/cloud/pubsub/internal/defaults.h"
#include "google/cloud/pubsub/internal/flow_controlled_publisher_connection.h"
#include "google/cloud/pubsub/internal/non_constructible.h"
#include "google/cloud/pubsub/internal/ordering_key_publisher_connection.h"
#include "google/cloud/pubsub/internal/publisher_auth.h"
#include "google/cloud/pubsub/internal/publisher_logging.h"
#include "google/cloud/pubsub/internal/publisher_metadata.h"
#include "google/cloud/pubsub/internal/publisher_round_robin.h"
Expand Down Expand Up @@ -55,6 +57,57 @@ class ContainingPublisherConnection : public PublisherConnection {
std::shared_ptr<BackgroundThreads> background_;
std::shared_ptr<PublisherConnection> child_;
};

std::shared_ptr<pubsub_internal::PublisherStub> DecoratePublisherStub(
Options const& opts,
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children) {
std::shared_ptr<pubsub_internal::PublisherStub> stub =
std::make_shared<pubsub_internal::PublisherRoundRobin>(
std::move(children));
if (auth->RequiresConfigureContext()) {
stub = std::make_shared<pubsub_internal::PublisherAuth>(std::move(auth),
std::move(stub));
}
stub = std::make_shared<pubsub_internal::PublisherMetadata>(std::move(stub));
if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
stub = std::make_shared<pubsub_internal::PublisherLogging>(
std::move(stub), opts.get<GrpcTracingOptionsOption>());
}
return stub;
}

std::shared_ptr<pubsub::PublisherConnection> ConnectionFromDecoratedStub(
pubsub::Topic topic, Options opts,
std::shared_ptr<BackgroundThreads> background,
std::shared_ptr<pubsub_internal::PublisherStub> stub) {
auto make_connection = [&]() -> std::shared_ptr<pubsub::PublisherConnection> {
auto cq = background->cq();
std::shared_ptr<pubsub_internal::BatchSink> sink =
pubsub_internal::DefaultBatchSink::Create(stub, cq, opts);
if (opts.get<pubsub::MessageOrderingOption>()) {
auto factory = [topic, opts, sink, cq](std::string const& key) {
return pubsub_internal::BatchingPublisherConnection::Create(
topic, opts, key,
pubsub_internal::SequentialBatchSink::Create(sink), cq);
};
return pubsub_internal::OrderingKeyPublisherConnection::Create(
std::move(factory));
}
return pubsub_internal::RejectsWithOrderingKey::Create(
pubsub_internal::BatchingPublisherConnection::Create(
std::move(topic), opts, {}, std::move(sink), std::move(cq)));
};
auto connection = make_connection();
if (opts.get<pubsub::FullPublisherActionOption>() !=
pubsub::FullPublisherAction::kIgnored) {
connection = pubsub_internal::FlowControlledPublisherConnection::Create(
std::move(opts), std::move(connection));
}
return std::make_shared<pubsub::ContainingPublisherConnection>(
std::move(background), std::move(connection));
}
} // namespace

PublisherConnection::~PublisherConnection() = default;
Expand All @@ -81,14 +134,21 @@ std::shared_ptr<PublisherConnection> MakePublisherConnection(Topic topic,
PolicyOptionList, PublisherOptionList>(
opts, __func__);
opts = pubsub_internal::DefaultPublisherOptions(std::move(opts));

auto background = internal::MakeBackgroundThreadsFactory(opts)();
auto auth = google::cloud::internal::CreateAuthenticationStrategy(
background->cq(), opts);
std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children(
opts.get<GrpcNumChannelsOption>());
int id = 0;
std::generate(children.begin(), children.end(), [&id, &opts] {
return pubsub_internal::CreateDefaultPublisherStub(opts, id++);
std::generate(children.begin(), children.end(), [&id, &opts, &auth] {
return pubsub_internal::CreateDefaultPublisherStub(
auth->CreateChannel(opts.get<EndpointOption>(),
pubsub_internal::MakeChannelArguments(opts, id++)));
});
return pubsub_internal::MakePublisherConnection(
std::move(topic), std::move(opts), std::move(children));
auto stub = DecoratePublisherStub(opts, std::move(auth), std::move(children));
return ConnectionFromDecoratedStub(std::move(topic), std::move(opts),
std::move(background), std::move(stub));
}

std::shared_ptr<PublisherConnection> MakePublisherConnection(
Expand All @@ -112,38 +172,14 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
std::shared_ptr<pubsub::PublisherConnection> MakePublisherConnection(
pubsub::Topic topic, Options opts,
std::vector<std::shared_ptr<PublisherStub>> stubs) {
if (stubs.empty()) return nullptr;
std::shared_ptr<PublisherStub> stub =
std::make_shared<PublisherRoundRobin>(std::move(stubs));
stub = std::make_shared<PublisherMetadata>(std::move(stub));
if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
stub = std::make_shared<PublisherLogging>(
std::move(stub), opts.get<GrpcTracingOptionsOption>());
}

auto background = internal::MakeBackgroundThreadsFactory(opts)();
auto make_connection = [&]() -> std::shared_ptr<pubsub::PublisherConnection> {
auto cq = background->cq();
std::shared_ptr<BatchSink> sink = DefaultBatchSink::Create(stub, cq, opts);
if (opts.get<pubsub::MessageOrderingOption>()) {
auto factory = [topic, opts, sink, cq](std::string const& key) {
return BatchingPublisherConnection::Create(
topic, opts, key, SequentialBatchSink::Create(sink), cq);
};
return OrderingKeyPublisherConnection::Create(std::move(factory));
}
return RejectsWithOrderingKey::Create(BatchingPublisherConnection::Create(
std::move(topic), opts, {}, std::move(sink), std::move(cq)));
};
auto connection = make_connection();
if (opts.get<pubsub::FullPublisherActionOption>() !=
pubsub::FullPublisherAction::kIgnored) {
connection = FlowControlledPublisherConnection::Create(
std::move(opts), std::move(connection));
}
return std::make_shared<pubsub::ContainingPublisherConnection>(
std::move(background), std::move(connection));
auto auth = google::cloud::internal::CreateAuthenticationStrategy(
background->cq(), opts);
auto stub =
pubsub::DecoratePublisherStub(opts, std::move(auth), std::move(stubs));
return pubsub::ConnectionFromDecoratedStub(std::move(topic), std::move(opts),
std::move(background),
std::move(stub));
}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
Expand Down
6 changes: 2 additions & 4 deletions google/cloud/pubsub/publisher_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ std::shared_ptr<PublisherConnection> MakeTestPublisherConnection(
Options opts = {}) {
opts = pubsub_internal::DefaultPublisherOptions(
pubsub_testing::MakeTestOptions(std::move(opts)));
std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children{
std::move(mock)};
return MakePublisherConnection(std::move(topic), std::move(opts),
std::move(children));
return pubsub_internal::MakePublisherConnection(
std::move(topic), std::move(opts), {std::move(mock)});
}

TEST(PublisherConnectionTest, Basic) {
Expand Down