diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index b81a85382f0e2..951b0ea012576 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -101,7 +101,7 @@ server::server( ss::sharded& tx_gateway_frontend, ss::sharded& tx_registry_frontend, std::optional qdc_config, - ssx::thread_worker& tw, + ssx::singleton_thread_worker& tw, const std::unique_ptr& sr) noexcept : net::server(cfg, klog) , _smp_group(smp) diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index afa0d3fbfe5e4..c2fcddd495c86 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -65,7 +65,7 @@ class server final ss::sharded&, ss::sharded&, std::optional, - ssx::thread_worker&, + ssx::singleton_thread_worker&, const std::unique_ptr&) noexcept; ~server() noexcept override = default; @@ -161,7 +161,7 @@ class server final latency_probe& latency_probe() { return *_probe; } - ssx::thread_worker& thread_worker() { return _thread_worker; } + ssx::singleton_thread_worker& thread_worker() { return _thread_worker; } const std::unique_ptr& schema_registry() { return _schema_registry; @@ -222,7 +222,7 @@ class server final ssx::metrics::metric_groups _metrics = ssx::metrics::metric_groups::make_internal(); std::unique_ptr _probe; - ssx::thread_worker& _thread_worker; + ssx::singleton_thread_worker& _thread_worker; std::unique_ptr _replica_selector; const std::unique_ptr& _schema_registry; }; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 05192fe3fae7d..c1d61f9ef9122 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -2132,7 +2132,7 @@ void application::start_runtime_services( }); }); - thread_worker->start().get(); + thread_worker->start({.name = "worker"}).get(); // single instance node_status_backend.invoke_on_all(&cluster::node_status_backend::start) diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 3aa0ae8ac49a4..0a54199c4d814 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -143,7 +143,7 @@ class application { std::unique_ptr controller; - std::unique_ptr thread_worker; + std::unique_ptr thread_worker; ss::sharded _kafka_server; diff --git a/src/v/security/gssapi_authenticator.cc b/src/v/security/gssapi_authenticator.cc index 873ce420efbe9..09d62e69f9156 100644 --- a/src/v/security/gssapi_authenticator.cc +++ b/src/v/security/gssapi_authenticator.cc @@ -169,7 +169,7 @@ class gssapi_authenticator::impl { }; gssapi_authenticator::gssapi_authenticator( - ssx::thread_worker& thread_worker, + ssx::singleton_thread_worker& thread_worker, std::vector rules, ss::sstring principal, ss::sstring keytab) diff --git a/src/v/security/gssapi_authenticator.h b/src/v/security/gssapi_authenticator.h index 36a62a6943c15..3908009dedbb0 100644 --- a/src/v/security/gssapi_authenticator.h +++ b/src/v/security/gssapi_authenticator.h @@ -21,7 +21,7 @@ class gssapi_authenticator final : public sasl_mechanism { static constexpr const char* name = "GSSAPI"; gssapi_authenticator( - ssx::thread_worker& thread_worker, + ssx::singleton_thread_worker& thread_worker, std::vector rules, ss::sstring principal, ss::sstring keytab); @@ -40,7 +40,7 @@ class gssapi_authenticator final : public sasl_mechanism { friend std::ostream& operator<<(std::ostream& os, gssapi_authenticator::state const s); - ssx::thread_worker& _worker; + ssx::singleton_thread_worker& _worker; security::acl_principal _principal; state _state{state::init}; class impl; diff --git a/src/v/ssx/fwd.h b/src/v/ssx/fwd.h index 04717c769f53d..2decf6c83de28 100644 --- a/src/v/ssx/fwd.h +++ b/src/v/ssx/fwd.h @@ -13,6 +13,6 @@ namespace ssx { -class thread_worker; +class singleton_thread_worker; } // namespace ssx diff --git a/src/v/ssx/tests/thread_worker.cc b/src/v/ssx/tests/thread_worker.cc index 95d1d7736c3ca..cb33c31e107d7 100644 --- a/src/v/ssx/tests/thread_worker.cc +++ b/src/v/ssx/tests/thread_worker.cc @@ -34,8 +34,8 @@ template auto thread_worker_test() { BOOST_REQUIRE_GT(ss::smp::count, 1); - auto w = ssx::thread_worker{}; - w.start().get(); + auto w = ssx::singleton_thread_worker{}; + w.start({}).get(); std::vector>> all_results(ss::smp::count); @@ -73,6 +73,13 @@ auto thread_worker_test() { } } +SEASTAR_THREAD_TEST_CASE(thread_worker_can_be_stopped_before_its_started) { + // During application startup, we register a defer for stopping before the + // worker is started. + auto w = ssx::singleton_thread_worker{}; + w.stop().get(); +} + SEASTAR_THREAD_TEST_CASE(thread_worker_single_cancel_after) { // cancel thread worker at the after all of the gets thread_worker_test<1, 1>(); diff --git a/src/v/ssx/tests/thread_worker_bench.cc b/src/v/ssx/tests/thread_worker_bench.cc index e3167eda103a3..ba9850fe5f1ee 100644 --- a/src/v/ssx/tests/thread_worker_bench.cc +++ b/src/v/ssx/tests/thread_worker_bench.cc @@ -13,8 +13,8 @@ #include ss::future<> run_test(size_t data_size) { - auto w = ssx::thread_worker{}; - co_await w.start(); + auto w = ssx::singleton_thread_worker{}; + co_await w.start({}); std::vector> vec; vec.reserve(data_size); diff --git a/src/v/ssx/thread_worker.h b/src/v/ssx/thread_worker.h index 357cb10dfaf5a..3e0fbace01ab5 100644 --- a/src/v/ssx/thread_worker.h +++ b/src/v/ssx/thread_worker.h @@ -21,14 +21,20 @@ #include #include #include +#include +#include #include +#include #include #include +#include #include #include #include +#include +#include #include #include @@ -44,13 +50,36 @@ class task_base { task_base& operator=(task_base&&) = delete; task_base& operator=(task_base const&) = delete; - virtual void process(ss::alien::instance&, ss::shard_id) = 0; + virtual ss::stop_iteration process(ss::alien::instance&, ss::shard_id) = 0; virtual void set_exception(ss::alien::instance&, ss::shard_id, std::exception_ptr) = 0; virtual ~task_base() = default; }; +class stop_task final : public task_base { +public: + ss::future<> get_future() noexcept { return _promise.get_future(); } + + ss::stop_iteration + process(ss::alien::instance& alien, ss::shard_id shard) final { + ss::alien::run_on( + alien, shard, [this]() noexcept { _promise.set_value(); }); + return ss::stop_iteration::yes; + } + + void set_exception( + ss::alien::instance& alien, + ss::shard_id shard, + std::exception_ptr p) final { + ss::alien::run_on( + alien, shard, [this, p]() noexcept { _promise.set_exception(p); }); + } + +private: + ss::promise<> _promise; +}; + template class worker_task final : public task_base { using value_type = std::invoke_result_t; @@ -63,7 +92,8 @@ class worker_task final : public task_base { return _promise.get_future(); } - void process(ss::alien::instance& alien, ss::shard_id shard) final { + ss::stop_iteration + process(ss::alien::instance& alien, ss::shard_id shard) final { try { if constexpr (std::is_void_v) { _func(); @@ -79,6 +109,7 @@ class worker_task final : public task_base { } catch (...) { set_exception(alien, shard, std::current_exception()); }; + return ss::stop_iteration::no; } void set_exception( @@ -95,88 +126,138 @@ class worker_task final : public task_base { }; class thread_worker { - static constexpr eventfd_t RUNNABLE_INIT = 0; - static constexpr eventfd_t RUNNABLE_READY = 1; - static constexpr eventfd_t RUNNABLE_STOP = 2; - public: - thread_worker() - : _alien{ss::engine().alien()} + struct config { + // If the thread should be pinned to a the same core as the shard it was + // created on. + bool pin_to_shard_core = false; + // NOTE: pthread names must be less than 12 bytes + // + // Why 12 bytes? pthread only support names of length 16 and we want to + // suffix threads with the core it's associated with via `-xxx`. + ss::sstring name = "thread"; + }; + + explicit thread_worker() + : _alien(&ss::engine().alien()) + , _cpu_id(::sched_getcpu()) , _shard_id(ss::this_shard_id()) {} - void start() { - _worker = std::thread([this]() { return run(); }); + void start(config c) { + _worker = std::thread([this, c = std::move(c)]() mutable { + configure_thread(std::move(c)); + run(); + }); } ss::future<> stop() { co_await _gate.close(); - _ready.signal(RUNNABLE_STOP); if (_worker.joinable()) { + auto task = impl::stop_task(); + vassert( + _queue.push(&task), + "expected to be able to push a task onto the queue"); + _ready.signal(1); + co_await task.get_future(); _worker.join(); } } template - auto submit(Func func) { + auto submit(Func func) -> + typename ss::futurize>::type { auto gh = _gate.hold(); - return _mutex.get_units().then( - [this, func{std::move(func)}, gh{std::move(gh)}](auto units) mutable { - vassert(_task == nullptr, "Only one task supported at a time"); - auto task = std::make_unique>( - std::move(func)); - auto f = task->get_future().finally( - [this, gh{std::move(gh)}, units{std::move(units)}] { - _task.reset(); - }); - _task = std::move(task); - _ready.signal(RUNNABLE_READY); - return f; - }); + auto units = co_await ss::get_units(_semaphore, 1); + auto task = impl::worker_task(std::move(func)); + vassert( + _queue.push(&task), + "expected to be able to push a task onto the queue"); + _ready.signal(1); + co_return co_await task.get_future(); } private: - void configure_thread() { - ::pthread_setname_np(::pthread_self(), "ssx::thread_worker"); + void configure_thread(config c) { + auto name = ss::format("{}-{}", c.name, _shard_id); + ss::throw_pthread_error( + ::pthread_setname_np(::pthread_self(), name.c_str())); + if (c.pin_to_shard_core) { + cpu_set_t cs; + CPU_ZERO(&cs); + CPU_SET(_cpu_id, &cs); + ss::throw_pthread_error( + pthread_setaffinity_np(pthread_self(), sizeof(cs), &cs)); + } // Ignore all signals - let seastar handle them sigset_t mask; ::sigfillset(&mask); ss::throw_pthread_error(::pthread_sigmask(SIG_BLOCK, &mask, nullptr)); } - size_t run() { - configure_thread(); + + void run() { + std::array items{}; while (true) { eventfd_t result = 0; auto r = ::eventfd_read(_ready.get_read_fd(), &result); - if (r < 0 || result != RUNNABLE_READY) { - if (_task) { - _task->set_exception( - _alien, - _shard_id, - std::make_exception_ptr(ss::abort_requested_exception{})); + while (!_queue.empty()) { + size_t num_tasks = _queue.pop(items.data(), items.size()); + auto stop_iteration = process_tasks({items.data(), num_tasks}); + if (stop_iteration == ss::stop_iteration::yes) { + // stop was requested, we can be done + vassert( + _queue.empty(), + "expected a stop task to be the last task in the task " + "queue, but there were {} available to process.", + _queue.read_available()); + return; } } if (r < 0) { - return r; - } else if (result == RUNNABLE_STOP) { - return 0; + return; } - _task->process(_alien, _shard_id); } } - ss::alien::instance& _alien; + ss::stop_iteration process_tasks(std::span tasks) { + for (size_t i = 0; i < tasks.size(); ++i) { + auto stop_iteration = tasks[i]->process(*_alien, _shard_id); + if (stop_iteration == ss::stop_iteration::yes) { + fail_tasks(tasks.subspan(i + 1)); + return ss::stop_iteration::yes; + } + } + return ss::stop_iteration::no; + } + + void fail_tasks(std::span tasks) { + for (auto* task : tasks) { + task->set_exception( + *_alien, + _shard_id, + std::make_exception_ptr(ss::abort_requested_exception{})); + } + } + + ss::alien::instance* _alien; + unsigned _cpu_id; ss::shard_id _shard_id; std::thread _worker; ss::gate _gate; - mutex _mutex; - seastar::writeable_eventfd _ready{RUNNABLE_INIT}; - std::unique_ptr _task{nullptr}; + ss::writeable_eventfd _ready; + + constexpr static size_t max_length = 128; + ss::semaphore _semaphore = ss::semaphore(max_length); + boost::lockfree:: + spsc_queue> + _queue; }; } // namespace impl /** - * thread_worker runs tasks in a std::thread. + * singleton_thread_worker runs tasks in a std::thread, it's expected that only + * a single core manages the lifetime of this shard, and all other shards submit + * their requests through that shard. * * By running in a std::thread, it's possible to make blocking calls such as * file I/O and posix thread primitives without blocking a reactor. @@ -186,21 +267,27 @@ class thread_worker { * class is most suited to run for the lifetime of an application, rather than * short-lived. */ -class thread_worker { +class singleton_thread_worker { public: static constexpr ss::shard_id shard_id = 0; - thread_worker() = default; + struct config { + // NOTE: pthread names must be less than 12 bytes + // + // Why 12 bytes? pthread only support names of length 16 and we want to + // suffix threads with the core it's associated with via `-xxx`. + ss::sstring name = "thread"; + }; /** * start the background thread. */ - ss::future<> start() { + ss::future<> start(config c) { vassert( ss::this_shard_id() == shard_id, "thread_worker must be started on shard ", shard_id); co_await _gate.start(); - _impl.start(); + _impl.start({.name = std::move(c.name)}); } /** @@ -240,4 +327,57 @@ class thread_worker { impl::thread_worker _impl; }; +/** + * sharded_thread_worker runs tasks in a std::thread, and creates a thread core + * each reactor core so that there is a twin std::thread for each reactor core. + * + * By running in a std::thread, it's possible to make blocking calls such as + * file I/O and posix thread primitives without blocking a reactor. + * + * The thread worker will drain all operations before joining the thread in + * stop(), but it should be noted that joining a thread may block. As such, this + * class is most suited to run for the lifetime of an application, rather than + * short-lived. + */ +class sharded_thread_worker { +public: + struct config { + // NOTE: pthread names must be less than 12 bytes + // + // Why 12 bytes? pthread only support names of length 16 and we want to + // suffix threads with the core it's associated with via `-xxx`. + ss::sstring name = "thread"; + }; + + /** + * start the background thread. + */ + ss::future<> start(config c) { + co_await _impl.start(); + co_await _impl.invoke_on_all([&c](impl::thread_worker& w) { + return w.start({.pin_to_shard_core = true, .name = c.name}); + }); + } + + /** + * stop and join the background thread. + * + * Although the work has completed, it should be noted that joining a thread + * may block the reactor. + */ + ss::future<> stop() { co_await _impl.stop(); } + + /** + * submit a task to the background thread + */ + template + auto submit(Func func) -> + typename ss::futurize>::type { + return _impl.local().submit(std::move(func)); + } + +private: + ss::sharded _impl; +}; + } // namespace ssx