From 9b11ec33c29aae78acbd12362eaffec20f755c84 Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 5 Jan 2024 06:25:49 -0500 Subject: [PATCH 1/8] config: Added config to set SG for proxy Signed-off-by: Michael Boquard --- src/v/config/configuration.cc | 8 ++++++++ src/v/config/configuration.h | 2 ++ 2 files changed, 10 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index f0d7db4a92ec..68cf6d73b8df 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2829,6 +2829,14 @@ configuration::configuration() "Per-shard capacity of the cache for validating schema IDs.", {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, 128) + , pp_sr_smp_max_non_local_requests( + *this, + "pp_sr_smp_max_non_local_requests", + "Maximum number of x-core requests pending in Panda Proxy and Schema " + "Registry seastar::smp group. (for more details look at " + "`seastar::smp_service_group` documentation)", + {.needs_restart = needs_restart::yes, .visibility = visibility::tunable}, + std::nullopt) , kafka_memory_share_for_fetch( *this, "kafka_memory_share_for_fetch", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index be3949c92b66..7bb2398411d9 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -529,6 +529,8 @@ struct configuration final : public config_store { enable_schema_id_validation; config::property kafka_schema_id_validation_cache_capacity; + property> pp_sr_smp_max_non_local_requests; + bounded_property kafka_memory_share_for_fetch; property kafka_memory_batch_size_estimate_for_fetch; // debug controls From 9bedb5e8712646e26a6acecbfee5943446b2b4fa Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 5 Jan 2024 12:21:07 -0500 Subject: [PATCH 2/8] app: Wire up PP SR x-shard config Signed-off-by: Michael Boquard --- src/v/redpanda/application.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git 
a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index b68a51aab405..8caebf5b7b3a 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -535,7 +535,10 @@ void application::initialize( .raft_group_max_non_local_requests = config::shard_local_cfg().raft_smp_max_non_local_requests().value_or( smp_groups::default_raft_non_local_requests( - config::shard_local_cfg().topic_partitions_per_shard()))}; + config::shard_local_cfg().topic_partitions_per_shard())), + .proxy_group_max_non_local_requests + = config::shard_local_cfg().pp_sr_smp_max_non_local_requests().value_or( + smp_groups::default_max_nonlocal_requests)}; smp_service_groups.create_groups(smp_groups_cfg).get(); _deferred.emplace_back( From f7707d21071bf9d15a7b7bd0c58af222d5dd79ff Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 12 Jan 2024 11:05:02 -0500 Subject: [PATCH 3/8] config: Added config for in flight PP/SR restriction Signed-off-by: Michael Boquard --- src/v/config/configuration.cc | 18 ++++++++++++++++++ src/v/config/configuration.h | 2 ++ 2 files changed, 20 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 68cf6d73b8df..a388a2fb86e2 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2837,6 +2837,24 @@ configuration::configuration() "`seastar::smp_service_group` documentation)", {.needs_restart = needs_restart::yes, .visibility = visibility::tunable}, std::nullopt) + , max_in_flight_schema_registry_requests_per_shard( + *this, + "max_in_flight_schema_registry_requests_per_shard", + "Maximum number of in flight HTTP requests permitted in schema registry " + "per shard. 
Any additional requests above this limit will be rejected " + "with a 429 error", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 500, + {.min = 1}) + , max_in_flight_pandaproxy_requests_per_shard( + *this, + "max_in_flight_pandaproxy_requests_per_shard", + "Maximum number of in flight HTTP requests permitted in pandaproxy per " + "shard. Any additional requests above this limit will be rejected with " + "a 429 error", + {.needs_restart = needs_restart::no, .visibility = visibility::tunable}, + 500, + {.min = 1}) , kafka_memory_share_for_fetch( *this, "kafka_memory_share_for_fetch", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 7bb2398411d9..743bba49654c 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -530,6 +530,8 @@ struct configuration final : public config_store { config::property kafka_schema_id_validation_cache_capacity; property> pp_sr_smp_max_non_local_requests; + bounded_property max_in_flight_schema_registry_requests_per_shard; + bounded_property max_in_flight_pandaproxy_requests_per_shard; bounded_property kafka_memory_share_for_fetch; property kafka_memory_batch_size_estimate_for_fetch; From e088ac31a7d2930d7714a488c00b21954331dd5d Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 12 Jan 2024 16:09:30 -0500 Subject: [PATCH 4/8] utils: Added try_get_units to adjustable_semaphore Signed-off-by: Michael Boquard --- src/v/utils/adjustable_semaphore.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/v/utils/adjustable_semaphore.h b/src/v/utils/adjustable_semaphore.h index b99053ffbaa7..2063b04648c7 100644 --- a/src/v/utils/adjustable_semaphore.h +++ b/src/v/utils/adjustable_semaphore.h @@ -90,6 +90,14 @@ class adjustable_semaphore { return ss::get_units(_sem, units, as); } + /** + * Attempts to immediately get units from the semaphore, returning + * std::nullopt if no units exist. 
+ */ + std::optional try_get_units(size_t units) { + return ss::try_get_units(_sem, units); + } + size_t current() const noexcept { return _sem.current(); } ssize_t available_units() const noexcept { return _sem.available_units(); } From 260d1d5b074dbf47424b90acefaa1a7fb8af79c7 Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 12 Jan 2024 16:21:04 -0500 Subject: [PATCH 5/8] pp: Wire in in-flight semaphore Signed-off-by: Michael Boquard --- src/v/pandaproxy/rest/proxy.cc | 8 +++++++- src/v/pandaproxy/rest/proxy.h | 3 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/v/pandaproxy/rest/proxy.cc b/src/v/pandaproxy/rest/proxy.cc index ff0ff27a60c1..fb8146baee1b 100644 --- a/src/v/pandaproxy/rest/proxy.cc +++ b/src/v/pandaproxy/rest/proxy.cc @@ -12,6 +12,7 @@ #include "cluster/controller.h" #include "cluster/ephemeral_credential_frontend.h" #include "cluster/members_table.h" +#include "config/configuration.h" #include "kafka/client/config_utils.h" #include "net/unresolved_address.h" #include "pandaproxy/api/api-doc/rest.json.hh" @@ -111,6 +112,8 @@ proxy::proxy( cluster::controller* controller) : _config(config) , _mem_sem(max_memory, "pproxy/mem") + , _inflight_sem(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard(), "pproxy/inflight") + , _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind()) , _client(client) , _client_cache(client_cache) , _ctx{{{{}, _mem_sem, {}, smp_sg}, *this}, @@ -125,7 +128,10 @@ proxy::proxy( _ctx, json::serialization_format::application_json) , _ensure_started{[this]() { return do_start(); }} - , _controller(controller) {} + , _controller(controller) { + _inflight_config_binding.watch( + [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); +} ss::future<> proxy::start() { _server.routes(get_proxy_routes(_gate, _ensure_started)); diff --git a/src/v/pandaproxy/rest/proxy.h b/src/v/pandaproxy/rest/proxy.h index 
0b52fcb65a9c..aa4f253459af 100644 --- a/src/v/pandaproxy/rest/proxy.h +++ b/src/v/pandaproxy/rest/proxy.h @@ -18,6 +18,7 @@ #include "pandaproxy/server.h" #include "pandaproxy/util.h" #include "security/request_auth.h" +#include "utils/adjustable_semaphore.h" #include #include @@ -56,6 +57,8 @@ class proxy : public ss::peering_sharded_service { configuration _config; ssx::semaphore _mem_sem; + adjustable_semaphore _inflight_sem; + config::binding _inflight_config_binding; ss::gate _gate; ss::sharded& _client; ss::sharded& _client_cache; From b9c03840cd07ee48c899bfe2fff7eb88c19ebc35 Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 12 Jan 2024 16:21:44 -0500 Subject: [PATCH 6/8] sr: Wired in in-flight semaphore Signed-off-by: Michael Boquard --- src/v/pandaproxy/schema_registry/service.cc | 11 ++++++++++- src/v/pandaproxy/schema_registry/service.h | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index da85d2d00861..5c4693a6e15b 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -13,6 +13,7 @@ #include "cluster/ephemeral_credential_frontend.h" #include "cluster/members_table.h" #include "cluster/security_frontend.h" +#include "config/configuration.h" #include "kafka/client/brokers.h" #include "kafka/client/client_fetch_batch_reader.h" #include "kafka/client/config_utils.h" @@ -502,6 +503,11 @@ service::service( ss::sharded& audit_mgr) : _config(config) , _mem_sem(max_memory, "pproxy/schema-svc") + , _inflight_sem(config::shard_local_cfg() + .max_in_flight_schema_registry_requests_per_shard()) + , _inflight_config_binding( + config::shard_local_cfg() + .max_in_flight_schema_registry_requests_per_shard.bind()) , _client(client) , _ctx{{{}, _mem_sem, {}, smp_sg}, *this} , _server( @@ -520,7 +526,10 @@ service::service( , _auth{ config::always_true(), config::shard_local_cfg().superusers.bind(), 
- controller.get()} {} + controller.get()} { + _inflight_config_binding.watch( + [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); +} ss::future<> service::start() { co_await configure(); diff --git a/src/v/pandaproxy/schema_registry/service.h b/src/v/pandaproxy/schema_registry/service.h index d2a47830ca65..b55b581f64a4 100644 --- a/src/v/pandaproxy/schema_registry/service.h +++ b/src/v/pandaproxy/schema_registry/service.h @@ -20,6 +20,7 @@ #include "pandaproxy/util.h" #include "security/fwd.h" #include "security/request_auth.h" +#include "utils/adjustable_semaphore.h" #include #include @@ -70,6 +71,8 @@ class service : public ss::peering_sharded_service { ss::future<> fetch_internal_topic(); configuration _config; ssx::semaphore _mem_sem; + adjustable_semaphore _inflight_sem; + config::binding _inflight_config_binding; ss::gate _gate; ss::sharded& _client; ctx_server::context_t _ctx; From dc0d6cf30f07680b6f83d3d28b3dad0936884ffe Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 12 Jan 2024 16:23:47 -0500 Subject: [PATCH 7/8] pp/sr: Added adjustable inflight semaphore to context Signed-off-by: Michael Boquard --- src/v/pandaproxy/rest/proxy.cc | 2 +- src/v/pandaproxy/schema_registry/service.cc | 2 +- src/v/pandaproxy/server.h | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/v/pandaproxy/rest/proxy.cc b/src/v/pandaproxy/rest/proxy.cc index fb8146baee1b..e886090b414c 100644 --- a/src/v/pandaproxy/rest/proxy.cc +++ b/src/v/pandaproxy/rest/proxy.cc @@ -116,7 +116,7 @@ proxy::proxy( , _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind()) , _client(client) , _client_cache(client_cache) - , _ctx{{{{}, _mem_sem, {}, smp_sg}, *this}, + , _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}, {config::always_true(), config::shard_local_cfg().superusers.bind(), controller}, _config.pandaproxy_api.value()} , _server( diff --git a/src/v/pandaproxy/schema_registry/service.cc 
b/src/v/pandaproxy/schema_registry/service.cc index 5c4693a6e15b..2e2f02124ff0 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -509,7 +509,7 @@ service::service( config::shard_local_cfg() .max_in_flight_schema_registry_requests_per_shard.bind()) , _client(client) - , _ctx{{{}, _mem_sem, {}, smp_sg}, *this} + , _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this} , _server( "schema_registry", // server_name "schema_registry", // public_metric_group_name diff --git a/src/v/pandaproxy/server.h b/src/v/pandaproxy/server.h index 34760b091e7e..93de1d6b5b93 100644 --- a/src/v/pandaproxy/server.h +++ b/src/v/pandaproxy/server.h @@ -21,6 +21,7 @@ #include "pandaproxy/kafka_client_cache.h" #include "pandaproxy/types.h" #include "security/request_auth.h" +#include "utils/adjustable_semaphore.h" #include #include @@ -70,6 +71,7 @@ class server { struct context_t { std::vector advertised_listeners; ssx::semaphore& mem_sem; + adjustable_semaphore& inflight_sem; ss::abort_source as; ss::smp_service_group smp_sg; }; From 5a4d0ed181dcec80262044d8ec6ed53dfb1aff6b Mon Sep 17 00:00:00 2001 From: Michael Boquard Date: Fri, 12 Jan 2024 16:24:13 -0500 Subject: [PATCH 8/8] pp/sr: Added handling of inflight semaphore If inflight semaphore is exhausted, then return a 429 error. 
Signed-off-by: Michael Boquard --- src/v/pandaproxy/reply.h | 5 ++++ src/v/pandaproxy/server.cc | 47 +++++++++++++++++++++++--------------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/v/pandaproxy/reply.h b/src/v/pandaproxy/reply.h index 7cbf6058a4ee..b314b58ed48e 100644 --- a/src/v/pandaproxy/reply.h +++ b/src/v/pandaproxy/reply.h @@ -60,6 +60,11 @@ inline ss::http::reply& set_reply_unavailable(ss::http::reply& rep) { .add_header("Retry-After", "0"); } +inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) { + return rep.set_status(ss::http::reply::status_type::too_many_requests) + .add_header("Retry-After", "0"); +} + inline std::unique_ptr reply_unavailable() { auto rep = std::make_unique(ss::http::reply{}); set_reply_unavailable(*rep); diff --git a/src/v/pandaproxy/server.cc b/src/v/pandaproxy/server.cc index be9066edb2b2..ad1149b74359 100644 --- a/src/v/pandaproxy/server.cc +++ b/src/v/pandaproxy/server.cc @@ -91,30 +91,41 @@ struct handler_adaptor : ss::httpd::handler_base { auto guard = ss::gate::holder(_pending_requests); server::request_t rq{std::move(req), this->_ctx}; server::reply_t rp{std::move(rep)}; + const auto set_and_measure_response = + [&measure](const server::reply_t& rp) { + set_mime_type(*rp.rep, rp.mime_type); + measure.set_status(rp.rep->_status); + }; + auto inflight_units = _ctx.inflight_sem.try_get_units(1); + if (!inflight_units) { + set_reply_too_many_requests(*rp.rep); + rp.mime_type = _exceptional_mime_type; + set_and_measure_response(rp); + co_return std::move(rp.rep); + } auto req_size = get_request_size(*rq.req); auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size); if (_ctx.as.abort_requested()) { set_reply_unavailable(*rp.rep); rp.mime_type = _exceptional_mime_type; - } else { - auto method = rq.req->_method; - auto url = rq.req->_url; - try { - rp = co_await _handler(std::move(rq), std::move(rp)); - } catch (...) 
{ - auto ex = std::current_exception(); - vlog( - plog.warn, - "Request: {} {} failed: {}", - method, - url, - std::current_exception()); - rp = server::reply_t{ - exception_reply(ex), _exceptional_mime_type}; - } + set_and_measure_response(rp); + co_return std::move(rp.rep); + } + auto method = rq.req->_method; + auto url = rq.req->_url; + try { + rp = co_await _handler(std::move(rq), std::move(rp)); + } catch (...) { + auto ex = std::current_exception(); + vlog( + plog.warn, + "Request: {} {} failed: {}", + method, + url, + std::current_exception()); + rp = server::reply_t{exception_reply(ex), _exceptional_mime_type}; } - set_mime_type(*rp.rep, rp.mime_type); - measure.set_status(rp.rep->_status); + set_and_measure_response(rp); co_return std::move(rp.rep); }