mvlog: miscellaneous non-functional changes #18519

Merged 3 commits on May 16, 2024
10 changes: 10 additions & 0 deletions src/v/container/include/container/interval_set.h
@@ -98,6 +98,11 @@ class interval_set {
*/
void erase(const_iterator it);

/**
* Return the number of intervals in the set.
*/
[[nodiscard]] size_t size() const;

private:
/**
* Extend the interval being pointed at with any intervals that overlap
@@ -230,3 +235,8 @@ template<std::integral T>
void interval_set<T>::erase(interval_set<T>::const_iterator it) {
set_.erase(it);
}

template<std::integral T>
size_t interval_set<T>::size() const {
return set_.size();
}
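
For reference, a minimal sketch of how the new size() accessor behaves alongside insert(), mirroring the tests below. It assumes a C++20 toolchain with the Redpanda container headers on the include path, and omits whatever namespace qualification the real header uses.

#include "container/interval_set.h"

#include <cassert>
#include <cstddef>

int main() {
    // Intervals are {start, length}; overlapping or adjacent inserts merge.
    interval_set<size_t> set;
    set.insert({0, 10});     // [0, 10)
    set.insert({20, 10});    // [20, 30)
    assert(set.size() == 2); // two disjoint intervals
    set.insert({10, 10});    // [10, 20) bridges the gap; all three merge
    assert(set.size() == 1);
    return 0;
}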
6 changes: 6 additions & 0 deletions src/v/container/tests/interval_set_test.cc
@@ -109,6 +109,7 @@ TEST(IntervalSet, InsertWithGap) {
// [0, 10) [11, 21): a gap at [10, 11).
EXPECT_TRUE(set.insert({0, 10}).second);
EXPECT_TRUE(set.insert({11, 10}).second);
EXPECT_EQ(2, set.size());

// We can't find the gap.
auto found = set.find(10);
@@ -140,6 +141,7 @@ TEST(IntervalSet, InsertOverlapFront) {
EXPECT_EQ(ret1, ret3);
EXPECT_EQ(ret1.first->start, 0);
EXPECT_EQ(ret1.first->end, 100);
EXPECT_EQ(1, set.size());

check_no_overlap(set);
}
@@ -212,6 +214,7 @@ TEST(IntervalSet, Erase) {
set_t set;
auto res = set.insert({0, 10});
EXPECT_FALSE(set.empty());
EXPECT_EQ(1, set.size());
set.erase(res.first);
EXPECT_TRUE(set.empty());
}
@@ -222,9 +225,11 @@ TEST(IntervalSet, EraseMerged) {
EXPECT_TRUE(set.insert({20, 10}).second);
EXPECT_TRUE(set.insert({10, 10}).second);
EXPECT_FALSE(set.empty());
EXPECT_EQ(1, set.size());
auto found = set.find(0);
EXPECT_NE(found, set.end());
set.erase(found);
EXPECT_EQ(0, set.size());
EXPECT_TRUE(set.empty());
}

@@ -233,6 +238,7 @@ TEST(IntervalSet, EraseBeginToEnd) {
EXPECT_TRUE(set.insert({0, 10}).second);
EXPECT_TRUE(set.insert({20, 10}).second);
EXPECT_TRUE(set.insert({40, 10}).second);
EXPECT_EQ(3, set.size());
EXPECT_TRUE(set.begin()->start == 0);
EXPECT_TRUE(set.begin()->end == 10);

11 changes: 4 additions & 7 deletions src/v/storage/mvlog/readable_segment.h
@@ -10,28 +10,25 @@

#include <memory>

namespace experimental::io {
class pager;
}

namespace storage::experimental::mvlog {

class file;
class segment_reader;

// A readable segment file. This is a long-lived object, responsible for
// passing out short-lived readers.
class readable_segment {
public:
explicit readable_segment(::experimental::io::pager* pager)
: pager_(pager) {}
explicit readable_segment(file* f)
: file_(f) {}

std::unique_ptr<segment_reader> make_reader();
size_t num_readers() const { return num_readers_; }

private:
friend class segment_reader;

::experimental::io::pager* pager_;
file* file_;

size_t num_readers_{0};
};
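
For context, a hedged sketch of the intended wiring, inferred only from this PR: file_manager, create_file(), close(), and segment_reader::make_stream() appear elsewhere in the diff, the base/seastarx.h include path for the ss:: alias is assumed, and "segment" is a placeholder path.

#include "base/seastarx.h" // assumed path for the ss:: alias used in this diff
#include "storage/mvlog/file.h"
#include "storage/mvlog/readable_segment.h"
#include "storage/mvlog/segment_reader.h"

#include <seastar/core/coroutine.hh>

namespace mvlog = storage::experimental::mvlog;

// Sketch only: a long-lived readable_segment wrapping a file and handing out
// a short-lived reader. Error handling is omitted.
ss::future<> read_segment_sketch(mvlog::file_manager& mgr) {
    auto f = co_await mgr.create_file("segment");
    mvlog::readable_segment seg(f.get());

    auto reader = seg.make_reader(); // seg.num_readers() is now 1
    auto stream = reader->make_stream(0);
    // ... consume the stream ...
    co_await stream.close();
    co_await f->close();
}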
12 changes: 4 additions & 8 deletions src/v/storage/mvlog/segment_appender.cc
@@ -9,10 +9,10 @@
#include "storage/mvlog/segment_appender.h"

#include "base/vlog.h"
#include "io/pager.h"
#include "model/record.h"
#include "storage/mvlog/entry.h"
#include "storage/mvlog/entry_stream_utils.h"
#include "storage/mvlog/file.h"
#include "storage/mvlog/logger.h"
#include "storage/record_batch_utils.h"

@@ -34,22 +34,18 @@ ss::future<> segment_appender::append(model::record_batch batch) {
body_size,
entry_type::record_batch,
};
auto orig_size = pager_->size();
auto orig_size = file_->size();
iobuf buf;
buf.append(entry_header_to_iobuf(entry_hdr));
buf.append(std::move(entry_body_buf));
// TODO(awong): the entire buffer probably needs to be appended atomically.
for (auto& io_frag : buf) {
co_await pager_->append(std::move(io_frag).release());
}
// TODO(awong): add a filename to this message.
co_await file_->append(std::move(buf));
vlog(
log.trace,
"Appended offsets [{}, {}], pos [{}, {})",
batch.base_offset(),
batch.last_offset(),
orig_size,
pager_->size());
file_->size());
}

} // namespace storage::experimental::mvlog
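
As the appender test later in this PR asserts, each append() grows the file by a fixed packed header plus the serialized entry body. A small sketch of that arithmetic; the namespace of packed_entry_header_size is assumed from the test's using-directives.

#include "bytes/iobuf.h"
#include "storage/mvlog/entry_stream_utils.h"

#include <cstddef>

// Sketch only: expected file size after one append(), matching the ASSERT_EQ
// in segment_appender_test.cc below.
size_t size_after_append(size_t size_before, const iobuf& entry_body) {
    return size_before
           + storage::experimental::mvlog::packed_entry_header_size
           + entry_body.size_bytes();
}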
14 changes: 6 additions & 8 deletions src/v/storage/mvlog/segment_appender.h
@@ -11,28 +11,26 @@
#include "model/record.h"
#include "storage/mvlog/entry.h"

namespace experimental::io {
class pager;
} // namespace experimental::io

namespace storage::experimental::mvlog {

class file;

// Encapsulates writes to a given segment file.
//
// This class is not thread-safe. It is expected that callers will externally
// serialize calls into the appender.
class segment_appender {
public:
explicit segment_appender(::experimental::io::pager* pager)
: pager_(pager) {}
explicit segment_appender(file* file)
: file_(file) {}

// Serializes and appends the given batch to the underlying segment.
// Callers are expected to flush the file after this returns.
ss::future<> append(model::record_batch);

private:
// The pager with which to perform I/O.
::experimental::io::pager* pager_;
// The paging file with which to perform I/O.
file* file_;
};

} // namespace storage::experimental::mvlog
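
A hedged usage sketch of the contract above: a single coroutine serializes append() calls, and the caller is responsible for flushing afterwards. flush() on file is an assumption (this diff only shows size(), append(), and make_stream()), and base/seastarx.h is the assumed path for the ss:: alias.

#include "base/seastarx.h" // assumed path for the ss:: alias
#include "model/record.h"
#include "storage/mvlog/file.h"
#include "storage/mvlog/segment_appender.h"

#include <seastar/core/coroutine.hh>

#include <utility>
#include <vector>

namespace mvlog = storage::experimental::mvlog;

// Sketch only: appending sequentially from one coroutine satisfies the
// "callers externally serialize" requirement.
ss::future<> append_batches_sketch(
  mvlog::file* f, std::vector<model::record_batch> batches) {
    mvlog::segment_appender appender(f);
    for (auto& b : batches) {
        co_await appender.append(std::move(b));
    }
    co_await f->flush(); // assumption: flush is not shown in this PR's diff
}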
7 changes: 3 additions & 4 deletions src/v/storage/mvlog/segment_reader.cc
@@ -10,8 +10,7 @@
#include "storage/mvlog/segment_reader.h"

#include "base/vlog.h"
#include "io/pager.h"
#include "io/paging_data_source.h"
#include "storage/mvlog/file.h"
#include "storage/mvlog/logger.h"
#include "storage/mvlog/readable_segment.h"
#include "storage/mvlog/skipping_data_source.h"
@@ -33,7 +32,7 @@ segment_reader::make_read_intervals(size_t start_pos, size_t length) const {
}

ss::input_stream<char> segment_reader::make_stream(size_t start_pos) const {
return make_stream(start_pos, segment_->pager_->size() - start_pos);
return make_stream(start_pos, segment_->file_->size() - start_pos);
}

ss::input_stream<char>
@@ -48,7 +47,7 @@ segment_reader::make_stream(size_t start_pos, size_t length) const {
}
return ss::input_stream<char>(
ss::data_source(std::make_unique<skipping_data_source>(
segment_->pager_, std::move(read_intervals))));
segment_->file_, std::move(read_intervals))));
}

} // namespace storage::experimental::mvlog
15 changes: 4 additions & 11 deletions src/v/storage/mvlog/skipping_data_source.cc
@@ -10,13 +10,11 @@
#include "storage/mvlog/skipping_data_source.h"

#include "io/pager.h"
#include "io/paging_data_source.h"
#include "storage/mvlog/file.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/temporary_buffer.hh>

using ::experimental::io::paging_data_source;

namespace storage::experimental::mvlog {

ss::future<ss::temporary_buffer<char>> skipping_data_source::get() noexcept {
@@ -28,20 +26,15 @@ ss::future<ss::temporary_buffer<char>> skipping_data_source::get() noexcept {
co_return ss::temporary_buffer<char>();
}
auto first_read = *reads_.begin();
const auto end_pos = pager_->size();
const auto end_pos = file_->size();
if (first_read.offset >= end_pos) {
// Skip the read if it starts past the end of the file.
reads_.pop_front();
continue;
}
const auto max_len = end_pos - first_read.offset;

// Cap the read to the end of the file.
cur_stream_ = ss::input_stream<char>(
ss::data_source(std::make_unique<paging_data_source>(
pager_,
paging_data_source::config{
first_read.offset, std::min(max_len, first_read.length)})));
cur_stream_ = file_->make_stream(
first_read.offset, std::min(max_len, first_read.length));
reads_.pop_front();
}
// Keep using the stream until it hits the end of the stream and the
11 changes: 4 additions & 7 deletions src/v/storage/mvlog/skipping_data_source.h
@@ -15,11 +15,8 @@
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>

namespace experimental::io {
class pager;
} // namespace experimental::io

namespace storage::experimental::mvlog {
class file;

class skipping_data_source final : public seastar::data_source_impl {
public:
@@ -28,16 +25,16 @@ class skipping_data_source final : public seastar::data_source_impl {
uint64_t length;
};
using read_list_t = ss::chunked_fifo<read_interval>;
skipping_data_source(::experimental::io::pager* pager, read_list_t reads)
skipping_data_source(file* f, read_list_t reads)
: reads_(std::move(reads))
, pager_(pager) {}
, file_(f) {}

ss::future<ss::temporary_buffer<char>> get() noexcept override;

private:
read_list_t reads_;
std::optional<ss::input_stream<char>> cur_stream_;
::experimental::io::pager* pager_;
file* file_;
};

} // namespace storage::experimental::mvlog
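
A hedged sketch of constructing the data source directly, mirroring how segment_reader::make_stream() wires it up earlier in this diff. The read_interval field order {offset, length} and the base/seastarx.h include are assumptions; the byte ranges are arbitrary.

#include "base/seastarx.h" // assumed path for the ss:: alias
#include "storage/mvlog/file.h"
#include "storage/mvlog/skipping_data_source.h"

#include <seastar/core/iostream.hh>

#include <memory>
#include <utility>

namespace mvlog = storage::experimental::mvlog;

// Sketch only: a stream over two disjoint byte ranges of the file. Bytes
// between the intervals are skipped, and intervals starting past
// file_->size() are dropped by get().
ss::input_stream<char> make_skipping_stream_sketch(mvlog::file* f) {
    mvlog::skipping_data_source::read_list_t reads;
    reads.push_back({.offset = 0, .length = 128});
    reads.push_back({.offset = 4096, .length = 512});
    return ss::input_stream<char>(ss::data_source(
      std::make_unique<mvlog::skipping_data_source>(f, std::move(reads))));
}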
46 changes: 12 additions & 34 deletions src/v/storage/mvlog/tests/segment_appender_test.cc
@@ -7,15 +7,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "base/units.h"
#include "io/page_cache.h"
#include "io/pager.h"
#include "io/paging_data_source.h"
#include "io/persistence.h"
#include "io/scheduler.h"
#include "model/record.h"
#include "model/tests/random_batch.h"
#include "storage/mvlog/entry_stream_utils.h"
#include "storage/mvlog/file.h"
#include "storage/mvlog/segment_appender.h"
#include "storage/record_batch_utils.h"

@@ -29,19 +24,11 @@ using namespace ::experimental;
class SegmentAppenderTest : public ::testing::Test {
public:
void SetUp() override {
storage_ = std::make_unique<io::disk_persistence>();
storage_->create(file_.string()).get()->close().get();
cleanup_files_.emplace_back(file_);

io::page_cache::config cache_config{
.cache_size = 2_MiB, .small_size = 1_MiB};
cache_ = std::make_unique<io::page_cache>(cache_config);
scheduler_ = std::make_unique<io::scheduler>(100);
pager_ = std::make_unique<io::pager>(
file_, 0, storage_.get(), cache_.get(), scheduler_.get());
paging_file_ = file_manager_.create_file(file_).get();
}
void TearDown() override {
pager_->close().get();
paging_file_->close().get();
for (auto& file : cleanup_files_) {
try {
ss::remove_file(file.string()).get();
@@ -52,15 +39,13 @@ class SegmentAppenderTest : public ::testing::Test {

protected:
const std::filesystem::path file_{"segment"};
std::unique_ptr<io::persistence> storage_;
std::unique_ptr<io::page_cache> cache_;
std::unique_ptr<io::scheduler> scheduler_;
std::unique_ptr<io::pager> pager_;
file_manager file_manager_;
std::unique_ptr<file> paging_file_;
std::vector<std::filesystem::path> cleanup_files_;
};

TEST_F(SegmentAppenderTest, TestAppendRecordBatches) {
segment_appender appender(pager_.get());
segment_appender appender(paging_file_.get());
auto batches = model::test::make_random_batches().get();

size_t prev_end_pos = 0;
@@ -75,16 +60,13 @@ TEST_F(SegmentAppenderTest, TestAppendRecordBatches) {

appender.append(batch.copy()).get();
ASSERT_EQ(
pager_->size(),
paging_file_->size(),
prev_end_pos + entry_body_buf.size_bytes()
+ packed_entry_header_size);

// The resulting pages should contain the header...
auto hdr_stream = ss::input_stream<char>(
ss::data_source(std::make_unique<io::paging_data_source>(
pager_.get(),
io::paging_data_source::config{
prev_end_pos, packed_entry_header_size})));
auto hdr_stream = paging_file_->make_stream(
prev_end_pos, packed_entry_header_size);
iobuf hdr_buf;
hdr_buf.append(hdr_stream.read_exactly(packed_entry_header_size).get());

@@ -95,16 +77,12 @@ TEST_F(SegmentAppenderTest, TestAppendRecordBatches) {
ASSERT_EQ(entry_header_crc(hdr.body_size, hdr.type), hdr.header_crc);

// ... followed by the entry body.
auto body_stream = ss::input_stream<char>(
ss::data_source(std::make_unique<io::paging_data_source>(
pager_.get(),
io::paging_data_source::config{
prev_end_pos + packed_entry_header_size,
entry_body_buf.size_bytes()})));
auto body_stream = paging_file_->make_stream(
prev_end_pos + packed_entry_header_size, entry_body_buf.size_bytes());
iobuf body_buf;
body_buf.append(
body_stream.read_exactly(entry_body_buf.size_bytes()).get());
ASSERT_EQ(entry_body_buf, body_buf);
prev_end_pos = pager_->size();
prev_end_pos = paging_file_->size();
}
}