Skip to content

Commit

Permalink
c/backend: shard_table to be able to notify subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
bashtanov committed Jun 11, 2024
1 parent 9c7c7a3 commit 843b0b6
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions src/v/cluster/shard_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "base/seastarx.h"
#include "cluster/logger.h"
#include "cluster/notification.h"
#include "container/chunked_hash_map.h"
#include "model/fundamental.h"
#include "model/ktp.h"
Expand Down Expand Up @@ -74,6 +75,10 @@ class shard_table final {
log_rev);
_ntp_idx.insert_or_assign(ntp, shard_revision{shard, log_rev});
_group_idx.insert_or_assign(g, shard_revision{shard, log_rev});

for (auto& cb : _notifications) {
cb.second({ntp, g, shard});
}
}

void
Expand Down Expand Up @@ -104,6 +109,31 @@ class shard_table final {
log_rev);
_ntp_idx.erase(ntp);
_group_idx.erase(g);

for (auto& cb : _notifications) {
cb.second({ntp, g, std::nullopt});
}
}

struct change_t {
model::ntp ntp;
raft::group_id g;
std::optional<ss::shard_id> shard;
};
using change_cb_t = ss::noncopyable_function<void(change_t)>;

notification_id_type register_notification(change_cb_t cb) {
auto id = _notification_id++;
_notifications.emplace_back(id, std::move(cb));
return id;
}

void unregister_delta_notification(cluster::notification_id_type id) {
std::erase_if(
_notifications,
[id](const std::pair<cluster::notification_id_type, change_cb_t>& n) {
return n.first == id;
});
}

private:
Expand Down Expand Up @@ -131,5 +161,9 @@ class shard_table final {
_ntp_idx;
// raft index
chunked_hash_map<raft::group_id, shard_revision> _group_idx;

cluster::notification_id_type _notification_id{0};
std::vector<std::pair<cluster::notification_id_type, change_cb_t>>
_notifications;
};
} // namespace cluster

0 comments on commit 843b0b6

Please sign in to comment.