Skip to content

Commit

Permalink
Optimize thread_worker
Browse files Browse the repository at this point in the history
Use a ring buffer to bulk pass tasks around, and also support creating a
twin worker thread for every reactor thread.

Before:

```
test                                      iterations      median         mad         min         max      allocs       tasks        inst
thread_worker_test.1                            3354   216.749us    47.062ns   216.590us   216.951us       5.000       4.000   1610844.2
thread_worker_test.10                            460     2.088ms   870.335ns     2.087ms     2.089ms      70.000      58.000  25391981.4
thread_worker_test.100                            61    16.465ms     1.733us    16.461ms    16.469ms     700.000     598.000 208106393.1
thread_worker_test.1000                           50    19.925ms     6.187us    19.919ms    19.937ms    7007.000    5998.000 214776807.2
```

After:

```
test                                      iterations      median         mad         min         max      allocs       tasks        inst
thread_worker_test.1                            1855   216.515us    77.552ns   216.330us   216.593us       4.000       4.000   1549639.0
thread_worker_test.10                            413     2.085ms   394.971ns     2.084ms     2.086ms      40.000      40.000  24921287.6
thread_worker_test.100                           113     8.518ms     3.891us     8.513ms     8.524ms     400.000     400.000 106121583.5
thread_worker_test.1000                           93    10.424ms   605.968ns    10.423ms    10.426ms    4880.000    5744.000 127114775.7
```

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
  • Loading branch information
rockwotj committed Aug 29, 2023
1 parent ee5f900 commit 76ab1e6
Show file tree
Hide file tree
Showing 10 changed files with 207 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ server::server(
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<cluster::tx_registry_frontend>& tx_registry_frontend,
std::optional<qdc_monitor::config> qdc_config,
ssx::thread_worker& tw,
ssx::singleton_thread_worker& tw,
const std::unique_ptr<pandaproxy::schema_registry::api>& sr) noexcept
: net::server(cfg, klog)
, _smp_group(smp)
Expand Down
6 changes: 3 additions & 3 deletions src/v/kafka/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class server final
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<cluster::tx_registry_frontend>&,
std::optional<qdc_monitor::config>,
ssx::thread_worker&,
ssx::singleton_thread_worker&,
const std::unique_ptr<pandaproxy::schema_registry::api>&) noexcept;

~server() noexcept override = default;
Expand Down Expand Up @@ -161,7 +161,7 @@ class server final

latency_probe& latency_probe() { return *_probe; }

ssx::thread_worker& thread_worker() { return _thread_worker; }
ssx::singleton_thread_worker& thread_worker() { return _thread_worker; }

const std::unique_ptr<pandaproxy::schema_registry::api>& schema_registry() {
return _schema_registry;
Expand Down Expand Up @@ -222,7 +222,7 @@ class server final
ssx::metrics::metric_groups _metrics
= ssx::metrics::metric_groups::make_internal();
std::unique_ptr<class latency_probe> _probe;
ssx::thread_worker& _thread_worker;
ssx::singleton_thread_worker& _thread_worker;
std::unique_ptr<replica_selector> _replica_selector;
const std::unique_ptr<pandaproxy::schema_registry::api>& _schema_registry;
};
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2132,7 +2132,7 @@ void application::start_runtime_services(
});
});

thread_worker->start().get();
thread_worker->start({.name = "worker"}).get();

// single instance
node_status_backend.invoke_on_all(&cluster::node_status_backend::start)
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class application {

std::unique_ptr<cluster::controller> controller;

std::unique_ptr<ssx::thread_worker> thread_worker;
std::unique_ptr<ssx::singleton_thread_worker> thread_worker;

ss::sharded<kafka::server> _kafka_server;

Expand Down
2 changes: 1 addition & 1 deletion src/v/security/gssapi_authenticator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class gssapi_authenticator::impl {
};

gssapi_authenticator::gssapi_authenticator(
ssx::thread_worker& thread_worker,
ssx::singleton_thread_worker& thread_worker,
std::vector<gssapi_rule> rules,
ss::sstring principal,
ss::sstring keytab)
Expand Down
4 changes: 2 additions & 2 deletions src/v/security/gssapi_authenticator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class gssapi_authenticator final : public sasl_mechanism {
static constexpr const char* name = "GSSAPI";

gssapi_authenticator(
ssx::thread_worker& thread_worker,
ssx::singleton_thread_worker& thread_worker,
std::vector<gssapi_rule> rules,
ss::sstring principal,
ss::sstring keytab);
Expand All @@ -40,7 +40,7 @@ class gssapi_authenticator final : public sasl_mechanism {
friend std::ostream&
operator<<(std::ostream& os, gssapi_authenticator::state const s);

ssx::thread_worker& _worker;
ssx::singleton_thread_worker& _worker;
security::acl_principal _principal;
state _state{state::init};
class impl;
Expand Down
2 changes: 1 addition & 1 deletion src/v/ssx/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@

namespace ssx {

class thread_worker;
class singleton_thread_worker;

} // namespace ssx
11 changes: 9 additions & 2 deletions src/v/ssx/tests/thread_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ template<size_t tries, size_t stop_at>
auto thread_worker_test() {
BOOST_REQUIRE_GT(ss::smp::count, 1);

auto w = ssx::thread_worker{};
w.start().get();
auto w = ssx::singleton_thread_worker{};
w.start({}).get();

std::vector<std::vector<ss::future<move_only>>> all_results(ss::smp::count);

Expand Down Expand Up @@ -73,6 +73,13 @@ auto thread_worker_test() {
}
}

SEASTAR_THREAD_TEST_CASE(thread_worker_can_be_stopped_before_its_started) {
// During application startup, we register a defer for stopping before the
// worker is started.
auto w = ssx::singleton_thread_worker{};
w.stop().get();
}

SEASTAR_THREAD_TEST_CASE(thread_worker_single_cancel_after) {
// cancel thread worker at the after all of the gets
thread_worker_test<1, 1>();
Expand Down
4 changes: 2 additions & 2 deletions src/v/ssx/tests/thread_worker_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#include <seastar/testing/perf_tests.hh>

ss::future<> run_test(size_t data_size) {
auto w = ssx::thread_worker{};
co_await w.start();
auto w = ssx::singleton_thread_worker{};
co_await w.start({});

std::vector<ss::future<size_t>> vec;
vec.reserve(data_size);
Expand Down
Loading

0 comments on commit 76ab1e6

Please sign in to comment.