Skip to content

Commit

Permalink
feat grpc: add channels for each high load grpc method
Browse files Browse the repository at this point in the history
add a static config option for creating separate grpc channels for high load grpc methods
commit_hash:dd1783477390776f5a8282f95e8997cf60169e54
  • Loading branch information
ArkadyRudenko committed Oct 29, 2024
1 parent f4b4551 commit d4e39cb
Show file tree
Hide file tree
Showing 29 changed files with 250 additions and 95 deletions.
3 changes: 2 additions & 1 deletion .mapping.json
Original file line number Diff line number Diff line change
Expand Up @@ -1844,6 +1844,7 @@
"grpc/include/userver/ugrpc/client/channels.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/channels.hpp",
"grpc/include/userver/ugrpc/client/client_factory.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_factory.hpp",
"grpc/include/userver/ugrpc/client/client_factory_component.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_factory_component.hpp",
"grpc/include/userver/ugrpc/client/client_factory_settings.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_factory_settings.hpp",
"grpc/include/userver/ugrpc/client/client_qos.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/client_qos.hpp",
"grpc/include/userver/ugrpc/client/common_component.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/common_component.hpp",
"grpc/include/userver/ugrpc/client/exceptions.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/client/exceptions.hpp",
Expand Down Expand Up @@ -1879,6 +1880,7 @@
"grpc/include/userver/ugrpc/impl/statistics.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/statistics.hpp",
"grpc/include/userver/ugrpc/impl/statistics_scope.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/statistics_scope.hpp",
"grpc/include/userver/ugrpc/impl/statistics_storage.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/statistics_storage.hpp",
"grpc/include/userver/ugrpc/impl/to_string.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/impl/to_string.hpp",
"grpc/include/userver/ugrpc/proto_json.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/proto_json.hpp",
"grpc/include/userver/ugrpc/protobuf_visit.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/protobuf_visit.hpp",
"grpc/include/userver/ugrpc/server/call.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/call.hpp",
Expand Down Expand Up @@ -1981,7 +1983,6 @@
"grpc/src/ugrpc/impl/statistics_storage.cpp":"taxi/uservices/userver/grpc/src/ugrpc/impl/statistics_storage.cpp",
"grpc/src/ugrpc/impl/status.cpp":"taxi/uservices/userver/grpc/src/ugrpc/impl/status.cpp",
"grpc/src/ugrpc/impl/status.hpp":"taxi/uservices/userver/grpc/src/ugrpc/impl/status.hpp",
"grpc/src/ugrpc/impl/to_string.hpp":"taxi/uservices/userver/grpc/src/ugrpc/impl/to_string.hpp",
"grpc/src/ugrpc/proto_json.cpp":"taxi/uservices/userver/grpc/src/ugrpc/proto_json.cpp",
"grpc/src/ugrpc/protobuf_visit.cpp":"taxi/uservices/userver/grpc/src/ugrpc/protobuf_visit.cpp",
"grpc/src/ugrpc/server/call.cpp":"taxi/uservices/userver/grpc/src/ugrpc/server/call.cpp",
Expand Down
21 changes: 18 additions & 3 deletions grpc/functional_tests/basic_chaos/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,35 @@
#include <userver/yaml_config/merge_schemas.hpp>

#include <userver/ugrpc/client/client_factory_component.hpp>
#include <userver/ugrpc/client/impl/client_data.hpp>
#include <userver/ugrpc/client/simple_client_component.hpp>

#include <samples/greeter_client.usrv.pb.hpp>

namespace samples {

using Client = api::GreeterServiceClient;

// It is used only to test the count of dedicated channels
using GreeterClientComponent = ugrpc::client::SimpleClientComponent<Client>;

class GreeterClient final : public components::ComponentBase {
public:
static constexpr std::string_view kName = "greeter-client";

GreeterClient(const components::ComponentConfig& config, const components::ComponentContext& context)
: ComponentBase(config, context),
client_factory_(context.FindComponent<ugrpc::client::ClientFactoryComponent>().GetFactory()),
client_(client_factory_.MakeClient<api::GreeterServiceClient>("greeter", config["endpoint"].As<std::string>())
) {}
client_(client_factory_.MakeClient<Client>("greeter", config["endpoint"].As<std::string>())) {
// Tests dedicated-channel-count from SimpleClientComponent
auto& client =
context.FindComponent<ugrpc::client::SimpleClientComponent<Client>>("greeter-client-component").GetClient();
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(0) == 3);
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(1) == 0);
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(2) == 2);
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(3) == 0);
UASSERT(ugrpc::client::impl::GetClientData(client).GetDedicatedChannelCount(4) == 0);
}

