Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a service option to disable check eovercrowded on server side #2774

Merged
merged 4 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,6 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}

if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}

if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(
Expand Down Expand Up @@ -505,6 +499,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
google::protobuf::Service* svc = NULL;
google::protobuf::MethodDescriptor* method = NULL;
if (NULL != server->options().baidu_master_service) {
if (socket->is_overcrowded()) {
chenBright marked this conversation as resolved.
Show resolved Hide resolved
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
svc = server->options().baidu_master_service;
auto sampled_request = new (std::nothrow) SampledRequest;
if (NULL == sampled_request) {
Expand Down Expand Up @@ -567,6 +566,11 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
mp->service->CallMethod(mp->method, cntl.get(), &breq, &bres, NULL);
break;
}
if (socket->is_overcrowded() && !server->options().ignore_eovercrowded && !mp->ignore_eovercrowded) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
// Switch to service-specific error.
non_service_error.release();
method_status = mp->status;
Expand Down
69 changes: 69 additions & 0 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
#endif

static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0);
static bool g_default_ignore_eovercrowded(false);
chenBright marked this conversation as resolved.
Show resolved Hide resolved

int Server::StartInternal(const butil::EndPoint& endpoint,
const PortRange& port_range,
Expand Down Expand Up @@ -2305,6 +2306,74 @@ int Server::MaxConcurrencyOf(google::protobuf::Service* service,
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
}

bool& Server::IgnoreEovercrowdedOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "IgnoreEovercrowdedOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return mp->ignore_eovercrowded;
}

bool Server::IgnoreEovercrowdedOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_ignore_eovercrowded;
}
if (mp == NULL || mp->status == NULL) {
return false;
}
return mp->ignore_eovercrowded;
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const {
return IgnoreEovercrowdedOf(_method_map.seek(full_method_name));
}

bool& Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) {
MethodProperty* mp = const_cast<MethodProperty*>(
FindMethodPropertyByFullName(full_service_name, method_name));
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_service_name
<< '/' << method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_ignore_eovercrowded;
}
return IgnoreEovercrowdedOf(mp);
}

bool Server::IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(FindMethodPropertyByFullName(
full_service_name, method_name));
}

bool& Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const {
return IgnoreEovercrowdedOf(service->GetDescriptor()->full_name(), method_name);
}

bool Server::AcceptRequest(Controller* cntl) const {
const Interceptor* interceptor = _options.interceptor;
if (!interceptor) {
Expand Down
18 changes: 18 additions & 0 deletions src/brpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ struct ServerOptions {
// Owned by Server and deleted in server's destructor.
RpcPBMessageFactory* rpc_pb_message_factory;

bool ignore_eovercrowded;
chenBright marked this conversation as resolved.
Show resolved Hide resolved

private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases.
Expand Down Expand Up @@ -412,6 +414,7 @@ class Server {
const google::protobuf::MethodDescriptor* method;
MethodStatus* status;
AdaptiveMaxConcurrency max_concurrency;
bool ignore_eovercrowded = false;
chenBright marked this conversation as resolved.
Show resolved Hide resolved

MethodProperty();
};
Expand Down Expand Up @@ -590,6 +593,19 @@ class Server {
int MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name);
chenBright marked this conversation as resolved.
Show resolved Hide resolved
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_method_name) const;

bool& IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const;

bool& IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name);
bool IgnoreEovercrowdedOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const;

int Concurrency() const {
return butil::subtle::NoBarrier_Load(&_concurrency);
};
Expand Down Expand Up @@ -694,6 +710,8 @@ friend class Controller;

AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const;
bool& IgnoreEovercrowdedOf(MethodProperty*);
bool IgnoreEovercrowdedOf(const MethodProperty*) const;

static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out);
Expand Down
Loading