Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.2.x] Backport of #13964 #14739 #14803 #15049 #15032 #15139

Merged
merged 7 commits into from
Nov 28, 2023
2 changes: 1 addition & 1 deletion src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ void partition_manifest::set_archive_clean_offset(

bool partition_manifest::advance_start_kafka_offset(
kafka::offset new_start_offset) {
if (_start_kafka_offset_override >= new_start_offset) {
if (_start_kafka_offset_override > new_start_offset) {
return false;
}
_start_kafka_offset_override = new_start_offset;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/partition_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1431,9 +1431,9 @@ SEASTAR_THREAD_TEST_CASE(test_partition_manifest_start_kafka_offset_advance) {
m.get_start_kafka_offset_override(), kafka::offset(370));
BOOST_REQUIRE_EQUAL(m.get_start_offset(), model::offset(100));
BOOST_REQUIRE_EQUAL(m.get_start_kafka_offset(), kafka::offset(90));

// Allow update with the same value
BOOST_REQUIRE(m.advance_start_kafka_offset(kafka::offset(370)));
// If trying to move back, it should no-op.
BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(370)));
BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(369)));
BOOST_REQUIRE_EQUAL(
m.get_start_kafka_offset_override(), kafka::offset(370));
Expand Down
19 changes: 14 additions & 5 deletions src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/node_config.h"
#include "features/feature_table.h"
#include "model/metadata.h"
#include "resource_mgmt/io_priority.h"
#include "rpc/connection_cache.h"
#include "utils/file_io.h"
Expand Down Expand Up @@ -81,7 +83,7 @@ config_manager::config_manager(
/**
* Register notification immediately not to lose status updates.
*/
_member_removed_notification
_member_update_notification
= _members.local().register_members_updated_notification(
[this](model::node_id id, model::membership_state new_state) {
handle_cluster_members_update(id, new_state);
Expand Down Expand Up @@ -229,17 +231,24 @@ ss::future<> config_manager::start() {
}
void config_manager::handle_cluster_members_update(
model::node_id id, model::membership_state new_state) {
if (new_state != model::membership_state::removed) {
return;
vlog(
clusterlog.debug,
"Processing membership notification: {{id: {} state: {}}}",
id,
new_state);
if (new_state == model::membership_state::active) {
// add an empty status placeholder if node is not yet known
status.try_emplace(id, config_status{.node = id});
} else if (new_state == model::membership_state::removed) {
status.erase(id);
}
status.erase(id);
}

ss::future<> config_manager::stop() {
vlog(clusterlog.info, "Stopping Config Manager...");
_reconcile_wait.broken();
_members.local().unregister_members_updated_notification(
_member_removed_notification);
_member_update_notification);
_leaders.local().unregister_leadership_change_notification(
_raft0_leader_changed_notification);
co_await _gate.close();
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class config_manager final {
ss::sharded<partition_leaders_table>& _leaders;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<cluster::members_table>& _members;
notification_id_type _member_removed_notification;
notification_id_type _member_update_notification;
notification_id_type _raft0_leader_changed_notification;

ss::condition_variable _reconcile_wait;
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,7 @@ ss::future<result<kafka_result>> rm_stm::do_replicate(
ss::future<> rm_stm::stop() {
auto_abort_timer.cancel();
_log_stats_timer.cancel();
_metrics.clear();
return raft::state_machine::stop();
}

Expand Down
Loading
Loading