Skip to content

Commit

Permalink
Merge pull request #15139 from mmaslankaprv/v23.2.x-backports
Browse files Browse the repository at this point in the history
[v23.2.x] Backport of #13964 #14739 #14803 #15049 #15032
  • Loading branch information
mmaslankaprv authored Nov 28, 2023
2 parents af939c3 + 3d338fb commit c01a044
Show file tree
Hide file tree
Showing 12 changed files with 685 additions and 350 deletions.
2 changes: 1 addition & 1 deletion src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ void partition_manifest::set_archive_clean_offset(

bool partition_manifest::advance_start_kafka_offset(
kafka::offset new_start_offset) {
if (_start_kafka_offset_override >= new_start_offset) {
if (_start_kafka_offset_override > new_start_offset) {
return false;
}
_start_kafka_offset_override = new_start_offset;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_storage/tests/partition_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1431,9 +1431,9 @@ SEASTAR_THREAD_TEST_CASE(test_partition_manifest_start_kafka_offset_advance) {
m.get_start_kafka_offset_override(), kafka::offset(370));
BOOST_REQUIRE_EQUAL(m.get_start_offset(), model::offset(100));
BOOST_REQUIRE_EQUAL(m.get_start_kafka_offset(), kafka::offset(90));

// Allow update with the same value
BOOST_REQUIRE(m.advance_start_kafka_offset(kafka::offset(370)));
// If trying to move back, it should no-op.
BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(370)));
BOOST_REQUIRE(!m.advance_start_kafka_offset(kafka::offset(369)));
BOOST_REQUIRE_EQUAL(
m.get_start_kafka_offset_override(), kafka::offset(370));
Expand Down
19 changes: 14 additions & 5 deletions src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/node_config.h"
#include "features/feature_table.h"
#include "model/metadata.h"
#include "resource_mgmt/io_priority.h"
#include "rpc/connection_cache.h"
#include "utils/file_io.h"
Expand Down Expand Up @@ -81,7 +83,7 @@ config_manager::config_manager(
/**
* Register notification immediately not to lose status updates.
*/
_member_removed_notification
_member_update_notification
= _members.local().register_members_updated_notification(
[this](model::node_id id, model::membership_state new_state) {
handle_cluster_members_update(id, new_state);
Expand Down Expand Up @@ -229,17 +231,24 @@ ss::future<> config_manager::start() {
}
void config_manager::handle_cluster_members_update(
model::node_id id, model::membership_state new_state) {
if (new_state != model::membership_state::removed) {
return;
vlog(
clusterlog.debug,
"Processing membership notification: {{id: {} state: {}}}",
id,
new_state);
if (new_state == model::membership_state::active) {
// add an empty status placeholder if node is not yet known
status.try_emplace(id, config_status{.node = id});
} else if (new_state == model::membership_state::removed) {
status.erase(id);
}
status.erase(id);
}

ss::future<> config_manager::stop() {
vlog(clusterlog.info, "Stopping Config Manager...");
_reconcile_wait.broken();
_members.local().unregister_members_updated_notification(
_member_removed_notification);
_member_update_notification);
_leaders.local().unregister_leadership_change_notification(
_raft0_leader_changed_notification);
co_await _gate.close();
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class config_manager final {
ss::sharded<partition_leaders_table>& _leaders;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<cluster::members_table>& _members;
notification_id_type _member_removed_notification;
notification_id_type _member_update_notification;
notification_id_type _raft0_leader_changed_notification;

ss::condition_variable _reconcile_wait;
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,7 @@ ss::future<result<kafka_result>> rm_stm::do_replicate(
ss::future<> rm_stm::stop() {
auto_abort_timer.cancel();
_log_stats_timer.cancel();
_metrics.clear();
return raft::state_machine::stop();
}

Expand Down
Loading

0 comments on commit c01a044

Please sign in to comment.