Skip to content

Commit

Permalink
feat(pubsub): implement GUAC for Publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan committed Oct 11, 2021
1 parent e38b7c1 commit 105960b
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 47 deletions.
20 changes: 12 additions & 8 deletions google/cloud/pubsub/internal/create_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {

std::shared_ptr<grpc::Channel> CreateChannel(Options const& opts,
int channel_id) {
auto channel_arguments = internal::MakeChannelArguments(opts);
channel_arguments.SetInt(GRPC_ARG_CHANNEL_ID, channel_id);
return grpc::CreateCustomChannel(opts.get<EndpointOption>(),
opts.get<GrpcCredentialOption>(),
MakeChannelArguments(opts, channel_id));
}

grpc::ChannelArguments MakeChannelArguments(Options const& opts,
int channel_id) {
auto args = internal::MakeChannelArguments(opts);
args.SetInt(GRPC_ARG_CHANNEL_ID, channel_id);
// Pub/Sub messages are often larger than the default gRPC limit (4MiB). How
// much bigger is a bit of a guess. The application-level payload cannot be
// larger than 10MiB, but there is the overhead in the protos, and the gRPC
Expand All @@ -31,12 +38,9 @@ std::shared_ptr<grpc::Channel> CreateChannel(Options const& opts,
// reasonable overhead, and (c) typically applications have dozens of channels
// and rarely more than 100, so even if too generous it is unlikely to be
// material.
channel_arguments.SetMaxSendMessageSize(16 * 1024 * 1024);
channel_arguments.SetMaxReceiveMessageSize(16 * 1024 * 1024);

return grpc::CreateCustomChannel(opts.get<EndpointOption>(),
opts.get<GrpcCredentialOption>(),
std::move(channel_arguments));
args.SetMaxSendMessageSize(16 * 1024 * 1024);
args.SetMaxReceiveMessageSize(16 * 1024 * 1024);
return args;
}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/pubsub/internal/create_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
std::shared_ptr<grpc::Channel> CreateChannel(Options const& opts,
int channel_id);

/// Initialize Channel Arguments configured by @p opts and @p channel_id
grpc::ChannelArguments MakeChannelArguments(Options const& opts,
int channel_id);

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
} // namespace pubsub_internal
} // namespace cloud
Expand Down
106 changes: 71 additions & 35 deletions google/cloud/pubsub/publisher_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#include "google/cloud/pubsub/publisher_connection.h"
#include "google/cloud/pubsub/internal/batching_publisher_connection.h"
#include "google/cloud/pubsub/internal/create_channel.h"
#include "google/cloud/pubsub/internal/default_batch_sink.h"
#include "google/cloud/pubsub/internal/defaults.h"
#include "google/cloud/pubsub/internal/flow_controlled_publisher_connection.h"
#include "google/cloud/pubsub/internal/non_constructible.h"
#include "google/cloud/pubsub/internal/ordering_key_publisher_connection.h"
#include "google/cloud/pubsub/internal/publisher_auth.h"
#include "google/cloud/pubsub/internal/publisher_logging.h"
#include "google/cloud/pubsub/internal/publisher_metadata.h"
#include "google/cloud/pubsub/internal/publisher_round_robin.h"
Expand Down Expand Up @@ -55,6 +57,57 @@ class ContainingPublisherConnection : public PublisherConnection {
std::shared_ptr<BackgroundThreads> background_;
std::shared_ptr<PublisherConnection> child_;
};

std::shared_ptr<pubsub_internal::PublisherStub> DecoratePublisherStub(
Options const& opts,
std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children) {
std::shared_ptr<pubsub_internal::PublisherStub> stub =
std::make_shared<pubsub_internal::PublisherRoundRobin>(
std::move(children));
if (auth->RequiresConfigureContext()) {
stub = std::make_shared<pubsub_internal::PublisherAuth>(std::move(auth),
std::move(stub));
}
stub = std::make_shared<pubsub_internal::PublisherMetadata>(std::move(stub));
if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
stub = std::make_shared<pubsub_internal::PublisherLogging>(
std::move(stub), opts.get<GrpcTracingOptionsOption>());
}
return stub;
}

std::shared_ptr<pubsub::PublisherConnection> ConnectionFromDecoratedStub(
pubsub::Topic topic, Options opts,
std::shared_ptr<BackgroundThreads> background,
std::shared_ptr<pubsub_internal::PublisherStub> stub) {
auto make_connection = [&]() -> std::shared_ptr<pubsub::PublisherConnection> {
auto cq = background->cq();
std::shared_ptr<pubsub_internal::BatchSink> sink =
pubsub_internal::DefaultBatchSink::Create(stub, cq, opts);
if (opts.get<pubsub::MessageOrderingOption>()) {
auto factory = [topic, opts, sink, cq](std::string const& key) {
return pubsub_internal::BatchingPublisherConnection::Create(
topic, opts, key,
pubsub_internal::SequentialBatchSink::Create(sink), cq);
};
return pubsub_internal::OrderingKeyPublisherConnection::Create(
std::move(factory));
}
return pubsub_internal::RejectsWithOrderingKey::Create(
pubsub_internal::BatchingPublisherConnection::Create(
std::move(topic), opts, {}, std::move(sink), std::move(cq)));
};
auto connection = make_connection();
if (opts.get<pubsub::FullPublisherActionOption>() !=
pubsub::FullPublisherAction::kIgnored) {
connection = pubsub_internal::FlowControlledPublisherConnection::Create(
std::move(opts), std::move(connection));
}
return std::make_shared<pubsub::ContainingPublisherConnection>(
std::move(background), std::move(connection));
}
} // namespace

