Skip to content

Commit

Permalink
Merge pull request #12075 from mmaslankaprv/v23.1.x-backports
Browse files Browse the repository at this point in the history
  • Loading branch information
mmaslankaprv authored Jul 20, 2023
2 parents 8d1f797 + 922e321 commit 9a9ea29
Show file tree
Hide file tree
Showing 51 changed files with 1,267 additions and 744 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ ss::future<> archival_metadata_stm::apply(model::record_batch b) {
_manifest->advance_insync_offset(b.last_offset());
}

ss::future<> archival_metadata_stm::handle_eviction() {
ss::future<> archival_metadata_stm::handle_raft_snapshot() {
cloud_storage::partition_manifest manifest;

const auto& bucket_config
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class archival_metadata_stm final : public persisted_stm {
std::optional<std::reference_wrapper<ss::abort_source>>);

ss::future<> apply(model::record_batch batch) override;
ss::future<> handle_eviction() override;
ss::future<> handle_raft_snapshot() override;

ss::future<> apply_snapshot(stm_snapshot_header, iobuf&&) override;
ss::future<stm_snapshot> take_snapshot() override;
Expand Down
18 changes: 8 additions & 10 deletions src/v/cluster/config_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -743,16 +743,6 @@ void config_manager::merge_apply_result(
ss::future<> config_manager::store_delta(
config_version const& delta_version,
cluster_config_delta_cmd_data const& data) {
_seen_version = delta_version;

// version_shard is chosen to match controller_stm_shard, so
// our raft0 stm apply operations do not need a core jump to
// update the frontend version state.
vassert(
ss::this_shard_id() == config_frontend::version_shard,
"Must be called on frontend version_shard");
_frontend.local().set_next_version(_seen_version + config_version{1});

for (const auto& u : data.upsert) {
_raw_values[u.key] = u.value;
}
Expand Down Expand Up @@ -821,6 +811,14 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) {
_seen_version);
co_return errc::success;
}
_seen_version = delta_version;
// version_shard is chosen to match controller_stm_shard, so
// our raft0 stm apply operations do not need a core jump to
// update the frontend version state.
vassert(
ss::this_shard_id() == config_frontend::version_shard,
"Must be called on frontend version_shard");
_frontend.local().set_next_version(_seen_version + config_version{1});

const cluster_config_delta_cmd_data& data = cmd.value;
vlog(
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ class controller {
return _raft0->committed_offset();
}

model::offset get_dirty_offset() const { return _raft0->dirty_offset(); }

static const bytes invariants_key;

private:
friend controller_probe;

Expand Down
7 changes: 4 additions & 3 deletions src/v/cluster/controller_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rpc/connection_cache.h"
#include "ssx/future-util.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
Expand Down Expand Up @@ -142,7 +143,7 @@ controller_api::get_reconciliation_state(model::topic_namespace_view tp_ns) {
co_return co_await get_reconciliation_state(std::move(ntps));
}

ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
controller_api::get_remote_core_deltas(model::ntp ntp, ss::shard_id shard) {
return _backend.invoke_on(
shard, [ntp = std::move(ntp)](controller_backend& backend) {
Expand All @@ -164,7 +165,7 @@ controller_api::get_reconciliation_state(model::ntp ntp) {
std::move(ntp), errc::partition_not_exists);
}
// query controller backends for in progress operations
std::vector<backend_operation> ops;
ss::chunked_fifo<backend_operation> ops;
const auto shards = boost::irange<ss::shard_id>(0, ss::smp::count);
for (auto shard : shards) {
auto local_deltas = co_await get_remote_core_deltas(ntp, shard);
Expand Down Expand Up @@ -268,7 +269,7 @@ controller_api::get_reconciliation_state(
}
vassert(result.value().size() == 1, "result MUST contain single ntp");

return ret_t(result.value().front());
return ret_t(std::move(result.value().front()));
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster/controller_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "seastarx.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/node_hash_map.h>
Expand Down Expand Up @@ -88,7 +89,7 @@ class controller_api {
std::optional<ss::shard_id> shard_for(const model::ntp& ntp) const;

private:
ss::future<std::vector<controller_backend::delta_metadata>>
ss::future<ss::chunked_fifo<controller_backend::delta_metadata>>
get_remote_core_deltas(model::ntp, ss::shard_id);

model::node_id _self;
Expand Down
14 changes: 9 additions & 5 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

#include <absl/container/flat_hash_set.h>
#include <absl/container/node_hash_map.h>
#include <fmt/ranges.h>

#include <algorithm>
#include <exception>
Expand Down Expand Up @@ -472,7 +473,6 @@ controller_backend::deltas_t calculate_bootstrap_deltas(
}

auto start = std::next(it).base();
result_delta.reserve(std::distance(start, deltas.end()));
std::move(start, deltas.end(), std::back_inserter(result_delta));
return result_delta;
}
Expand Down Expand Up @@ -1116,9 +1116,9 @@ bool controller_backend::can_finish_update(
uint64_t current_retry,
topic_table_delta::op_type update_type,
const std::vector<model::broker_shard>& current_replicas) {
// force abort update may be finished by any node
if (update_type == topic_table_delta::op_type::force_abort_update) {
return true;
// Wait for the leader to be elected in the new replica set.
return current_leader == _self;
}
/**
* If the revert feature is active we use current leader to dispatch
Expand Down Expand Up @@ -1804,10 +1804,14 @@ ss::future<> controller_backend::delete_partition(
co_await _partition_manager.local().remove(ntp, mode);
}

std::vector<controller_backend::delta_metadata>
ss::chunked_fifo<controller_backend::delta_metadata>
controller_backend::list_ntp_deltas(const model::ntp& ntp) const {
if (auto it = _topic_deltas.find(ntp); it != _topic_deltas.end()) {
return it->second;
ss::chunked_fifo<controller_backend::delta_metadata> ret;
ret.reserve(it->second.size());
std::copy(
it->second.begin(), it->second.end(), std::back_inserter(ret));
return ret;
}

return {};
Expand Down
10 changes: 6 additions & 4 deletions src/v/cluster/controller_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
#include "storage/api.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/sharded.hh>

#include <absl/container/node_hash_map.h>

#include <cstdint>
#include <deque>
#include <ostream>

namespace cluster {
Expand Down Expand Up @@ -230,7 +232,7 @@ class controller_backend
friend std::ostream& operator<<(std::ostream&, const delta_metadata&);
};

using deltas_t = std::vector<delta_metadata>;
using deltas_t = std::deque<delta_metadata>;
using results_t = std::vector<std::error_code>;
controller_backend(
ss::sharded<cluster::topic_table>&,
Expand All @@ -246,7 +248,7 @@ class controller_backend
ss::future<> stop();
ss::future<> start();

std::vector<delta_metadata> list_ntp_deltas(const model::ntp&) const;
ss::chunked_fifo<delta_metadata> list_ntp_deltas(const model::ntp&) const;

private:
struct cross_shard_move_request {
Expand Down Expand Up @@ -408,6 +410,6 @@ class controller_backend
ss::metrics::metric_groups _metrics;
};

std::vector<controller_backend::delta_metadata> calculate_bootstrap_deltas(
model::node_id self, const std::vector<controller_backend::delta_metadata>&);
controller_backend::deltas_t calculate_bootstrap_deltas(
model::node_id self, const controller_backend::deltas_t&);
} // namespace cluster
2 changes: 1 addition & 1 deletion src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ ss::future<stm_snapshot> id_allocator_stm::take_snapshot() {
std::logic_error("id_allocator_stm doesn't support snapshots"));
}

ss::future<> id_allocator_stm::handle_eviction() {
ss::future<> id_allocator_stm::handle_raft_snapshot() {
_next_snapshot = _c->start_offset();
_processed = 0;
set_next(_next_snapshot);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/id_allocator_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class id_allocator_stm final : public persisted_stm {
ss::future<> write_snapshot();
ss::future<> apply_snapshot(stm_snapshot_header, iobuf&&) override;
ss::future<stm_snapshot> take_snapshot() override;
ss::future<> handle_eviction() override;
ss::future<> handle_raft_snapshot() override;
ss::future<bool> sync(model::timeout_clock::duration);

mutex _lock;
Expand Down
67 changes: 28 additions & 39 deletions src/v/cluster/metadata_dissemination_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,15 @@
#include "model/metadata.h"
#include "model/timeout_clock.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/smp.hh>

#include <boost/range/irange.hpp>

#include <algorithm>
#include <iterator>

namespace {
std::vector<cluster::ntp_leader_revision>
from_ntp_leaders(std::vector<cluster::ntp_leader> old_leaders) {
std::vector<cluster::ntp_leader_revision> leaders;
leaders.reserve(old_leaders.size());
std::transform(
old_leaders.begin(),
old_leaders.end(),
std::back_inserter(leaders),
[](cluster::ntp_leader& leader) {
return cluster::ntp_leader_revision(
std::move(leader.ntp),
leader.term,
leader.leader_id,
model::revision_id{} /* explicitly default */
);
});
return leaders;
}
} // namespace
namespace cluster {
metadata_dissemination_handler::metadata_dissemination_handler(
ss::scheduling_group sg,
Expand All @@ -50,15 +36,6 @@ metadata_dissemination_handler::metadata_dissemination_handler(
: metadata_dissemination_rpc_service(sg, ssg)
, _leaders(leaders) {}

ss::future<update_leadership_reply>
metadata_dissemination_handler::update_leadership(
update_leadership_request&& req, rpc::streaming_context&) {
return ss::with_scheduling_group(
get_scheduling_group(), [this, req = std::move(req)]() mutable {
return do_update_leadership(from_ntp_leaders(std::move(req.leaders)));
});
}

ss::future<update_leadership_reply>
metadata_dissemination_handler::update_leadership_v2(
update_leadership_request_v2&& req, rpc::streaming_context&) {
Expand All @@ -70,17 +47,29 @@ metadata_dissemination_handler::update_leadership_v2(

ss::future<update_leadership_reply>
metadata_dissemination_handler::do_update_leadership(
std::vector<ntp_leader_revision> leaders) {
ss::chunked_fifo<ntp_leader_revision> leaders) {
vlog(clusterlog.trace, "Received a metadata update");
return _leaders
.invoke_on_all(
[leaders = std::move(leaders)](partition_leaders_table& pl) mutable {
for (auto& leader : leaders) {
pl.update_partition_leader(
leader.ntp, leader.revision, leader.term, leader.leader_id);
}
})
.then([] { return ss::make_ready_future<update_leadership_reply>(); });
co_await ss::parallel_for_each(
boost::irange<ss::shard_id>(0, ss::smp::count),
[this, leaders = std::move(leaders)](ss::shard_id shard) {
ss::chunked_fifo<ntp_leader_revision> local_leaders;
local_leaders.reserve(leaders.size());
std::copy(
leaders.begin(), leaders.end(), std::back_inserter(local_leaders));

return ss::smp::submit_to(
shard, [this, leaders = std::move(local_leaders)] {
for (auto& leader : leaders) {
_leaders.local().update_partition_leader(
leader.ntp,
leader.revision,
leader.term,
leader.leader_id);
}
});
});

co_return update_leadership_reply{};
}

static get_leadership_reply
Expand Down
6 changes: 2 additions & 4 deletions src/v/cluster/metadata_dissemination_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "cluster/metadata_dissemination_rpc_service.h"
#include "raft/types.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/sharded.hh>

Expand All @@ -38,9 +39,6 @@ class metadata_dissemination_handler
ss::smp_service_group,
ss::sharded<partition_leaders_table>&);

ss::future<update_leadership_reply> update_leadership(
update_leadership_request&&, rpc::streaming_context&) final;

ss::future<get_leadership_reply>
get_leadership(get_leadership_request&&, rpc::streaming_context&) final;

Expand All @@ -49,7 +47,7 @@ class metadata_dissemination_handler

private:
ss::future<update_leadership_reply>
do_update_leadership(std::vector<ntp_leader_revision>);
do_update_leadership(ss::chunked_fifo<ntp_leader_revision>);

ss::sharded<partition_leaders_table>& _leaders;
}; // namespace cluster
Expand Down
5 changes: 0 additions & 5 deletions src/v/cluster/metadata_dissemination_rpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
"cluster/metadata_dissemination_types.h"
],
"methods": [
{
"name": "update_leadership",
"input_type": "update_leadership_request",
"output_type": "update_leadership_reply"
},
{
"name": "get_leadership",
"input_type": "get_leadership_request",
Expand Down
Loading

0 comments on commit 9a9ea29

Please sign in to comment.