From c943a0904e6f1a3d3efcd8381d5fd53bf4cc7588 Mon Sep 17 00:00:00 2001 From: alexey bashtanov Date: Tue, 11 Jun 2024 15:13:52 +0100 Subject: [PATCH] c/backend: shard_table to be able to notify subscribers --- src/v/cluster/shard_table.h | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/v/cluster/shard_table.h b/src/v/cluster/shard_table.h index 37cd561f7a82..452194d1d9f6 100644 --- a/src/v/cluster/shard_table.h +++ b/src/v/cluster/shard_table.h @@ -13,10 +13,12 @@ #include "base/seastarx.h" #include "cluster/logger.h" +#include "cluster/notification.h" #include "container/chunked_hash_map.h" #include "model/fundamental.h" #include "model/ktp.h" #include "raft/fundamental.h" +#include "utils/notification_list.h" #include // shard_id @@ -74,6 +76,8 @@ class shard_table final { log_rev); _ntp_idx.insert_or_assign(ntp, shard_revision{shard, log_rev}); _group_idx.insert_or_assign(g, shard_revision{shard, log_rev}); + + _notification_list.notify(ntp, g, shard); } void @@ -104,6 +108,21 @@ class shard_table final { log_rev); _ntp_idx.erase(ntp); _group_idx.erase(g); + + _notification_list.notify(ntp, g, std::nullopt); + } + + using change_cb_t = ss::noncopyable_function shard)>; + + notification_id_type register_notification(change_cb_t&& cb) { + return _notification_list.register_cb(std::move(cb)); + } + + void unregister_delta_notification(cluster::notification_id_type id) { + _notification_list.unregister_cb(id); } private: @@ -131,5 +150,7 @@ class shard_table final { _ntp_idx; // raft index chunked_hash_map _group_idx; + + notification_list _notification_list; }; } // namespace cluster