From ffc6a169f845eb794dea4ed55a9d884026de9e40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Wed, 10 Jul 2024 15:17:14 +0000 Subject: [PATCH] s/log: fixed race condition in append and truncate prefix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixed a race condition which may lead to a situation in which the same offset was assigned to two different records appended to the log. The race condition was happening when one one fiber was appending batches to the log while the other one was prefix truncating it. (Raft does it when taking a snapshot). In this situation it might happened that the batches were appended to the segment which was about to be deleted in `remove_segment_permanently`. This lead to a situation in which an appended batch was lost and the same offset was assigned to the next one. This lead to assertion triggered in `mux_state_machine` Fixes: Signed-off-by: Michał Maślanka --- src/v/storage/disk_log_appender.cc | 2 +- src/v/storage/disk_log_impl.cc | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/v/storage/disk_log_appender.cc b/src/v/storage/disk_log_appender.cc index 98f133c3dd62..dd736e9588bf 100644 --- a/src/v/storage/disk_log_appender.cc +++ b/src/v/storage/disk_log_appender.cc @@ -50,7 +50,7 @@ ss::future<> disk_log_appender::initialize() { } bool disk_log_appender::segment_is_appendable(model::term_id batch_term) const { - if (!_seg || !_seg->has_appender()) { + if (!_seg || !_seg->has_appender() || _seg->is_tombstone()) { // The latest segment with which this log_appender has called // initialize() has been rolled and no longer has an segment appender // (e.g. because segment.ms rolled onto a new segment). There is likely diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index 1ad03a5c103d..7453ff02e56f 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -1706,7 +1706,7 @@ ss::future<> disk_log_impl::maybe_roll_unlocked( co_return co_await new_segment(next_offset, t, iopc); } auto ptr = _segs.back(); - if (!ptr->has_appender()) { + if (!ptr->has_appender() || ptr->is_tombstone()) { co_return co_await new_segment(next_offset, t, iopc); } bool size_should_roll = false; @@ -2522,8 +2522,17 @@ disk_log_impl::remove_prefix_full_segments(truncate_prefix_config cfg) { }, [this] { auto ptr = _segs.front(); + /** + * If segment has outstanding locks wait for it to be unlocked before + * prefixing truncating it, this way the prefix truncation will + * naturally yield to the appender and it will be retried + */ + if (ptr->has_outstanding_locks()) { + return ss::sleep(1ms); + } _segs.pop_front(); _probe->add_bytes_prefix_truncated(ptr->file_size()); + return remove_segment_permanently(ptr, "remove_prefix_full_segments"); }); }