Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject PP/SR HTTP requests if cross shard semaphore has been exhausted #15977

Merged
merged 8 commits into from
Jan 16, 2024
26 changes: 26 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,32 @@ configuration::configuration()
"Per-shard capacity of the cache for validating schema IDs.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
128)
, pp_sr_smp_max_non_local_requests(
*this,
"pp_sr_smp_max_non_local_requests",
"Maximum number of x-core requests pending in Panda Proxy and Schema "
"Registry seastar::smp group. (for more details look at "
"`seastar::smp_service_group` documentation)",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
std::nullopt)
, max_in_flight_schema_registry_requests_per_shard(
*this,
"max_in_flight_schema_registry_requests_per_shard",
"Maximum number of in flight HTTP requests permitted in schema registry "
"per shard. Any additional requests above this limit will be rejected "
"with a 429 error",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
500,
{.min = 1})
, max_in_flight_pandaproxy_requests_per_shard(
*this,
"max_in_flight_pandaproxy_requests_per_shard",
"Maximum number of in flight HTTP requests permitted in pandaproxy per "
"shard. Any additional requests above this limit will be rejected with "
"a 429 error",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
500,
{.min = 1})
, kafka_memory_share_for_fetch(
*this,
"kafka_memory_share_for_fetch",
Expand Down
4 changes: 4 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,10 @@ struct configuration final : public config_store {
enable_schema_id_validation;
config::property<size_t> kafka_schema_id_validation_cache_capacity;

property<std::optional<uint32_t>> pp_sr_smp_max_non_local_requests;
bounded_property<size_t> max_in_flight_schema_registry_requests_per_shard;
bounded_property<size_t> max_in_flight_pandaproxy_requests_per_shard;

bounded_property<double, numeric_bounds> kafka_memory_share_for_fetch;
property<size_t> kafka_memory_batch_size_estimate_for_fetch;
// debug controls
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ inline ss::http::reply& set_reply_unavailable(ss::http::reply& rep) {
.add_header("Retry-After", "0");
}

inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) {
return rep.set_status(ss::http::reply::status_type::too_many_requests)
.add_header("Retry-After", "0");
}

inline std::unique_ptr<ss::http::reply> reply_unavailable() {
auto rep = std::make_unique<ss::http::reply>(ss::http::reply{});
set_reply_unavailable(*rep);
Expand Down
10 changes: 8 additions & 2 deletions src/v/pandaproxy/rest/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "cluster/controller.h"
#include "cluster/ephemeral_credential_frontend.h"
#include "cluster/members_table.h"
#include "config/configuration.h"
#include "kafka/client/config_utils.h"
#include "net/unresolved_address.h"
#include "pandaproxy/api/api-doc/rest.json.hh"
Expand Down Expand Up @@ -111,9 +112,11 @@ proxy::proxy(
cluster::controller* controller)
: _config(config)
, _mem_sem(max_memory, "pproxy/mem")
, _inflight_sem(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard(), "pproxy/inflight")
, _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind())
, _client(client)
, _client_cache(client_cache)
, _ctx{{{{}, _mem_sem, {}, smp_sg}, *this},
, _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
{config::always_true(), config::shard_local_cfg().superusers.bind(), controller},
_config.pandaproxy_api.value()}
, _server(
Expand All @@ -125,7 +128,10 @@ proxy::proxy(
_ctx,
json::serialization_format::application_json)
, _ensure_started{[this]() { return do_start(); }}
, _controller(controller) {}
, _controller(controller) {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
Comment on lines +132 to +133
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_inflight_sem could be destructed at this point (I don't see another mechanism that unsubscribes the watch, or otherwise sequences the destruction order.

}

ss::future<> proxy::start() {
_server.routes(get_proxy_routes(_gate, _ensure_started));
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/rest/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "pandaproxy/server.h"
#include "pandaproxy/util.h"
#include "security/request_auth.h"
#include "utils/adjustable_semaphore.h"

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -56,6 +57,8 @@ class proxy : public ss::peering_sharded_service<proxy> {

configuration _config;
ssx::semaphore _mem_sem;
adjustable_semaphore _inflight_sem;
config::binding<size_t> _inflight_config_binding;
ss::gate _gate;
ss::sharded<kafka::client::client>& _client;
ss::sharded<kafka_client_cache>& _client_cache;
Expand Down
13 changes: 11 additions & 2 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cluster/ephemeral_credential_frontend.h"
#include "cluster/members_table.h"
#include "cluster/security_frontend.h"
#include "config/configuration.h"
#include "kafka/client/brokers.h"
#include "kafka/client/client_fetch_batch_reader.h"
#include "kafka/client/config_utils.h"
Expand Down Expand Up @@ -502,8 +503,13 @@ service::service(
ss::sharded<security::audit::audit_log_manager>& audit_mgr)
: _config(config)
, _mem_sem(max_memory, "pproxy/schema-svc")
, _inflight_sem(config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard())
, _inflight_config_binding(
config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard.bind())
, _client(client)
, _ctx{{{}, _mem_sem, {}, smp_sg}, *this}
, _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
, _server(
"schema_registry", // server_name
"schema_registry", // public_metric_group_name
Expand All @@ -520,7 +526,10 @@ service::service(
, _auth{
config::always_true(),
config::shard_local_cfg().superusers.bind(),
controller.get()} {}
controller.get()} {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
Comment on lines +530 to +531
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_inflight_sem could be destructed at this point (I don't see another mechanism that unsubscribes the watch, or otherwise sequences the destruction order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't destruction sequence order dictated by the class? _inflight_config_binding is constructed after _inflight_sem and therefor should be destructed before the semaphore?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I find relying on destructor order is incredibly sneaky and usually deserves a comment somewhere

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't destruction sequence order dictated by the class? _inflight_config_binding is constructed after _inflight_sem and therefor should be destructed before the semaphore?

You're correct.

}

ss::future<> service::start() {
co_await configure();
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "pandaproxy/util.h"
#include "security/fwd.h"
#include "security/request_auth.h"
#include "utils/adjustable_semaphore.h"

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -70,6 +71,8 @@ class service : public ss::peering_sharded_service<service> {
ss::future<> fetch_internal_topic();
configuration _config;
ssx::semaphore _mem_sem;
adjustable_semaphore _inflight_sem;
config::binding<size_t> _inflight_config_binding;
ss::gate _gate;
ss::sharded<kafka::client::client>& _client;
ctx_server<service>::context_t _ctx;
Expand Down
47 changes: 29 additions & 18 deletions src/v/pandaproxy/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,41 @@ struct handler_adaptor : ss::httpd::handler_base {
auto guard = ss::gate::holder(_pending_requests);
server::request_t rq{std::move(req), this->_ctx};
server::reply_t rp{std::move(rep)};
const auto set_and_measure_response =
[&measure](const server::reply_t& rp) {
set_mime_type(*rp.rep, rp.mime_type);
measure.set_status(rp.rep->_status);
};
auto inflight_units = _ctx.inflight_sem.try_get_units(1);
if (!inflight_units) {
set_reply_too_many_requests(*rp.rep);
rp.mime_type = _exceptional_mime_type;
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto req_size = get_request_size(*rq.req);
auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size);
if (_ctx.as.abort_requested()) {
set_reply_unavailable(*rp.rep);
rp.mime_type = _exceptional_mime_type;
} else {
auto method = rq.req->_method;
auto url = rq.req->_url;
try {
rp = co_await _handler(std::move(rq), std::move(rp));
} catch (...) {
auto ex = std::current_exception();
vlog(
plog.warn,
"Request: {} {} failed: {}",
method,
url,
std::current_exception());
rp = server::reply_t{
exception_reply(ex), _exceptional_mime_type};
}
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto method = rq.req->_method;
auto url = rq.req->_url;
try {
rp = co_await _handler(std::move(rq), std::move(rp));
} catch (...) {
auto ex = std::current_exception();
vlog(
plog.warn,
"Request: {} {} failed: {}",
method,
url,
std::current_exception());
rp = server::reply_t{exception_reply(ex), _exceptional_mime_type};
}
set_mime_type(*rp.rep, rp.mime_type);
measure.set_status(rp.rep->_status);
set_and_measure_response(rp);
co_return std::move(rp.rep);
}

Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "pandaproxy/kafka_client_cache.h"
#include "pandaproxy/types.h"
#include "security/request_auth.h"
#include "utils/adjustable_semaphore.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
Expand Down Expand Up @@ -70,6 +71,7 @@ class server {
struct context_t {
std::vector<net::unresolved_address> advertised_listeners;
ssx::semaphore& mem_sem;
adjustable_semaphore& inflight_sem;
ss::abort_source as;
ss::smp_service_group smp_sg;
};
Expand Down
5 changes: 4 additions & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,10 @@ void application::initialize(
.raft_group_max_non_local_requests
= config::shard_local_cfg().raft_smp_max_non_local_requests().value_or(
smp_groups::default_raft_non_local_requests(
config::shard_local_cfg().topic_partitions_per_shard()))};
config::shard_local_cfg().topic_partitions_per_shard())),
.proxy_group_max_non_local_requests
= config::shard_local_cfg().pp_sr_smp_max_non_local_requests().value_or(
smp_groups::default_max_nonlocal_requests)};

smp_service_groups.create_groups(smp_groups_cfg).get();
_deferred.emplace_back(
Expand Down
8 changes: 8 additions & 0 deletions src/v/utils/adjustable_semaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ class adjustable_semaphore {
return ss::get_units(_sem, units, as);
}

/**
* Attempts to immediately get units from the semaphore, returning
* std::nullopt if no units exist.
*/
std::optional<ssx::semaphore_units> try_get_units(size_t units) {
return ss::try_get_units(_sem, units);
}

size_t current() const noexcept { return _sem.current(); }
ssize_t available_units() const noexcept { return _sem.available_units(); }

Expand Down