PublisherConnection::~PublisherConnection() = default;
Expand All @@ -81,14 +134,21 @@ std::shared_ptr<PublisherConnection> MakePublisherConnection(Topic topic,
PolicyOptionList, PublisherOptionList>(
opts, __func__);
opts = pubsub_internal::DefaultPublisherOptions(std::move(opts));

auto background = internal::MakeBackgroundThreadsFactory(opts)();
auto auth = google::cloud::internal::CreateAuthenticationStrategy(
background->cq(), opts);
std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children(
opts.get<GrpcNumChannelsOption>());
int id = 0;
std::generate(children.begin(), children.end(), [&id, &opts] {
return pubsub_internal::CreateDefaultPublisherStub(opts, id++);
std::generate(children.begin(), children.end(), [&id, &opts, &auth] {
return pubsub_internal::CreateDefaultPublisherStub(
auth->CreateChannel(opts.get<EndpointOption>(),
pubsub_internal::MakeChannelArguments(opts, id++)));
});
return pubsub_internal::MakePublisherConnection(
std::move(topic), std::move(opts), std::move(children));
auto stub = DecoratePublisherStub(opts, std::move(auth), std::move(children));
return ConnectionFromDecoratedStub(std::move(topic), std::move(opts),
std::move(background), std::move(stub));
}

std::shared_ptr<PublisherConnection> MakePublisherConnection(
Expand All @@ -112,38 +172,14 @@ inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
std::shared_ptr<pubsub::PublisherConnection> MakePublisherConnection(
pubsub::Topic topic, Options opts,
std::vector<std::shared_ptr<PublisherStub>> stubs) {
if (stubs.empty()) return nullptr;
std::shared_ptr<PublisherStub> stub =
std::make_shared<PublisherRoundRobin>(std::move(stubs));
stub = std::make_shared<PublisherMetadata>(std::move(stub));
if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
GCP_LOG(INFO) << "Enabled logging for gRPC calls";
stub = std::make_shared<PublisherLogging>(
std::move(stub), opts.get<GrpcTracingOptionsOption>());
}

auto background = internal::MakeBackgroundThreadsFactory(opts)();
auto make_connection = [&]() -> std::shared_ptr<pubsub::PublisherConnection> {
auto cq = background->cq();
std::shared_ptr<BatchSink> sink = DefaultBatchSink::Create(stub, cq, opts);
if (opts.get<pubsub::MessageOrderingOption>()) {
auto factory = [topic, opts, sink, cq](std::string const& key) {
return BatchingPublisherConnection::Create(
topic, opts, key, SequentialBatchSink::Create(sink), cq);
};
return OrderingKeyPublisherConnection::Create(std::move(factory));
}
return RejectsWithOrderingKey::Create(BatchingPublisherConnection::Create(
std::move(topic), opts, {}, std::move(sink), std::move(cq)));
};
auto connection = make_connection();
if (opts.get<pubsub::FullPublisherActionOption>() !=
pubsub::FullPublisherAction::kIgnored) {
connection = FlowControlledPublisherConnection::Create(
std::move(opts), std::move(connection));
}
return std::make_shared<pubsub::ContainingPublisherConnection>(
std::move(background), std::move(connection));
auto auth = google::cloud::internal::CreateAuthenticationStrategy(
background->cq(), opts);
auto stub =
pubsub::DecoratePublisherStub(opts, std::move(auth), std::move(stubs));
return pubsub::ConnectionFromDecoratedStub(std::move(topic), std::move(opts),
std::move(background),
std::move(stub));
}

} // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
Expand Down
6 changes: 2 additions & 4 deletions google/cloud/pubsub/publisher_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ std::shared_ptr<PublisherConnection> MakeTestPublisherConnection(
Options opts = {}) {
opts = pubsub_internal::DefaultPublisherOptions(
pubsub_testing::MakeTestOptions(std::move(opts)));
std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children{
std::move(mock)};
return MakePublisherConnection(std::move(topic), std::move(opts),
std::move(children));
return pubsub_internal::MakePublisherConnection(
std::move(topic), std::move(opts), {std::move(mock)});
}

TEST(PublisherConnectionTest, Basic) {
Expand Down

0 comments on commit 105960b

Please sign in to comment.