Skip to content

Commit

Permalink
Merge pull request redpanda-data#15977 from michael-redpanda/issues/1…
Browse files Browse the repository at this point in the history
…4116

Reject PP/SR HTTP requests if the cross-shard semaphore has been exhausted
  • Loading branch information
michael-redpanda authored Jan 16, 2024
2 parents 41b0a4f + 5a4d0ed commit a99419e
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 23 deletions.
26 changes: 26 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2838,6 +2838,32 @@ configuration::configuration()
"Per-shard capacity of the cache for validating schema IDs.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
128)
, pp_sr_smp_max_non_local_requests(
*this,
"pp_sr_smp_max_non_local_requests",
"Maximum number of x-core requests pending in Panda Proxy and Schema "
"Registry seastar::smp group. (for more details look at "
"`seastar::smp_service_group` documentation)",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
std::nullopt)
, max_in_flight_schema_registry_requests_per_shard(
*this,
"max_in_flight_schema_registry_requests_per_shard",
"Maximum number of in flight HTTP requests permitted in schema registry "
"per shard. Any additional requests above this limit will be rejected "
"with a 429 error",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
500,
{.min = 1})
, max_in_flight_pandaproxy_requests_per_shard(
*this,
"max_in_flight_pandaproxy_requests_per_shard",
"Maximum number of in flight HTTP requests permitted in pandaproxy per "
"shard. Any additional requests above this limit will be rejected with "
"a 429 error",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
500,
{.min = 1})
, kafka_memory_share_for_fetch(
*this,
"kafka_memory_share_for_fetch",
Expand Down
4 changes: 4 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ struct configuration final : public config_store {
enable_schema_id_validation;
config::property<size_t> kafka_schema_id_validation_cache_capacity;

property<std::optional<uint32_t>> pp_sr_smp_max_non_local_requests;
bounded_property<size_t> max_in_flight_schema_registry_requests_per_shard;
bounded_property<size_t> max_in_flight_pandaproxy_requests_per_shard;

bounded_property<double, numeric_bounds> kafka_memory_share_for_fetch;
property<size_t> kafka_memory_batch_size_estimate_for_fetch;
// debug controls
Expand Down
5 changes: 5 additions & 0 deletions src/v/pandaproxy/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ inline ss::http::reply& set_reply_unavailable(ss::http::reply& rep) {
.add_header("Retry-After", "0");
}

/// Marks \p rep as an HTTP 429 (Too Many Requests) response and attaches a
/// "Retry-After: 0" header, mirroring the header set_reply_unavailable adds.
///
/// \param rep reply to mutate in place
/// \return the same reply, so calls can be chained
inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) {
    rep.set_status(ss::http::reply::status_type::too_many_requests);
    rep.add_header("Retry-After", "0");
    return rep;
}

inline std::unique_ptr<ss::http::reply> reply_unavailable() {
auto rep = std::make_unique<ss::http::reply>(ss::http::reply{});
set_reply_unavailable(*rep);
Expand Down
10 changes: 8 additions & 2 deletions src/v/pandaproxy/rest/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "cluster/controller.h"
#include "cluster/ephemeral_credential_frontend.h"
#include "cluster/members_table.h"
#include "config/configuration.h"
#include "kafka/client/config_utils.h"
#include "net/unresolved_address.h"
#include "pandaproxy/api/api-doc/rest.json.hh"
Expand Down Expand Up @@ -111,9 +112,11 @@ proxy::proxy(
cluster::controller* controller)
: _config(config)
, _mem_sem(max_memory, "pproxy/mem")
, _inflight_sem(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard(), "pproxy/inflight")
, _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind())
, _client(client)
, _client_cache(client_cache)
, _ctx{{{{}, _mem_sem, {}, smp_sg}, *this},
, _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
{config::always_true(), config::shard_local_cfg().superusers.bind(), controller},
_config.pandaproxy_api.value()}
, _server(
Expand All @@ -125,7 +128,10 @@ proxy::proxy(
_ctx,
json::serialization_format::application_json)
, _ensure_started{[this]() { return do_start(); }}
, _controller(controller) {}
, _controller(controller) {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
}

ss::future<> proxy::start() {
_server.routes(get_proxy_routes(_gate, _ensure_started));
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/rest/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "pandaproxy/server.h"
#include "pandaproxy/util.h"
#include "security/request_auth.h"
#include "utils/adjustable_semaphore.h"

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -56,6 +57,8 @@ class proxy : public ss::peering_sharded_service<proxy> {

configuration _config;
ssx::semaphore _mem_sem;
adjustable_semaphore _inflight_sem;
config::binding<size_t> _inflight_config_binding;
ss::gate _gate;
ss::sharded<kafka::client::client>& _client;
ss::sharded<kafka_client_cache>& _client_cache;
Expand Down
13 changes: 11 additions & 2 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cluster/ephemeral_credential_frontend.h"
#include "cluster/members_table.h"
#include "cluster/security_frontend.h"
#include "config/configuration.h"
#include "kafka/client/brokers.h"
#include "kafka/client/client_fetch_batch_reader.h"
#include "kafka/client/config_utils.h"
Expand Down Expand Up @@ -502,8 +503,13 @@ service::service(
ss::sharded<security::audit::audit_log_manager>& audit_mgr)
: _config(config)
, _mem_sem(max_memory, "pproxy/schema-svc")
, _inflight_sem(config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard())
, _inflight_config_binding(
config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard.bind())
, _client(client)
, _ctx{{{}, _mem_sem, {}, smp_sg}, *this}
, _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
, _server(
"schema_registry", // server_name
"schema_registry", // public_metric_group_name
Expand All @@ -520,7 +526,10 @@ service::service(
, _auth{
config::always_true(),
config::shard_local_cfg().superusers.bind(),
controller.get()} {}
controller.get()} {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
}

ss::future<> service::start() {
co_await configure();
Expand Down
3 changes: 3 additions & 0 deletions src/v/pandaproxy/schema_registry/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "pandaproxy/util.h"
#include "security/fwd.h"
#include "security/request_auth.h"
#include "utils/adjustable_semaphore.h"

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
Expand Down Expand Up @@ -70,6 +71,8 @@ class service : public ss::peering_sharded_service<service> {
ss::future<> fetch_internal_topic();
configuration _config;
ssx::semaphore _mem_sem;
adjustable_semaphore _inflight_sem;
config::binding<size_t> _inflight_config_binding;
ss::gate _gate;
ss::sharded<kafka::client::client>& _client;
ctx_server<service>::context_t _ctx;
Expand Down
47 changes: 29 additions & 18 deletions src/v/pandaproxy/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,41 @@ struct handler_adaptor : ss::httpd::handler_base {
auto guard = ss::gate::holder(_pending_requests);
server::request_t rq{std::move(req), this->_ctx};
server::reply_t rp{std::move(rep)};
const auto set_and_measure_response =
[&measure](const server::reply_t& rp) {
set_mime_type(*rp.rep, rp.mime_type);
measure.set_status(rp.rep->_status);
};
auto inflight_units = _ctx.inflight_sem.try_get_units(1);
if (!inflight_units) {
set_reply_too_many_requests(*rp.rep);
rp.mime_type = _exceptional_mime_type;
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto req_size = get_request_size(*rq.req);
auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size);
if (_ctx.as.abort_requested()) {
set_reply_unavailable(*rp.rep);
rp.mime_type = _exceptional_mime_type;
} else {
auto method = rq.req->_method;
auto url = rq.req->_url;
try {
rp = co_await _handler(std::move(rq), std::move(rp));
} catch (...) {
auto ex = std::current_exception();
vlog(
plog.warn,
"Request: {} {} failed: {}",
method,
url,
std::current_exception());
rp = server::reply_t{
exception_reply(ex), _exceptional_mime_type};
}
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto method = rq.req->_method;
auto url = rq.req->_url;
try {
rp = co_await _handler(std::move(rq), std::move(rp));
} catch (...) {
auto ex = std::current_exception();
vlog(
plog.warn,
"Request: {} {} failed: {}",
method,
url,
std::current_exception());
rp = server::reply_t{exception_reply(ex), _exceptional_mime_type};
}
set_mime_type(*rp.rep, rp.mime_type);
measure.set_status(rp.rep->_status);
set_and_measure_response(rp);
co_return std::move(rp.rep);
}

Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "pandaproxy/kafka_client_cache.h"
#include "pandaproxy/types.h"
#include "security/request_auth.h"
#include "utils/adjustable_semaphore.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
Expand Down Expand Up @@ -70,6 +71,7 @@ class server {
// Per-server state handed to every request handler.
//
// NOTE: instances are brace-initialized positionally by the REST proxy and
// the schema registry service, so the member order below is part of the
// interface -- do not reorder.
struct context_t {
// Listener addresses advertised by this server.
std::vector<net::unresolved_address> advertised_listeners;
// Memory budget; the request handler waits for units equal to the request
// size before dispatching. Held by reference, owned by the enclosing service.
ssx::semaphore& mem_sem;
// In-flight request limiter; the request handler tries to take one unit per
// request without waiting and answers 429 when none can be taken. Held by
// reference, owned by the enclosing service.
adjustable_semaphore& inflight_sem;
// Checked by the request handler after acquiring memory units; once an abort
// has been requested, requests are answered with an "unavailable" reply.
ss::abort_source as;
// NOTE(review): presumably the smp service group used for cross-shard calls
// made on behalf of requests -- confirm against the callers.
ss::smp_service_group smp_sg;
};
Expand Down
5 changes: 4 additions & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,10 @@ void application::initialize(
.raft_group_max_non_local_requests
= config::shard_local_cfg().raft_smp_max_non_local_requests().value_or(
smp_groups::default_raft_non_local_requests(
config::shard_local_cfg().topic_partitions_per_shard()))};
config::shard_local_cfg().topic_partitions_per_shard())),
.proxy_group_max_non_local_requests
= config::shard_local_cfg().pp_sr_smp_max_non_local_requests().value_or(
smp_groups::default_max_nonlocal_requests)};

smp_service_groups.create_groups(smp_groups_cfg).get();
_deferred.emplace_back(
Expand Down
8 changes: 8 additions & 0 deletions src/v/utils/adjustable_semaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ class adjustable_semaphore {
return ss::get_units(_sem, units, as);
}

/**
 * Non-blocking acquisition: forwards to ss::try_get_units on the wrapped
 * semaphore and never waits.
 *
 * @param units number of units requested
 * @return the acquired units, or std::nullopt when they could not be taken
 *         immediately (presumably because fewer than `units` are currently
 *         available -- confirm against the Seastar documentation)
 */
std::optional<ssx::semaphore_units> try_get_units(size_t units) {
    std::optional<ssx::semaphore_units> maybe_units = ss::try_get_units(
      _sem, units);
    return maybe_units;
}

size_t current() const noexcept { return _sem.current(); }
ssize_t available_units() const noexcept { return _sem.available_units(); }

Expand Down

0 comments on commit a99419e

Please sign in to comment.