diff --git a/src/v/bytes/CMakeLists.txt b/src/v/bytes/CMakeLists.txt index 4628b525e0df..17a1e8c3629d 100644 --- a/src/v/bytes/CMakeLists.txt +++ b/src/v/bytes/CMakeLists.txt @@ -3,6 +3,9 @@ v_cc_library( bytes SRCS iobuf.cc + bytes.cc + iostream.cc + scattered_message.cc DEPS absl::hash Seastar::seastar diff --git a/src/v/bytes/bytes.cc b/src/v/bytes/bytes.cc new file mode 100644 index 000000000000..6f7b8b2a2f36 --- /dev/null +++ b/src/v/bytes/bytes.cc @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "bytes/bytes.h" + +ss::sstring to_hex(bytes_view b) { + static constexpr std::string_view digits{"0123456789abcdef"}; + ss::sstring out = ss::uninitialized_string(b.size() * 2); + const auto end = b.size(); + for (size_t i = 0; i != end; ++i) { + uint8_t x = b[i]; + out[2 * i] = digits[x >> uint8_t(4)]; + out[2 * i + 1] = digits[x & uint8_t(0xf)]; + } + return out; +} + +ss::sstring to_hex(const bytes& b) { return to_hex(bytes_view(b)); } + +std::ostream& operator<<(std::ostream& os, const bytes& b) { + return os << bytes_view(b); +} + +std::ostream& operator<<(std::ostream& os, const bytes_opt& b) { + if (b) { + return os << *b; + } + return os << "empty"; +} + +namespace std { +std::ostream& operator<<(std::ostream& os, const bytes_view& b) { + fmt::print(os, "{{bytes:{}}}", b.size()); + return os; +} +} // namespace std + +bool bytes_type_cmp::operator()(const bytes& lhs, const bytes_view& rhs) const { + return lhs < rhs; +} + +bool bytes_type_cmp::operator()(const bytes& lhs, const bytes& rhs) const { + return lhs < rhs; +} diff --git a/src/v/bytes/iobuf.cc b/src/v/bytes/iobuf.cc index 2411c0c234f5..19f2b94a40ce 100644 --- a/src/v/bytes/iobuf.cc +++ b/src/v/bytes/iobuf.cc @@ -10,10 +10,7 @@ #include "bytes/iobuf.h" #include "base/vassert.h" -#include "bytes/bytes.h" #include "bytes/details/io_allocation_size.h" -#include "bytes/iostream.h" -#include "bytes/scattered_message.h" #include #include @@ -28,106 +25,6 @@ std::ostream& operator<<(std::ostream& o, const iobuf& io) { return o << "{bytes=" << io.size_bytes() << ", fragments=" << std::distance(io.cbegin(), io.cend()) << "}"; } -ss::scattered_message iobuf_as_scattered(iobuf b) { - ss::scattered_message msg; - auto in = iobuf::iterator_consumer(b.cbegin(), b.cend()); - int32_t chunk_no = 0; - in.consume( - b.size_bytes(), [&msg, &chunk_no, &b](const char* src, size_t sz) { - ++chunk_no; - vassert( - chunk_no <= std::numeric_limits::max(), - "Invalid construction of scattered_message. fragment coutn exceeds " - "max count:{}. Usually a bug with small append() to iobuf. {}", - chunk_no, - b); - msg.append_static(src, sz); - return ss::stop_iteration::no; - }); - msg.on_delete([b = std::move(b)] {}); - return msg; -} - -ss::future<> -write_iobuf_to_output_stream(iobuf buf, ss::output_stream& output) { - return ss::do_with(std::move(buf), [&output](iobuf& buf) { - return ss::do_for_each(buf, [&output](iobuf::fragment& f) { - return output.write(f.get(), f.size()); - }); - }); -} - -ss::future read_iobuf_exactly(ss::input_stream& in, size_t n) { - return ss::do_with(iobuf{}, n, [&in](iobuf& b, size_t& n) { - return ss::do_until( - [&n] { return n == 0; }, - [&n, &in, &b] { - return in.read_up_to(n).then( - [&n, &b](ss::temporary_buffer buf) { - if (buf.empty()) { - n = 0; - return; - } - n -= buf.size(); - b.append(std::move(buf)); - }); - }) - .then([&b] { return std::move(b); }); - }); -} - -ss::output_stream make_iobuf_ref_output_stream(iobuf& io) { - struct iobuf_output_stream final : ss::data_sink_impl { - explicit iobuf_output_stream(iobuf& i) - : io(i) {} - ss::future<> put(ss::net::packet data) final { - auto all = data.release(); - for (auto& b : all) { - io.append(std::move(b)); - } - return ss::make_ready_future<>(); - } - ss::future<> put(std::vector> all) final { - for (auto& b : all) { - io.append(std::move(b)); - } - return ss::make_ready_future<>(); - } - ss::future<> put(ss::temporary_buffer buf) final { - io.append(std::move(buf)); - return ss::make_ready_future<>(); - } - ss::future<> flush() final { return ss::make_ready_future<>(); } - ss::future<> close() final { return ss::make_ready_future<>(); } - iobuf& io; - }; - const size_t sz = io.size_bytes(); - return ss::output_stream( - ss::data_sink(std::make_unique(io)), sz); -} -ss::input_stream make_iobuf_input_stream(iobuf io) { - struct iobuf_input_stream final : ss::data_source_impl { - explicit iobuf_input_stream(iobuf i) - : io(std::move(i)) {} - ss::future> skip(uint64_t n) final { - io.trim_front(n); - return get(); - } - ss::future> get() final { - if (io.begin() == io.end()) { - return ss::make_ready_future>(); - } - auto buf = io.begin()->share(); - io.pop_front(); - return ss::make_ready_future>( - std::move(buf)); - } - iobuf io; - }; - auto ds = ss::data_source( - std::make_unique(std::move(io))); - return ss::input_stream(std::move(ds)); -} iobuf iobuf::copy() const { auto in = iobuf::iterator_consumer(cbegin(), cend()); @@ -315,43 +212,3 @@ iobuf::placeholder iobuf::reserve(size_t sz) { it->reserve(sz); return p; } - -ss::sstring to_hex(bytes_view b) { - static constexpr std::string_view digits{"0123456789abcdef"}; - ss::sstring out = ss::uninitialized_string(b.size() * 2); - const auto end = b.size(); - for (size_t i = 0; i != end; ++i) { - uint8_t x = b[i]; - out[2 * i] = digits[x >> uint8_t(4)]; - out[2 * i + 1] = digits[x & uint8_t(0xf)]; - } - return out; -} - -ss::sstring to_hex(const bytes& b) { return to_hex(bytes_view(b)); } - -std::ostream& operator<<(std::ostream& os, const bytes& b) { - return os << bytes_view(b); -} - -std::ostream& operator<<(std::ostream& os, const bytes_opt& b) { - if (b) { - return os << *b; - } - return os << "empty"; -} - -namespace std { -std::ostream& operator<<(std::ostream& os, const bytes_view& b) { - fmt::print(os, "{{bytes:{}}}", b.size()); - return os; -} -} // namespace std - -bool bytes_type_cmp::operator()(const bytes& lhs, const bytes_view& rhs) const { - return lhs < rhs; -} - -bool bytes_type_cmp::operator()(const bytes& lhs, const bytes& rhs) const { - return lhs < rhs; -} diff --git a/src/v/bytes/iostream.cc b/src/v/bytes/iostream.cc new file mode 100644 index 000000000000..3dac70a0ae38 --- /dev/null +++ b/src/v/bytes/iostream.cc @@ -0,0 +1,93 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "bytes/iostream.h" + +ss::input_stream make_iobuf_input_stream(iobuf io) { + struct iobuf_input_stream final : ss::data_source_impl { + explicit iobuf_input_stream(iobuf i) + : io(std::move(i)) {} + ss::future> skip(uint64_t n) final { + io.trim_front(n); + return get(); + } + ss::future> get() final { + if (io.begin() == io.end()) { + return ss::make_ready_future>(); + } + auto buf = io.begin()->share(); + io.pop_front(); + return ss::make_ready_future>( + std::move(buf)); + } + iobuf io; + }; + auto ds = ss::data_source( + std::make_unique(std::move(io))); + return ss::input_stream(std::move(ds)); +} + +ss::output_stream make_iobuf_ref_output_stream(iobuf& io) { + struct iobuf_output_stream final : ss::data_sink_impl { + explicit iobuf_output_stream(iobuf& i) + : io(i) {} + ss::future<> put(ss::net::packet data) final { + auto all = data.release(); + for (auto& b : all) { + io.append(std::move(b)); + } + return ss::make_ready_future<>(); + } + ss::future<> put(std::vector> all) final { + for (auto& b : all) { + io.append(std::move(b)); + } + return ss::make_ready_future<>(); + } + ss::future<> put(ss::temporary_buffer buf) final { + io.append(std::move(buf)); + return ss::make_ready_future<>(); + } + ss::future<> flush() final { return ss::make_ready_future<>(); } + ss::future<> close() final { return ss::make_ready_future<>(); } + iobuf& io; + }; + const size_t sz = io.size_bytes(); + return ss::output_stream( + ss::data_sink(std::make_unique(io)), sz); +} + +ss::future read_iobuf_exactly(ss::input_stream& in, size_t n) { + return ss::do_with(iobuf{}, n, [&in](iobuf& b, size_t& n) { + return ss::do_until( + [&n] { return n == 0; }, + [&n, &in, &b] { + return in.read_up_to(n).then( + [&n, &b](ss::temporary_buffer buf) { + if (buf.empty()) { + n = 0; + return; + } + n -= buf.size(); + b.append(std::move(buf)); + }); + }) + .then([&b] { return std::move(b); }); + }); +} + +ss::future<> +write_iobuf_to_output_stream(iobuf buf, ss::output_stream& output) { + return ss::do_with(std::move(buf), [&output](iobuf& buf) { + return ss::do_for_each(buf, [&output](iobuf::fragment& f) { + return output.write(f.get(), f.size()); + }); + }); +} diff --git a/src/v/bytes/scattered_message.cc b/src/v/bytes/scattered_message.cc new file mode 100644 index 000000000000..59eb1c2cdcf8 --- /dev/null +++ b/src/v/bytes/scattered_message.cc @@ -0,0 +1,31 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#include "bytes/scattered_message.h" + +ss::scattered_message iobuf_as_scattered(iobuf b) { + ss::scattered_message msg; + auto in = iobuf::iterator_consumer(b.cbegin(), b.cend()); + int32_t chunk_no = 0; + in.consume( + b.size_bytes(), [&msg, &chunk_no, &b](const char* src, size_t sz) { + ++chunk_no; + vassert( + chunk_no <= std::numeric_limits::max(), + "Invalid construction of scattered_message. fragment coutn exceeds " + "max count:{}. Usually a bug with small append() to iobuf. {}", + chunk_no, + b); + msg.append_static(src, sz); + return ss::stop_iteration::no; + }); + msg.on_delete([b = std::move(b)] {}); + return msg; +} diff --git a/src/v/cluster/tests/randoms.h b/src/v/cluster/tests/randoms.h index 535ff0857900..21b41140ff4c 100644 --- a/src/v/cluster/tests/randoms.h +++ b/src/v/cluster/tests/randoms.h @@ -14,11 +14,13 @@ #include "cluster/health_monitor_types.h" #include "cluster/partition_balancer_state.h" #include "cluster/partition_balancer_types.h" +#include "cluster/producer_state.h" #include "cluster/rm_stm_types.h" #include "model/tests/randoms.h" #include "random/generators.h" #include "storage/tests/randoms.h" #include "test_utils/randoms.h" +#include "utils/prefix_logger.h" #include diff --git a/src/v/container/include/container/chunked_hash_map.h b/src/v/container/include/container/chunked_hash_map.h index 599385959095..9064c93b930d 100644 --- a/src/v/container/include/container/chunked_hash_map.h +++ b/src/v/container/include/container/chunked_hash_map.h @@ -10,9 +10,10 @@ */ #pragma once +#include "container/fragmented_vector.h" + #include #include -#include #include diff --git a/src/v/container/include/container/interval_set.h b/src/v/container/include/container/interval_set.h index b628cac41ba3..a2c8bbe3e7c7 100644 --- a/src/v/container/include/container/interval_set.h +++ b/src/v/container/include/container/interval_set.h @@ -10,7 +10,7 @@ */ #pragma once -#include +#include /** * A container that contains non-empty, open intervals. @@ -29,25 +29,9 @@ */ template class interval_set { - struct key { - // Inclusive. - T start; - - // Exclusive. - T end; - }; - struct compare { - using is_transparent = void; - - bool operator()(const key& a, const key& b) const { - return a.start < b.start; - } - - bool operator()(const T& a, const key& b) const { return a < b.start; } - bool operator()(const key& a, const T& b) const { return a.start < b; } - bool operator()(const T& a, const T& b) const { return a < b; } - }; - using set_t = absl::btree_set; + // Key = interval start (inclusive) + // Value = interval end (exclusive) + using set_t = absl::btree_map; public: using const_iterator = set_t::const_iterator; @@ -103,6 +87,12 @@ class interval_set { */ [[nodiscard]] size_t size() const; + /** + * Convenience wrappers. + */ + static auto to_start(const_iterator it) { return it->first; } + static auto to_end(const_iterator it) { return it->second; } + private: /** * Extend the interval being pointed at with any intervals that overlap @@ -113,20 +103,22 @@ class interval_set { std::pair merge_right(const_iterator start_it); set_t set_; + + static auto& to_end(iterator it) { return it->second; } }; template std::pair::const_iterator, bool> interval_set::merge_right(const_iterator start_it) { - auto start = start_it->start; - auto merged_end = start_it->end; + auto start = to_start(start_it); + auto merged_end = to_end(start_it); auto next_it = std::next(start_it); auto merge_end_it = next_it; // Seek forward as long as the next interval if it overlaps with our merged // interval. NOTE: <= because these are open intervals. - while (merge_end_it != set_.end() && merge_end_it->start <= merged_end) { - merged_end = std::max(merge_end_it->end, merged_end); + while (merge_end_it != set_.end() && to_start(merge_end_it) <= merged_end) { + merged_end = std::max(to_end(merge_end_it), merged_end); merge_end_it = std::next(merge_end_it); } if (merge_end_it == next_it) { @@ -136,7 +128,7 @@ interval_set::merge_right(const_iterator start_it) { // Replace our initial iterator and subsequent intervals with a merged // version. set_.erase(start_it, merge_end_it); - return set_.emplace(key{start, merged_end}); + return set_.emplace(start, merged_end); } template @@ -150,7 +142,7 @@ interval_set::insert(interval interval) { const auto input_start = interval.start; const auto input_end = input_start + length; if (set_.empty()) { - return set_.emplace(key{input_start, input_end}); + return set_.emplace(input_start, input_end); } auto it = set_.lower_bound(input_start); @@ -162,8 +154,8 @@ interval_set::insert(interval interval) { // [ ) case 2 // In either case, just merge the expand the bounds of the existing // iterator. - if (it != set_.end() && input_start == it->start) { - it->end = std::max(input_end, it->end); + if (it != set_.end() && input_start == to_start(it)) { + to_end(it) = std::max(input_end, to_end(it)); return merge_right(it); } @@ -178,14 +170,14 @@ interval_set::insert(interval interval) { // with it. if (it != set_.begin()) { auto prev = std::prev(it); - if (prev->end >= input_start) { - prev->end = std::max(input_end, prev->end); + if (to_end(prev) >= input_start) { + to_end(prev) = std::max(input_end, to_end(prev)); return merge_right(prev); } // Intentional fallthrough. } // Case 2: there's no overlap to the left. Just insert and merge forward. - auto ret = set_.emplace(key{input_start, input_end}); + auto ret = set_.emplace(input_start, input_end); return merge_right(ret.first); } @@ -198,7 +190,7 @@ interval_set::const_iterator interval_set::find(T index) const { } it = std::prev(it); - } else if (it->start == index) { + } else if (to_start(it) == index) { return it; } else if (it == set_.cbegin()) { // Equality condition failing before this means that the index is @@ -208,8 +200,8 @@ interval_set::const_iterator interval_set::find(T index) const { --it; } - assert(it->start < index); - if (index < it->end) { + assert(to_start(it) < index); + if (index < to_end(it)) { return it; } diff --git a/src/v/container/tests/bench_utils.h b/src/v/container/tests/bench_utils.h index 736dd5bffe5d..7155a89f0b90 100644 --- a/src/v/container/tests/bench_utils.h +++ b/src/v/container/tests/bench_utils.h @@ -11,7 +11,6 @@ #pragma once #include "base/type_traits.h" #include "random/generators.h" -#include "utils/functional.h" #include diff --git a/src/v/container/tests/interval_set_test.cc b/src/v/container/tests/interval_set_test.cc index 077c4ce994cd..17d001f03bd8 100644 --- a/src/v/container/tests/interval_set_test.cc +++ b/src/v/container/tests/interval_set_test.cc @@ -27,8 +27,8 @@ namespace { void check_no_overlap(const set_t& s) { std::optional prev_last; for (const auto& interval : s) { - auto start = interval.start; - auto end = interval.end; + auto start = interval.first; + auto end = interval.second; EXPECT_GT(end, start); if (prev_last) { EXPECT_GT(start, prev_last.value()); @@ -63,8 +63,8 @@ TEST(IntervalSet, InsertMergeOverlappingIntervals) { // [i, i + 10) const auto ret1 = set.insert({0, 10}); EXPECT_TRUE(ret1.second); - EXPECT_EQ(ret1.first->start, 0); - EXPECT_EQ(ret1.first->end, 10); + EXPECT_EQ(ret1.first->first, 0); + EXPECT_EQ(ret1.first->second, 10); // Insertion of a sub-interval results in the first interval being // extended. Note that because of this, the iterators are safe (i.e. @@ -73,8 +73,8 @@ TEST(IntervalSet, InsertMergeOverlappingIntervals) { auto expected_end = 10 + i; EXPECT_TRUE(ret2.second); EXPECT_EQ(ret1, ret2); - EXPECT_EQ(ret1.first->start, 0); - EXPECT_EQ(ret1.first->end, expected_end); + EXPECT_EQ(ret1.first->first, 0); + EXPECT_EQ(ret1.first->second, expected_end); // Confirm we can still find the expanded intervals. const auto found_iter_first = set.find(0); @@ -99,8 +99,8 @@ TEST(IntervalSet, InsertMergesAdjacentIntervals) { // Since the intervals were exactly adjacent, they should be merged. EXPECT_EQ(ret1, ret2); - EXPECT_EQ(ret1.first->start, 0); - EXPECT_EQ(ret1.first->end, 20); + EXPECT_EQ(ret1.first->first, 0); + EXPECT_EQ(ret1.first->second, 20); check_no_overlap(set); } @@ -139,8 +139,8 @@ TEST(IntervalSet, InsertOverlapFront) { EXPECT_TRUE(ret3.second); EXPECT_EQ(ret1, ret2); EXPECT_EQ(ret1, ret3); - EXPECT_EQ(ret1.first->start, 0); - EXPECT_EQ(ret1.first->end, 100); + EXPECT_EQ(ret1.first->first, 0); + EXPECT_EQ(ret1.first->second, 100); EXPECT_EQ(1, set.size()); check_no_overlap(set); @@ -150,8 +150,8 @@ TEST(IntervalSet, InsertOverlapCompletely) { set_t set; EXPECT_TRUE(set.insert({10, 20}).second); EXPECT_TRUE(set.insert({0, 100}).second); - EXPECT_EQ(set.begin()->start, 0); - EXPECT_EQ(set.begin()->end, 100); + EXPECT_EQ(set.begin()->first, 0); + EXPECT_EQ(set.begin()->second, 100); check_no_overlap(set); } @@ -239,16 +239,16 @@ TEST(IntervalSet, EraseBeginToEnd) { EXPECT_TRUE(set.insert({20, 10}).second); EXPECT_TRUE(set.insert({40, 10}).second); EXPECT_EQ(3, set.size()); - EXPECT_TRUE(set.begin()->start == 0); - EXPECT_TRUE(set.begin()->end == 10); + EXPECT_TRUE(set.begin()->first == 0); + EXPECT_TRUE(set.begin()->second == 10); set.erase(set.begin()); - EXPECT_EQ(set.begin()->start, 20); - EXPECT_EQ(set.begin()->end, 30); + EXPECT_EQ(set.begin()->first, 20); + EXPECT_EQ(set.begin()->second, 30); set.erase(set.begin()); - EXPECT_EQ(set.begin()->start, 40); - EXPECT_EQ(set.begin()->end, 50); + EXPECT_EQ(set.begin()->first, 40); + EXPECT_EQ(set.begin()->second, 50); set.erase(set.begin()); EXPECT_EQ(set.begin(), set.end()); @@ -306,11 +306,11 @@ TEST_P(RandomizedIntervalSetTest, RandomInsertsSequentialErase) { while (!filled_intervals.empty()) { auto it = filled_intervals.begin(); if (highest_so_far) { - EXPECT_GT(it->start, *highest_so_far); + EXPECT_GT(it->first, *highest_so_far); } - highest_so_far = it->end; - interval_filled_size += it->end - it->start; - for (auto i = it->start; i < it->end; i++) { + highest_so_far = it->second; + interval_filled_size += it->second - it->first; + for (auto i = it->first; i < it->second; i++) { EXPECT_EQ(buf[i], 'X'); } filled_intervals.erase(it); diff --git a/src/v/container/tests/vector_bench.cc b/src/v/container/tests/vector_bench.cc index e60832960760..c8dded90c92e 100644 --- a/src/v/container/tests/vector_bench.cc +++ b/src/v/container/tests/vector_bench.cc @@ -13,7 +13,6 @@ #include "container/fragmented_vector.h" #include "container/tests/bench_utils.h" #include "random/generators.h" -#include "utils/functional.h" #include diff --git a/src/v/serde/test/serde_test.cc b/src/v/serde/test/serde_test.cc index 99d91f8b24b1..8050dc4ad9ea 100644 --- a/src/v/serde/test/serde_test.cc +++ b/src/v/serde/test/serde_test.cc @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0 +#include "bytes/random.h" #include "container/fragmented_vector.h" #include "hashing/crc32c.h" #include "model/fundamental.h" diff --git a/src/v/ssx/include/ssx/thread_worker.h b/src/v/ssx/include/ssx/thread_worker.h index 83bbdd400be9..250133217322 100644 --- a/src/v/ssx/include/ssx/thread_worker.h +++ b/src/v/ssx/include/ssx/thread_worker.h @@ -13,7 +13,6 @@ #include "base/seastarx.h" #include "base/vassert.h" -#include "utils/mutex.h" #include #include diff --git a/src/v/storage/mvlog/segment_reader.cc b/src/v/storage/mvlog/segment_reader.cc index b1b8d69dbfc7..48747b2168ac 100644 --- a/src/v/storage/mvlog/segment_reader.cc +++ b/src/v/storage/mvlog/segment_reader.cc @@ -26,6 +26,16 @@ segment_reader::segment_reader( } segment_reader::~segment_reader() { --segment_->num_readers_; } +namespace { +auto to_start(interval_set::const_iterator it) { + return interval_set::to_start(it); +} + +auto to_end(interval_set::const_iterator it) { + return interval_set::to_end(it); +} +} // namespace + skipping_data_source::read_list_t segment_reader::make_read_intervals(size_t start_pos, size_t length) const { auto gap_it = gaps_.begin(); @@ -37,22 +47,22 @@ segment_reader::make_read_intervals(size_t start_pos, size_t length) const { // intervals to read, and skipping any that are entirely below the start // position. while (gap_it != gaps_.end() && cur_iter_pos <= max_pos) { - const auto next_gap_max = gap_it->end - 1; + const auto next_gap_max = to_end(gap_it) - 1; if (cur_iter_pos > next_gap_max) { // We are ahead of the next gap. Drop it from the list to consider. ++gap_it; continue; } - if (cur_iter_pos >= gap_it->start) { + if (cur_iter_pos >= to_start(gap_it)) { // We are in the middle of a gap. Skip to just past the end. // NOTE: the gap end is exclusive. - cur_iter_pos = gap_it->end; + cur_iter_pos = to_end(gap_it); ++gap_it; continue; } // The next gap is ahead of us. Read up to the start of it and skip // over the gap. - const auto read_max_pos = std::min(max_pos, gap_it->start - 1); + const auto read_max_pos = std::min(max_pos, to_start(gap_it) - 1); const auto read_length = read_max_pos - cur_iter_pos + 1; read_intervals.emplace_back(cur_iter_pos, read_length); vlog( @@ -62,7 +72,7 @@ segment_reader::make_read_intervals(size_t start_pos, size_t length) const { read_max_pos + 1, start_pos, max_pos + 1); - cur_iter_pos = gap_it->end; + cur_iter_pos = to_end(gap_it); ++gap_it; } // No more gaps, read the rest of the range. diff --git a/src/v/storage/mvlog/tests/active_segment_test.cc b/src/v/storage/mvlog/tests/active_segment_test.cc index 37a9bb7cdc9d..6d9cddd3b36f 100644 --- a/src/v/storage/mvlog/tests/active_segment_test.cc +++ b/src/v/storage/mvlog/tests/active_segment_test.cc @@ -17,6 +17,7 @@ #include "test_utils/gtest_utils.h" #include +#include #include diff --git a/src/v/test_utils/gtest_utils.cc b/src/v/test_utils/gtest_utils.cc index 67f486abd996..e570f527b425 100644 --- a/src/v/test_utils/gtest_utils.cc +++ b/src/v/test_utils/gtest_utils.cc @@ -11,7 +11,6 @@ #include "test_utils/gtest_utils.h" #include "base/vassert.h" -#include "random/generators.h" #include diff --git a/src/v/test_utils/randoms.h b/src/v/test_utils/randoms.h index 6e35f31f0efb..f628f9ed5e51 100644 --- a/src/v/test_utils/randoms.h +++ b/src/v/test_utils/randoms.h @@ -10,8 +10,6 @@ */ #pragma once -#include "bytes/random.h" -#include "cluster/producer_state.h" #include "container/fragmented_vector.h" #include "random/generators.h" #include "utils/tristate.h"