Skip to content

Commit

Permalink
pp/sr: Added handling of inflight semaphore
Browse files Browse the repository at this point in the history
If inflight semaphore is exhausted, then return a 429 error.

Signed-off-by: Michael Boquard <michael@redpanda.com>
  • Loading branch information
michael-redpanda committed Jan 12, 2024
1 parent dc0d6cf commit 5a4d0ed
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
5 changes: 5 additions & 0 deletions src/v/pandaproxy/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ inline ss::http::reply& set_reply_unavailable(ss::http::reply& rep) {
.add_header("Retry-After", "0");
}

inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) {
return rep.set_status(ss::http::reply::status_type::too_many_requests)
.add_header("Retry-After", "0");
}

inline std::unique_ptr<ss::http::reply> reply_unavailable() {
auto rep = std::make_unique<ss::http::reply>(ss::http::reply{});
set_reply_unavailable(*rep);
Expand Down
47 changes: 29 additions & 18 deletions src/v/pandaproxy/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,41 @@ struct handler_adaptor : ss::httpd::handler_base {
auto guard = ss::gate::holder(_pending_requests);
server::request_t rq{std::move(req), this->_ctx};
server::reply_t rp{std::move(rep)};
const auto set_and_measure_response =
[&measure](const server::reply_t& rp) {
set_mime_type(*rp.rep, rp.mime_type);
measure.set_status(rp.rep->_status);
};
auto inflight_units = _ctx.inflight_sem.try_get_units(1);
if (!inflight_units) {
set_reply_too_many_requests(*rp.rep);
rp.mime_type = _exceptional_mime_type;
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto req_size = get_request_size(*rq.req);
auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size);
if (_ctx.as.abort_requested()) {
set_reply_unavailable(*rp.rep);
rp.mime_type = _exceptional_mime_type;
} else {
auto method = rq.req->_method;
auto url = rq.req->_url;
try {
rp = co_await _handler(std::move(rq), std::move(rp));
} catch (...) {
auto ex = std::current_exception();
vlog(
plog.warn,
"Request: {} {} failed: {}",
method,
url,
std::current_exception());
rp = server::reply_t{
exception_reply(ex), _exceptional_mime_type};
}
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto method = rq.req->_method;
auto url = rq.req->_url;
try {
rp = co_await _handler(std::move(rq), std::move(rp));
} catch (...) {
auto ex = std::current_exception();
vlog(
plog.warn,
"Request: {} {} failed: {}",
method,
url,
std::current_exception());
rp = server::reply_t{exception_reply(ex), _exceptional_mime_type};
}
set_mime_type(*rp.rep, rp.mime_type);
measure.set_status(rp.rep->_status);
set_and_measure_response(rp);
co_return std::move(rp.rep);
}

Expand Down

0 comments on commit 5a4d0ed

Please sign in to comment.