Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 22, 2023
1 parent f84aff4 commit 063fcba
Show file tree
Hide file tree
Showing 32 changed files with 619 additions and 810 deletions.
7 changes: 4 additions & 3 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cluster/archival_metadata_stm.h"

#include "bytes/iobuf.h"
#include "bytes/iostream.h"
#include "cloud_storage/partition_manifest.h"
#include "cloud_storage/remote.h"
Expand Down Expand Up @@ -708,7 +709,7 @@ ss::future<std::error_code> archival_metadata_stm::do_add_segments(
co_return errc::success;
}

ss::future<> archival_metadata_stm::apply(model::record_batch b) {
ss::future<> archival_metadata_stm::apply(const model::record_batch& b) {
if (b.header().type == model::record_batch_type::prefix_truncate) {
// Special case handling for prefix_truncate batches: these originate
// in log_eviction_stm, but affect the entire partition, local and
Expand Down Expand Up @@ -801,7 +802,7 @@ ss::future<> archival_metadata_stm::apply(model::record_batch b) {
_manifest->advance_insync_offset(b.last_offset());
}

ss::future<> archival_metadata_stm::handle_raft_snapshot() {
ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
cloud_storage::partition_manifest new_manifest{
_manifest->get_ntp(), _manifest->get_revision_id()};

Expand Down Expand Up @@ -1202,7 +1203,7 @@ archival_metadata_stm::get_segments_to_cleanup() const {

ss::future<> archival_metadata_stm::stop() {
_download_as.request_abort();
co_await raft::state_machine::stop();
co_await persisted_stm<>::stop();
}

const cloud_storage::partition_manifest&
Expand Down
7 changes: 5 additions & 2 deletions src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ class archival_metadata_stm final : public persisted_stm<> {

model::offset max_collectible_offset() override;

const char* get_name() const final { return "archival_metadata_stm"; }
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

private:
ss::future<std::error_code> do_add_segments(
std::vector<cloud_storage::segment_meta>,
Expand All @@ -235,8 +238,8 @@ class archival_metadata_stm final : public persisted_stm<> {
ss::future<std::error_code>
do_replicate_commands(model::record_batch, ss::abort_source&);

ss::future<> apply(model::record_batch batch) override;
ss::future<> handle_raft_snapshot() override;
ss::future<> apply(const model::record_batch& batch) override;
ss::future<> apply_raft_snapshot(const iobuf&) override;

ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) override;
ss::future<stm_snapshot> take_local_snapshot() override;
Expand Down
5 changes: 3 additions & 2 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "cluster/id_allocator_stm.h"

#include "bytes/iobuf.h"
#include "cluster/logger.h"
#include "cluster/types.h"
#include "config/configuration.h"
Expand Down Expand Up @@ -118,7 +119,7 @@ id_allocator_stm::do_allocate_id(model::timeout_clock::duration timeout) {
co_return stm_allocation_result{id, raft::errc::success};
}

ss::future<> id_allocator_stm::apply(model::record_batch b) {
ss::future<> id_allocator_stm::apply(const model::record_batch& b) {
if (b.header().type != model::record_batch_type::id_allocator) {
return ss::now();
}
Expand Down Expand Up @@ -202,7 +203,7 @@ ss::future<stm_snapshot> id_allocator_stm::take_local_snapshot() {
std::logic_error("id_allocator_stm doesn't support snapshots"));
}

ss::future<> id_allocator_stm::handle_raft_snapshot() {
ss::future<> id_allocator_stm::apply_raft_snapshot(const iobuf&) {
_next_snapshot = _raft->start_offset();
_processed = 0;
set_next(_next_snapshot);
Expand Down
7 changes: 5 additions & 2 deletions src/v/cluster/id_allocator_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class id_allocator_stm final : public persisted_stm<> {
ss::future<stm_allocation_result>
allocate_id(model::timeout_clock::duration timeout);

const char* get_name() const final { return "id_allocator_stm"; }
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

private:
// legacy structs left for backward compatibility with the "old"
// on-disk log format
Expand Down Expand Up @@ -95,12 +98,12 @@ class id_allocator_stm final : public persisted_stm<> {
do_allocate_id(model::timeout_clock::duration);
ss::future<bool> set_state(int64_t, model::timeout_clock::duration);

ss::future<> apply(model::record_batch) override;
ss::future<> apply(const model::record_batch&) final;

ss::future<> write_snapshot();
ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) override;
ss::future<stm_snapshot> take_local_snapshot() override;
ss::future<> handle_raft_snapshot() override;
ss::future<> apply_raft_snapshot(const iobuf&) final;
ss::future<bool> sync(model::timeout_clock::duration);

mutex _lock;
Expand Down
39 changes: 14 additions & 25 deletions src/v/cluster/log_eviction_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "bytes/iostream.h"
#include "cluster/prefix_truncate_record.h"
#include "model/fundamental.h"
#include "raft/consensus.h"
#include "raft/types.h"
#include "serde/serde.h"
Expand All @@ -36,12 +37,8 @@ struct snapshot_data
};

log_eviction_stm::log_eviction_stm(
raft::consensus* raft,
ss::logger& logger,
ss::abort_source& as,
storage::kvstore& kvstore)
: persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore)
, _as(as) {}
raft::consensus* raft, ss::logger& logger, storage::kvstore& kvstore)
: persisted_stm("log_eviction_stm.snapshot", logger, raft, kvstore) {}

ss::future<> log_eviction_stm::start() {
ssx::spawn_with_gate(_gate, [this] { return monitor_log_eviction(); });
Expand All @@ -51,8 +48,12 @@ ss::future<> log_eviction_stm::start() {
}

ss::future<> log_eviction_stm::stop() {
vlog(_log.info, "DBG: log_eviction_stm::stop()");
_as.request_abort();
_has_pending_truncation.broken();
vlog(_log.info, "DBG: log_eviction_stm::persisted_stm::stop()");
co_await persisted_stm::stop();
vlog(_log.info, "DBG: log_eviction_stm::after_stop()");
}

ss::future<> log_eviction_stm::handle_log_eviction_events() {
Expand Down Expand Up @@ -192,8 +193,10 @@ log_eviction_stm::do_write_raft_snapshot(model::offset truncation_point) {
_log.debug,
"Requesting raft snapshot with final offset: {}",
truncation_point);
auto snapshot_data = co_await _raft->stm_manager()->take_snapshot(
truncation_point);
co_await _raft->write_snapshot(
raft::write_snapshot_cfg(truncation_point, iobuf()));
raft::write_snapshot_cfg(truncation_point, std::move(snapshot_data)));
}

ss::future<result<model::offset, std::error_code>>
Expand Down Expand Up @@ -317,7 +320,7 @@ ss::future<log_eviction_stm::offset_result> log_eviction_stm::replicate_command(
co_return result.value().last_offset;
}

ss::future<> log_eviction_stm::apply(model::record_batch batch) {
ss::future<> log_eviction_stm::apply(const model::record_batch& batch) {
if (likely(
batch.header().type != model::record_batch_type::prefix_truncate)) {
co_return;
Expand Down Expand Up @@ -366,30 +369,16 @@ ss::future<> log_eviction_stm::apply(model::record_batch batch) {
}
}

ss::future<> log_eviction_stm::handle_raft_snapshot() {
/// In the case there is a gap detected in the log, the only path
/// forward is to read the raft snapshot and begin processing from the
/// raft last_snapshot_index
auto raft_snapshot = co_await _raft->open_snapshot();
if (!raft_snapshot) {
throw std::runtime_error{fmt_with_ctx(
fmt::format,
"encountered a gap in the raft log (last_applied: {}, log start "
"offset: {}), but can't find the snapshot - ntp: {}",
last_applied_offset(),
_raft->start_offset(),
_raft->ntp())};
}

auto last_snapshot_index = raft_snapshot->metadata.last_included_index;
co_await raft_snapshot->close();
ss::future<> log_eviction_stm::apply_raft_snapshot(const iobuf&) {
auto last_snapshot_index = model::prev_offset(_raft->start_offset());
_delete_records_eviction_offset = model::offset{};
_storage_eviction_offset = last_snapshot_index;
set_next(model::next_offset(last_snapshot_index));
vlog(
_log.info,
"Handled log eviction new effective start offset: {}",
effective_start_offset());
co_return;
}

ss::future<> log_eviction_stm::apply_local_snapshot(
Expand Down
12 changes: 7 additions & 5 deletions src/v/cluster/log_eviction_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ class consensus;
class log_eviction_stm : public persisted_stm<kvstore_backed_stm_snapshot> {
public:
using offset_result = result<model::offset, std::error_code>;
log_eviction_stm(
raft::consensus*, ss::logger&, ss::abort_source&, storage::kvstore&);
log_eviction_stm(raft::consensus*, ss::logger&, storage::kvstore&);

ss::future<> start() override;

Expand Down Expand Up @@ -103,6 +102,9 @@ class log_eviction_stm : public persisted_stm<kvstore_backed_stm_snapshot> {
return model::next_offset(_delete_records_eviction_offset);
}

const char* get_name() const final { return "log_eviction_stm"; }
ss::future<iobuf> take_snapshot(model::offset) final { co_return iobuf{}; }

protected:
ss::future<> apply_local_snapshot(stm_snapshot_header, iobuf&&) override;

Expand All @@ -117,16 +119,16 @@ class log_eviction_stm : public persisted_stm<kvstore_backed_stm_snapshot> {
ss::future<> monitor_log_eviction();
ss::future<> do_write_raft_snapshot(model::offset);
ss::future<> handle_log_eviction_events();
ss::future<> apply(model::record_batch) override;
ss::future<> handle_raft_snapshot() override;
ss::future<> apply(const model::record_batch&) final;
ss::future<> apply_raft_snapshot(const iobuf&) final;

ss::future<offset_result> replicate_command(
model::record_batch batch,
ss::lowres_clock::time_point deadline,
std::optional<std::reference_wrapper<ss::abort_source>> as);

private:
ss::abort_source& _as;
ss::abort_source _as;

// Offset we are able to truncate based on local retention policy, as
// signaled by the storage layer. This value is not maintained via the
Expand Down
Loading

0 comments on commit 063fcba

Please sign in to comment.