inline std::string SayHello(std::string name, bool is_small_timeout);

Expand All @@ -39,7 +54,7 @@ class GreeterClient final : public components::ComponentBase {
inline static std::unique_ptr<grpc::ClientContext> CreateClientContext(bool is_small_timeout);

ugrpc::client::ClientFactory& client_factory_;
api::GreeterServiceClient client_;
Client client_;
};

yaml_config::Schema GreeterClient::GetStaticConfigSchema() {
Expand Down
1 change: 1 addition & 0 deletions grpc/functional_tests/basic_chaos/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ int main(int argc, char* argv[]) {
.Append<congestion_control::Component>()
.Append<samples::GreeterServiceComponent>()
.Append<samples::GreeterClient>()
.Append<samples::GreeterClientComponent>("greeter-client-component")
.Append<samples::GreeterHttpHandler>();
return utils::DaemonMain(argc, argv, component_list);
}
8 changes: 8 additions & 0 deletions grpc/functional_tests/basic_chaos/static_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ components_manager:
greeter-client:
endpoint: '[::]:8081'

greeter-client-component:
endpoint: '[::]:8081'
client-name: greeter-client
dedicated-channel-counts:
SayHello: 3
SayHelloRequestStream: 2
factory-component: grpc-client-factory

# http server
server:
listener:
Expand Down
29 changes: 6 additions & 23 deletions grpc/include/userver/ugrpc/client/client_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <userver/storages/secdist/secdist.hpp>
#include <userver/testsuite/grpc_control.hpp>

#include <userver/ugrpc/client/client_factory_settings.hpp>
#include <userver/ugrpc/client/fwd.hpp>
#include <userver/ugrpc/client/impl/channel_cache.hpp>
#include <userver/ugrpc/client/impl/client_data.hpp>
#include <userver/ugrpc/client/middlewares/base.hpp>

USERVER_NAMESPACE_BEGIN
Expand All @@ -33,28 +33,6 @@ class CompletionQueuePoolBase;

namespace ugrpc::client {

/// Settings relating to the ClientFactory
struct ClientFactorySettings final {
/// gRPC channel credentials, none by default
std::shared_ptr<grpc::ChannelCredentials> credentials{grpc::InsecureChannelCredentials()};

/// gRPC channel credentials by client_name. If not set, default `credentials`
/// is used instead.
std::unordered_map<std::string, std::shared_ptr<grpc::ChannelCredentials>> client_credentials{};

/// Optional grpc-core channel args
/// @see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html
grpc::ChannelArguments channel_args{};

/// The logging level override for the internal grpcpp library. Must be either
/// `kDebug`, `kInfo` or `kError`.
logging::Level native_log_level{logging::Level::kError};

/// Number of underlying channels that will be created for every client
/// in this factory.
std::size_t channel_count{1};
};

/// Settings relating to creation of a code-generated client
struct ClientSettings final {
/// **(Required)**
Expand Down Expand Up @@ -82,6 +60,10 @@ struct ClientSettings final {
///
/// @snippet grpc/tests/tests/unit_test_client_qos.hpp qos config key
const dynamic_config::Key<ClientQos>* client_qos{nullptr};

/// **(Optional)**
/// Dedicated high-load methods that have separate channels
DedicatedMethodsConfig dedicated_methods_config{};
};

/// @ingroup userver_clients
Expand Down Expand Up @@ -126,6 +108,7 @@ class ClientFactory final {

impl::ClientDependencies MakeClientDependencies(ClientSettings&& settings);

ClientFactorySettings settings_;
engine::TaskProcessor& channel_task_processor_;
MiddlewareFactories mws_;
ugrpc::impl::CompletionQueuePoolBase& completion_queues_;
Expand Down
46 changes: 46 additions & 0 deletions grpc/include/userver/ugrpc/client/client_factory_settings.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once

/// @file userver/ugrpc/client/client_factory_settings.hpp
/// @brief @copybrief ugrpc::client::ClientFactorySettings

#include <memory>
#include <string>
#include <unordered_map>

#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>

#include <userver/logging/level.hpp>

USERVER_NAMESPACE_BEGIN

namespace ugrpc::client {

// full rpc name -> count of channels
using DedicatedMethodsConfig = std::unordered_map<std::string, std::size_t>;

/// Settings relating to the ClientFactory
struct ClientFactorySettings final {
/// gRPC channel credentials, none by default
std::shared_ptr<grpc::ChannelCredentials> credentials{grpc::InsecureChannelCredentials()};

/// gRPC channel credentials by client_name. If not set, default `credentials`
/// is used instead.
std::unordered_map<std::string, std::shared_ptr<grpc::ChannelCredentials>> client_credentials{};

/// Optional grpc-core channel args
/// @see https://grpc.github.io/grpc/core/group__grpc__arg__keys.html
grpc::ChannelArguments channel_args{};

/// The logging level override for the internal grpcpp library. Must be either
/// `kDebug`, `kInfo` or `kError`.
logging::Level native_log_level{logging::Level::kError};

/// Number of underlying channels that will be created for every client
/// in this factory.
std::size_t channel_count{1};
};

} // namespace ugrpc::client

USERVER_NAMESPACE_END
61 changes: 54 additions & 7 deletions grpc/include/userver/ugrpc/client/impl/client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@

#include <grpcpp/channel.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/security/credentials.h>

#include <userver/dynamic_config/source.hpp>
#include <userver/testsuite/grpc_control.hpp>
#include <userver/utils/fixed_array.hpp>

#include <userver/ugrpc/client/client_factory_settings.hpp>
#include <userver/ugrpc/client/fwd.hpp>
#include <userver/ugrpc/client/impl/channel_cache.hpp>
#include <userver/ugrpc/client/middlewares/fwd.hpp>
#include <userver/ugrpc/impl/static_metadata.hpp>
#include <userver/ugrpc/impl/statistics.hpp>
#include <userver/utils/fixed_array.hpp>
#include <userver/ugrpc/impl/to_string.hpp>

USERVER_NAMESPACE_BEGIN

Expand All @@ -24,6 +28,10 @@ class StatisticsStorage;
class CompletionQueuePoolBase;
} // namespace ugrpc::impl

namespace ugrpc::client {
struct ClientFactorySettings;
}

namespace ugrpc::client::impl {

/// Contains all non-code-generated dependencies for creating a gRPC client
Expand All @@ -37,6 +45,8 @@ struct ClientDependencies final {
const dynamic_config::Source config_source;
testsuite::GrpcControl& testsuite_grpc;
const dynamic_config::Key<ClientQos>* qos{nullptr};
const ClientFactorySettings& settings;
DedicatedMethodsConfig dedicated_methods_config;
};

struct GenericClientTag final {
Expand All @@ -56,11 +66,12 @@ class ClientData final {
: dependencies_(std::move(dependencies)),
metadata_(metadata),
service_statistics_(&GetServiceStatistics()),
stubs_(MakeStubs<Service>(dependencies_.channel_token)) {}
default_stubs_(MakeStubs<Service>(dependencies_.channel_token)),
dedicated_stubs_(MakeDedicatedStubs<Service>(dependencies_, metadata)) {}

template <typename Service>
ClientData(ClientDependencies&& dependencies, GenericClientTag, std::in_place_type_t<Service>)
: dependencies_(std::move(dependencies)), stubs_(MakeStubs<Service>(dependencies_.channel_token)) {}
: dependencies_(std::move(dependencies)), default_stubs_(MakeStubs<Service>(dependencies_.channel_token)) {}

ClientData(ClientData&&) noexcept = default;
ClientData& operator=(ClientData&&) = delete;
Expand All @@ -69,8 +80,16 @@ class ClientData final {
ClientData& operator=(const ClientData&) = delete;

template <typename Service>
Stub<Service>& NextStub() const {
return *static_cast<Stub<Service>*>(NextStubPtr().get());
Stub<Service>& NextStubFromMethodId(std::size_t method_id) const {
if (!dedicated_stubs_[method_id].empty()) {
return *static_cast<Stub<Service>*>(NextStubPtr(dedicated_stubs_[method_id]).get());
}
return NextGenericStub<Service>();
}

template <typename Service>
Stub<Service>& NextGenericStub() const {
return *static_cast<Stub<Service>*>(NextStubPtr(default_stubs_).get());
}

grpc::CompletionQueue& NextQueue() const;
Expand All @@ -93,10 +112,23 @@ class ClientData final {

const dynamic_config::Key<ClientQos>* GetClientQos() const;

std::size_t GetDedicatedChannelCount(std::size_t method_id) const;

private:
static std::shared_ptr<grpc::Channel>
CreateChannelImpl(const ClientDependencies& dependencies, const grpc::string& endpoint);

static std::size_t GetDedicatedChannelCountImpl(
const ClientDependencies& dependencies,
std::size_t method_id,
const ugrpc::impl::StaticServiceMetadata& meta
);

using StubDeleterType = void (*)(void*);
using StubPtr = std::unique_ptr<void, StubDeleterType>;

using StubPool = utils::FixedArray<StubPtr>;

template <typename Service>
static void StubDeleter(void* ptr) noexcept {
delete static_cast<Stub<Service>*>(ptr);
Expand All @@ -110,14 +142,29 @@ class ClientData final {
});
}

const StubPtr& NextStubPtr() const;
template <typename Service>
static utils::FixedArray<StubPool>
MakeDedicatedStubs(ClientDependencies& dependencies, const ugrpc::impl::StaticServiceMetadata& meta) {
const auto& method_full_names = meta.method_full_names;
const auto endpoint_string = ugrpc::impl::ToGrpcString(dependencies.endpoint);
return utils::GenerateFixedArray(method_full_names.size(), [&](std::size_t method_id) {
const auto count_of_channels = GetDedicatedChannelCountImpl(dependencies, method_id, meta);
return utils::GenerateFixedArray(count_of_channels, [&](std::size_t) {
return StubPtr(Service::NewStub(CreateChannelImpl(dependencies, endpoint_string)).release(), &StubDeleter<Service>);
});
});
}

const StubPtr& NextStubPtr(const utils::FixedArray<StubPtr>& stubs) const;

ugrpc::impl::ServiceStatistics& GetServiceStatistics();

ClientDependencies dependencies_;
std::optional<ugrpc::impl::StaticServiceMetadata> metadata_{std::nullopt};
ugrpc::impl::ServiceStatistics* service_statistics_{nullptr};
utils::FixedArray<StubPtr> stubs_;
utils::FixedArray<StubPtr> default_stubs_;
// method_id -> stub_pool
utils::FixedArray<StubPool> dedicated_stubs_;
};

template <typename Client>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class SimpleClientComponentAny : public components::ComponentBase {
/// ---- | ----------- | -------------
/// endpoint | URL of the gRPC service | --
/// client-name | name of the gRPC server we talk to, for diagnostics | <uses the component name>
/// dedicated-channel-counts | a map of a full rpc name (`full.service.Name/MethodName`) in channel count. Used for high-load methods | -
/// factory-component | ClientFactoryComponent name to use for client creation | --

// clang-format on
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion grpc/src/ugrpc/client/channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

#include <userver/engine/async.hpp>

#include <ugrpc/impl/to_string.hpp>
#include <userver/ugrpc/impl/async_method_invocation.hpp>
#include <userver/ugrpc/impl/deadline_timepoint.hpp>
#include <userver/ugrpc/impl/to_string.hpp>

USERVER_NAMESPACE_BEGIN

Expand Down
19 changes: 11 additions & 8 deletions grpc/src/ugrpc/client/client_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@ ClientFactory::ClientFactory(
testsuite::GrpcControl& testsuite_grpc,
dynamic_config::Source source
)
: channel_task_processor_(channel_task_processor),
: settings_(std::move(settings)),
channel_task_processor_(channel_task_processor),
mws_(mws),
completion_queues_(completion_queues),
channel_cache_(
testsuite_grpc.IsTlsEnabled() ? settings.credentials : grpc::InsecureChannelCredentials(),
settings.channel_args,
settings.channel_count
testsuite_grpc.IsTlsEnabled() ? settings_.credentials : grpc::InsecureChannelCredentials(),
settings_.channel_args,
settings_.channel_count
),
client_statistics_storage_(statistics_storage),
config_source_(source),
testsuite_grpc_(testsuite_grpc) {
ugrpc::impl::SetupNativeLogging();
ugrpc::impl::UpdateNativeLogLevel(settings.native_log_level);
ugrpc::impl::UpdateNativeLogLevel(settings_.native_log_level);

for (auto& [client_name, creds] : settings.client_credentials) {
for (auto& [client_name, creds] : settings_.client_credentials) {
client_channel_cache_.try_emplace(
std::string{client_name},
testsuite_grpc.IsTlsEnabled() ? creds : grpc::InsecureChannelCredentials(),
settings.channel_args,
settings.channel_count
settings_.channel_args,
settings_.channel_count
);
}
}
Expand Down Expand Up @@ -71,6 +72,8 @@ impl::ClientDependencies ClientFactory::MakeClientDependencies(ClientSettings&&
config_source_,
testsuite_grpc_,
settings.client_qos,
settings_,
std::move(settings.dedicated_methods_config),
};
}

Expand Down
Loading

0 comments on commit d4e39cb

Please sign in to comment.