Skip to content

Commit

Permalink
pp: Added check for available cross shard units
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Boquard <michael@redpanda.com>
  • Loading branch information
michael-redpanda committed Jan 10, 2024
1 parent 0ae4b1a commit 3359a67
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/v/pandaproxy/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>
#include <seastar/http/api_docs.hh>
#include <seastar/http/exception.hh>
#include <seastar/http/handlers.hh>
#include <seastar/http/httpd.hh>
#include <seastar/http/json_path.hh>
Expand Down Expand Up @@ -194,9 +196,21 @@ class auth_ctx_server : public ctx_server<service_t> {

template<std::invocable<kafka_client_cache&> Func>
auto dispatch(Func&& func) {
auto shard = user_shard(user.name);
// If running single core, the service group semaphore has a count
// of 0
if (ss::smp::count > 1) {
const auto& sg_sem = ss::get_smp_service_groups_semaphore(
ss::internal::smp_service_group_id(context().smp_sg), shard);
if (sg_sem.available_units() <= 0) {
throw ss::httpd::base_exception(
"Exhausted cross shard semaphore.",
ss::http::reply::status_type::internal_server_error);
}
}
// Access the cache on the appropriate shard.
return service().client_cache().invoke_on(
user_shard(user.name),
shard,
ss::smp_submit_to_options{context().smp_sg},
[func{std::forward<Func>(func)}](
kafka_client_cache& cache) mutable {
Expand Down

0 comments on commit 3359a67

Please sign in to comment.