Skip to content

Commit

Permalink
c/persisted_stm: removed redundant _insync_offset
Browse files Browse the repository at this point in the history
Reoved `_insync_offset` from `cluster::persisted_stm` tracking this
offset was redundant to `_next` as it is always offset previous to
`_next` and can be replaced with `last_applied_offset()`

Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Aug 30, 2023
1 parent ecea946 commit 42f32ed
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 45 deletions.
20 changes: 7 additions & 13 deletions src/v/cluster/archival_metadata_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -728,11 +728,10 @@ ss::future<> archival_metadata_stm::apply(const model::record_batch& b) {
apply_update_start_kafka_offset(val.kafka_start_offset);
}
});
_insync_offset = b.last_offset();

co_return;
}
if (b.header().type != model::record_batch_type::archival_metadata) {
_insync_offset = b.last_offset();
co_return;
}

Expand Down Expand Up @@ -798,7 +797,6 @@ ss::future<> archival_metadata_stm::apply(const model::record_batch& b) {
};
});

_insync_offset = b.last_offset();
_manifest->advance_insync_offset(b.last_offset());
}

Expand All @@ -822,7 +820,6 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
cloud_storage_clients::bucket_name{*bucket}, new_manifest, rc_node);

if (res == cloud_storage::download_result::notfound) {
_insync_offset = model::prev_offset(_raft->start_offset());
set_next(_raft->start_offset());
vlog(_logger.info, "handled log eviction, the manifest is absent");
co_return;
Expand All @@ -843,12 +840,9 @@ ss::future<> archival_metadata_stm::apply_raft_snapshot(const iobuf&) {
if (iso == model::offset{}) {
// Handle legacy manifests which don't have the 'insync_offset'
// field.
_insync_offset = _manifest->get_last_offset();
} else {
_insync_offset = iso;
iso = _manifest->get_last_offset();
}
auto next_offset = std::max(
_raft->start_offset(), model::next_offset(_insync_offset));
auto next_offset = std::max(_raft->start_offset(), model::next_offset(iso));
set_next(next_offset);

vlog(
Expand Down Expand Up @@ -945,12 +939,12 @@ ss::future<stm_snapshot> archival_metadata_stm::take_local_snapshot() {
vlog(
_logger.debug,
"creating snapshot at offset: {}, remote start_offset: {}, "
"last_offset: "
"{}",
_insync_offset,
"last_offset: {}",
last_applied_offset(),
get_start_offset(),
get_last_offset());
co_return stm_snapshot::create(0, _insync_offset, std::move(snap_data));
co_return stm_snapshot::create(
0, last_applied_offset(), std::move(snap_data));
}

model::offset archival_metadata_stm::max_collectible_offset() {
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/archival_metadata_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class archival_metadata_stm final : public persisted_stm<> {

// Users of the stm need to know insync offset in order to pass
// the proper value to mark_clean
model::offset get_insync_offset() const { return _insync_offset; }
model::offset get_insync_offset() const { return last_applied_offset(); }

model::offset get_last_clean_at() const { return _last_clean_at; };

Expand Down Expand Up @@ -292,7 +292,7 @@ class archival_metadata_stm final : public persisted_stm<> {
ss::shared_ptr<cloud_storage::partition_manifest> _manifest;

// The offset of the last mark_clean_cmd applied: if the manifest is
// clean, this will equal _insync_offset.
// clean, this will equal last_applied_offset.
model::offset _last_clean_at;

// The offset of the last record that modified this stm
Expand Down
10 changes: 3 additions & 7 deletions src/v/cluster/id_allocator_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ id_allocator_stm::sync(model::timeout_clock::duration timeout) {
_curr_id = _state;
_curr_batch = 0;
_processed = 0;
_next_snapshot = _insync_offset;
_next_snapshot = last_applied_offset();
}
if (_procesing_legacy) {
for (auto& cmd : _cache) {
Expand Down Expand Up @@ -129,8 +129,6 @@ ss::future<> id_allocator_stm::apply(const model::record_batch& b) {
auto& record = *r.begin();
auto rk = reflection::adl<uint8_t>{}.from(record.release_key());

_insync_offset = b.last_offset();

if (rk == allocation_cmd::record_key) {
allocation_cmd cmd = reflection::adl<allocation_cmd>{}.from(
record.release_value());
Expand Down Expand Up @@ -162,7 +160,7 @@ ss::future<> id_allocator_stm::apply(const model::record_batch& b) {
_state = cmd.next_state;

if (_next_snapshot() < 0) {
_next_snapshot = _insync_offset;
_next_snapshot = last_applied_offset();
_processed = 0;
}

Expand All @@ -186,7 +184,7 @@ ss::future<> id_allocator_stm::write_snapshot() {
return _raft
->write_snapshot(raft::write_snapshot_cfg(_next_snapshot, iobuf()))
.then([this] {
_next_snapshot = _insync_offset;
_next_snapshot = last_applied_offset();
_processed = 0;
})
.finally([this] { _is_writing_snapshot = false; });
Expand All @@ -206,8 +204,6 @@ ss::future<stm_snapshot> id_allocator_stm::take_local_snapshot() {
ss::future<> id_allocator_stm::apply_raft_snapshot(const iobuf&) {
_next_snapshot = _raft->start_offset();
_processed = 0;
set_next(_next_snapshot);
_insync_offset = model::prev_offset(_next_snapshot);
return ss::now();
}

Expand Down
10 changes: 3 additions & 7 deletions src/v/cluster/persisted_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,13 @@ persisted_stm<T>::ensure_local_snapshot_exists(model::offset target_offset) {
return wait(target_offset, model::no_timeout)
.then([this, target_offset]() {
vassert(
target_offset <= _insync_offset,
target_offset < next(),
"[{} ({})] after we waited for target_offset ({}) "
"_insync_offset "
"({}) should have matched it or bypassed",
"next ({}) must be greater",
_raft->ntp(),
name(),
target_offset,
_insync_offset);
next());
return do_write_local_snapshot();
});
});
Expand Down Expand Up @@ -535,7 +534,6 @@ ss::future<> persisted_stm<T>::start() {
snapshot.header, std::move(snapshot.data));
set_next(next_offset);
_last_snapshot_offset = snapshot.header.offset;
_insync_offset = snapshot.header.offset;
} else {
// This can happen on an out-of-date replica that re-joins the group
// after other replicas have already evicted logs to some offset
Expand All @@ -550,7 +548,6 @@ ss::future<> persisted_stm<T>::start() {
_log.debug,
"start with non-applied snapshot, set_next {}",
next_offset);
_insync_offset = model::prev_offset(next_offset);
set_next(next_offset);
}

Expand All @@ -559,7 +556,6 @@ ss::future<> persisted_stm<T>::start() {
vlog(_log.debug, "start without snapshot, maybe set_next {}", offset);

if (offset >= model::offset(0)) {
_insync_offset = model::prev_offset(offset);
set_next(offset);
}
}
Expand Down
1 change: 0 additions & 1 deletion src/v/cluster/persisted_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ class persisted_stm

bool _is_catching_up{false};
model::term_id _insync_term;
model::offset _insync_offset;
raft::consensus* _raft;
prefix_logger _log;
ss::gate _gate;
Expand Down
13 changes: 5 additions & 8 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ rm_stm::replicate_tx(model::batch_identity bid, model::record_batch_reader br) {
// this is the first attempt in the tx, reset dedupe cache
reset_seq(bid, synced_term);

_mem_state.estimated[bid.pid] = _insync_offset;
_mem_state.estimated[bid.pid] = last_applied_offset();
}

auto expiration_it = _log_state.expiration.find(bid.pid);
Expand Down Expand Up @@ -2144,8 +2144,6 @@ ss::future<> rm_stm::apply(const model::record_batch& b) {
}
}

_insync_offset = last_offset;

compact_snapshot();
if (_is_autoabort_enabled && !_is_autoabort_active) {
abort_old_txes();
Expand Down Expand Up @@ -2498,7 +2496,7 @@ ss::future<stm_snapshot> rm_stm::take_local_snapshot() {
vlog(
_ctx_log.trace,
"taking snapshot with last included offset of: {}",
model::prev_offset(_insync_offset));
last_applied_offset());

fragmented_vector<abort_index> abort_indexes;
fragmented_vector<abort_index> expired_abort_indexes;
Expand Down Expand Up @@ -2584,7 +2582,7 @@ ss::future<stm_snapshot> rm_stm::take_local_snapshot() {
tx_ss.seqs.push_back(entry.second.entry.copy());
}
}
tx_ss.offset = _insync_offset;
tx_ss.offset = last_applied_offset();

for (const auto& entry : _log_state.current_txes) {
tx_ss.tx_data.push_back(tx_snapshot::tx_data_snapshot{
Expand All @@ -2607,7 +2605,7 @@ ss::future<stm_snapshot> rm_stm::take_local_snapshot() {
for (const auto& entry : _log_state.seq_table) {
tx_ss.seqs.push_back(entry.second.entry.copy());
}
tx_ss.offset = _insync_offset;
tx_ss.offset = last_applied_offset();

for (const auto& entry : _log_state.current_txes) {
tx_ss.tx_seqs.push_back(tx_snapshot_v3::tx_seqs_snapshot{
Expand All @@ -2627,7 +2625,7 @@ ss::future<stm_snapshot> rm_stm::take_local_snapshot() {
}
return fut_serialize.then([version, &tx_ss_buf, this]() {
return stm_snapshot::create(
version, _insync_offset, std::move(tx_ss_buf));
version, last_applied_offset(), std::move(tx_ss_buf));
});
});
});
Expand Down Expand Up @@ -2733,7 +2731,6 @@ ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
_log_state.reset();
_mem_state = mem_state{_tx_root_tracker};
set_next(_raft->start_offset());
_insync_offset = model::prev_offset(_raft->start_offset());
return ss::now();
});
}
Expand Down
11 changes: 4 additions & 7 deletions src/v/cluster/tm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -882,25 +882,25 @@ ss::future<stm_snapshot> tm_stm::do_take_snapshot() {
auto snapshot_version = active_snapshot_version();
if (snapshot_version == tm_snapshot_v0::version) {
tm_snapshot_v0 tm_ss;
tm_ss.offset = _insync_offset;
tm_ss.offset = last_applied_offset();
tm_ss.transactions = _cache->get_log_transactions();

iobuf tm_ss_buf;
reflection::adl<tm_snapshot_v0>{}.to(tm_ss_buf, std::move(tm_ss));

co_return stm_snapshot::create(
tm_snapshot_v0::version, _insync_offset, std::move(tm_ss_buf));
tm_snapshot_v0::version, last_applied_offset(), std::move(tm_ss_buf));
} else {
tm_snapshot tm_ss;
tm_ss.offset = _insync_offset;
tm_ss.offset = last_applied_offset();
tm_ss.transactions = _cache->get_log_transactions();
tm_ss.hash_ranges = _hosted_txes;

iobuf tm_ss_buf;
reflection::adl<tm_snapshot>{}.to(tm_ss_buf, std::move(tm_ss));

co_return stm_snapshot::create(
tm_snapshot::version, _insync_offset, std::move(tm_ss_buf));
tm_snapshot::version, last_applied_offset(), std::move(tm_ss_buf));
}
}

Expand Down Expand Up @@ -1026,7 +1026,6 @@ ss::future<> tm_stm::apply_hosted_transactions(model::record_batch b) {

ss::future<> tm_stm::apply(const model::record_batch& b) {
const auto& hdr = b.header();
_insync_offset = b.last_offset();

if (hdr.type == model::record_batch_type::tm_update) {
return apply_tm_update(hdr, b.copy());
Expand Down Expand Up @@ -1131,8 +1130,6 @@ ss::future<> tm_stm::apply_raft_snapshot(const iobuf&) {
_cache->clear_log();
_cache->clear_mem();
_pid_tx_id.clear();
set_next(_raft->start_offset());
_insync_offset = model::prev_offset(_raft->start_offset());
return ss::now();
});
}
Expand Down

0 comments on commit 42f32ed

Please sign in to comment.