diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 7e7e2513a3096..5e0bfe9d1d24a 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2838,6 +2838,32 @@ configuration::configuration() "Per-shard capacity of the cache for validating schema IDs.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 128) + , pp_sr_smp_max_non_local_requests( + *this, + "pp_sr_smp_max_non_local_requests", + "Maximum number of x-core requests pending in Panda Proxy and Schema " + "Registry seastar::smp group. (for more details look at " + "`seastar::smp_service_group` documentation)", + {.needs_restart = needs_restart::yes, .visibility = visibility::tunable}, + std::nullopt) + , max_in_flight_schema_registry_requests_per_shard( + *this, + "max_in_flight_schema_registry_requests_per_shard", + "Maximum number of in flight HTTP requests permitted in schema registry " + "per shard. Any additional requests above this limit will be rejected " + "with a 429 error", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 500, + {.min = 1}) + , max_in_flight_pandaproxy_requests_per_shard( + *this, + "max_in_flight_pandaproxy_requests_per_shard", + "Maximum number of in flight HTTP requests permitted in pandaproxy per " + "shard. Any additional requests above this limit will be rejected with " + "a 429 error", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 500, + {.min = 1}) , kafka_memory_share_for_fetch( *this, "kafka_memory_share_for_fetch", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 2404b4b4e406d..cde97e303b725 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -531,6 +531,10 @@ struct configuration final : public config_store { enable_schema_id_validation; config::property kafka_schema_id_validation_cache_capacity; + property> pp_sr_smp_max_non_local_requests; + bounded_property max_in_flight_schema_registry_requests_per_shard; + bounded_property max_in_flight_pandaproxy_requests_per_shard; + bounded_property kafka_memory_share_for_fetch; property kafka_memory_batch_size_estimate_for_fetch; // debug controls diff --git a/src/v/pandaproxy/reply.h b/src/v/pandaproxy/reply.h index 7cbf6058a4ee1..b314b58ed48ed 100644 --- a/src/v/pandaproxy/reply.h +++ b/src/v/pandaproxy/reply.h @@ -60,6 +60,11 @@ inline ss::http::reply& set_reply_unavailable(ss::http::reply& rep) { .add_header("Retry-After", "0"); } +inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) { + return rep.set_status(ss::http::reply::status_type::too_many_requests) + .add_header("Retry-After", "0"); +} + inline std::unique_ptr reply_unavailable() { auto rep = std::make_unique(ss::http::reply{}); set_reply_unavailable(*rep); diff --git a/src/v/pandaproxy/rest/proxy.cc b/src/v/pandaproxy/rest/proxy.cc index ff0ff27a60c13..e886090b414c4 100644 --- a/src/v/pandaproxy/rest/proxy.cc +++ b/src/v/pandaproxy/rest/proxy.cc @@ -12,6 +12,7 @@ #include "cluster/controller.h" #include "cluster/ephemeral_credential_frontend.h" #include "cluster/members_table.h" +#include "config/configuration.h" #include "kafka/client/config_utils.h" #include "net/unresolved_address.h" #include "pandaproxy/api/api-doc/rest.json.hh" @@ -111,9 +112,11 @@ proxy::proxy( cluster::controller* controller) : _config(config) , _mem_sem(max_memory, "pproxy/mem") + , _inflight_sem(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard(), "pproxy/inflight") + , _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind()) , _client(client) , _client_cache(client_cache) - , _ctx{{{{}, _mem_sem, {}, smp_sg}, *this}, + , _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}, {config::always_true(), config::shard_local_cfg().superusers.bind(), controller}, _config.pandaproxy_api.value()} , _server( @@ -125,7 +128,10 @@ proxy::proxy( _ctx, json::serialization_format::application_json) , _ensure_started{[this]() { return do_start(); }} - , _controller(controller) {} + , _controller(controller) { + _inflight_config_binding.watch( + [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); +} ss::future<> proxy::start() { _server.routes(get_proxy_routes(_gate, _ensure_started)); diff --git a/src/v/pandaproxy/rest/proxy.h b/src/v/pandaproxy/rest/proxy.h index 0b52fcb65a9cc..aa4f253459af2 100644 --- a/src/v/pandaproxy/rest/proxy.h +++ b/src/v/pandaproxy/rest/proxy.h @@ -18,6 +18,7 @@ #include "pandaproxy/server.h" #include "pandaproxy/util.h" #include "security/request_auth.h" +#include "utils/adjustable_semaphore.h" #include #include @@ -56,6 +57,8 @@ class proxy : public ss::peering_sharded_service { configuration _config; ssx::semaphore _mem_sem; + adjustable_semaphore _inflight_sem; + config::binding _inflight_config_binding; ss::gate _gate; ss::sharded& _client; ss::sharded& _client_cache; diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index da85d2d008617..2e2f02124ff0b 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -13,6 +13,7 @@ #include "cluster/ephemeral_credential_frontend.h" #include "cluster/members_table.h" #include "cluster/security_frontend.h" +#include "config/configuration.h" #include "kafka/client/brokers.h" #include "kafka/client/client_fetch_batch_reader.h" #include "kafka/client/config_utils.h" @@ -502,8 +503,13 @@ service::service( ss::sharded& audit_mgr) : _config(config) , _mem_sem(max_memory, "pproxy/schema-svc") + , _inflight_sem(config::shard_local_cfg() + .max_in_flight_schema_registry_requests_per_shard()) + , _inflight_config_binding( + config::shard_local_cfg() + .max_in_flight_schema_registry_requests_per_shard.bind()) , _client(client) - , _ctx{{{}, _mem_sem, {}, smp_sg}, *this} + , _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this} , _server( "schema_registry", // server_name "schema_registry", // public_metric_group_name @@ -520,7 +526,10 @@ service::service( , _auth{ config::always_true(), config::shard_local_cfg().superusers.bind(), - controller.get()} {} + controller.get()} { + _inflight_config_binding.watch( + [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); +} ss::future<> service::start() { co_await configure(); diff --git a/src/v/pandaproxy/schema_registry/service.h b/src/v/pandaproxy/schema_registry/service.h index d2a47830ca65f..b55b581f64a41 100644 --- a/src/v/pandaproxy/schema_registry/service.h +++ b/src/v/pandaproxy/schema_registry/service.h @@ -20,6 +20,7 @@ #include "pandaproxy/util.h" #include "security/fwd.h" #include "security/request_auth.h" +#include "utils/adjustable_semaphore.h" #include #include @@ -70,6 +71,8 @@ class service : public ss::peering_sharded_service { ss::future<> fetch_internal_topic(); configuration _config; ssx::semaphore _mem_sem; + adjustable_semaphore _inflight_sem; + config::binding _inflight_config_binding; ss::gate _gate; ss::sharded& _client; ctx_server::context_t _ctx; diff --git a/src/v/pandaproxy/server.cc b/src/v/pandaproxy/server.cc index be9066edb2b20..ad1149b743593 100644 --- a/src/v/pandaproxy/server.cc +++ b/src/v/pandaproxy/server.cc @@ -91,30 +91,41 @@ struct handler_adaptor : ss::httpd::handler_base { auto guard = ss::gate::holder(_pending_requests); server::request_t rq{std::move(req), this->_ctx}; server::reply_t rp{std::move(rep)}; + const auto set_and_measure_response = + [&measure](const server::reply_t& rp) { + set_mime_type(*rp.rep, rp.mime_type); + measure.set_status(rp.rep->_status); + }; + auto inflight_units = _ctx.inflight_sem.try_get_units(1); + if (!inflight_units) { + set_reply_too_many_requests(*rp.rep); + rp.mime_type = _exceptional_mime_type; + set_and_measure_response(rp); + co_return std::move(rp.rep); + } auto req_size = get_request_size(*rq.req); auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size); if (_ctx.as.abort_requested()) { set_reply_unavailable(*rp.rep); rp.mime_type = _exceptional_mime_type; - } else { - auto method = rq.req->_method; - auto url = rq.req->_url; - try { - rp = co_await _handler(std::move(rq), std::move(rp)); - } catch (...) { - auto ex = std::current_exception(); - vlog( - plog.warn, - "Request: {} {} failed: {}", - method, - url, - std::current_exception()); - rp = server::reply_t{ - exception_reply(ex), _exceptional_mime_type}; - } + set_and_measure_response(rp); + co_return std::move(rp.rep); + } + auto method = rq.req->_method; + auto url = rq.req->_url; + try { + rp = co_await _handler(std::move(rq), std::move(rp)); + } catch (...) { + auto ex = std::current_exception(); + vlog( + plog.warn, + "Request: {} {} failed: {}", + method, + url, + std::current_exception()); + rp = server::reply_t{exception_reply(ex), _exceptional_mime_type}; } - set_mime_type(*rp.rep, rp.mime_type); - measure.set_status(rp.rep->_status); + set_and_measure_response(rp); co_return std::move(rp.rep); } diff --git a/src/v/pandaproxy/server.h b/src/v/pandaproxy/server.h index 34760b091e7ec..93de1d6b5b931 100644 --- a/src/v/pandaproxy/server.h +++ b/src/v/pandaproxy/server.h @@ -21,6 +21,7 @@ #include "pandaproxy/kafka_client_cache.h" #include "pandaproxy/types.h" #include "security/request_auth.h" +#include "utils/adjustable_semaphore.h" #include #include @@ -70,6 +71,7 @@ class server { struct context_t { std::vector advertised_listeners; ssx::semaphore& mem_sem; + adjustable_semaphore& inflight_sem; ss::abort_source as; ss::smp_service_group smp_sg; }; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 8016417c0737b..c0cb5f83e4fe8 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -539,7 +539,10 @@ void application::initialize( .raft_group_max_non_local_requests = config::shard_local_cfg().raft_smp_max_non_local_requests().value_or( smp_groups::default_raft_non_local_requests( - config::shard_local_cfg().topic_partitions_per_shard()))}; + config::shard_local_cfg().topic_partitions_per_shard())), + .proxy_group_max_non_local_requests + = config::shard_local_cfg().pp_sr_smp_max_non_local_requests().value_or( + smp_groups::default_max_nonlocal_requests)}; smp_service_groups.create_groups(smp_groups_cfg).get(); _deferred.emplace_back( diff --git a/src/v/utils/adjustable_semaphore.h b/src/v/utils/adjustable_semaphore.h index b99053ffbaa75..2063b04648c7d 100644 --- a/src/v/utils/adjustable_semaphore.h +++ b/src/v/utils/adjustable_semaphore.h @@ -90,6 +90,14 @@ class adjustable_semaphore { return ss::get_units(_sem, units, as); } + /** + * Attempts to immediately get units from the semaphore, returning + * std::nullopt if no units exist. + */ + std::optional try_get_units(size_t units) { + return ss::try_get_units(_sem, units); + } + size_t current() const noexcept { return _sem.current(); } ssize_t available_units() const noexcept { return _sem.available_units(); }