diff --git a/src/v/storage/mvlog/CMakeLists.txt b/src/v/storage/mvlog/CMakeLists.txt index c177ee81e63b5..41cc275db04e4 100644 --- a/src/v/storage/mvlog/CMakeLists.txt +++ b/src/v/storage/mvlog/CMakeLists.txt @@ -8,6 +8,7 @@ v_cc_library( entry.cc entry_stream.cc entry_stream_utils.cc + file.cc logger.cc readable_segment.cc segment_appender.cc diff --git a/src/v/storage/mvlog/file.cc b/src/v/storage/mvlog/file.cc new file mode 100644 index 0000000000000..43bc9f80cb821 --- /dev/null +++ b/src/v/storage/mvlog/file.cc @@ -0,0 +1,97 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#include "storage/mvlog/file.h" + +#include "base/units.h" +#include "io/page_cache.h" +#include "io/pager.h" +#include "io/paging_data_source.h" +#include "io/persistence.h" +#include "io/scheduler.h" + +namespace storage::experimental::mvlog { +file::~file() = default; + +size_t file::size() const { + vassert(pager_, "file has been closed"); + return pager_->size(); +} + +ss::input_stream file::make_stream(uint64_t offset, uint64_t length) { + vassert(pager_, "file has been closed"); + auto max_len = size() - offset; + return ss::input_stream( + ss::data_source(std::make_unique<::experimental::io::paging_data_source>( + pager_.get(), + ::experimental::io::paging_data_source::config{ + offset, std::min(max_len, length)}))); +} + +ss::future<> file::append(iobuf buf) { + vassert(pager_, "file has been closed"); + for (auto& io_frag : buf) { + co_await pager_->append(std::move(io_frag).release()); + } +} + +ss::future<> file::close() { + vassert(pager_, "file has been closed"); + auto pager = std::move(pager_); + // First stop IOs to the file. + co_await pager->close(); + pager.reset(); + + // Then close the file handle. + co_await file_->close(); +} + +file::file( + std::filesystem::path path, + file_t f, + std::unique_ptr<::experimental::io::pager> pager) + : path_(std::move(path)) + , file_(std::move(f)) + , pager_(std::move(pager)) {} + +file_manager::~file_manager() = default; + +ss::future> +file_manager::open_file(std::filesystem::path path) { + auto f = co_await storage_->open(path); + auto pager = std::make_unique<::experimental::io::pager>( + path, 0, storage_.get(), cache_.get(), scheduler_.get()); + co_return std::unique_ptr{ + new file(std::move(path), std::move(f), std::move(pager))}; +} + +ss::future> +file_manager::create_file(std::filesystem::path path) { + auto f = co_await storage_->create(path); + auto pager = std::make_unique<::experimental::io::pager>( + path, 0, storage_.get(), cache_.get(), scheduler_.get()); + co_return std::unique_ptr{ + new file(std::move(path), std::move(f), std::move(pager))}; +} + +ss::future<> file_manager::remove_file(std::filesystem::path path) { + co_return co_await ss::remove_file(path.string()); +} + +file_manager::file_manager( + size_t cache_size_bytes, + size_t small_queue_size_bytes, + size_t scheduler_num_files) + : storage_(std::make_unique<::experimental::io::disk_persistence>()) + , cache_(std::make_unique<::experimental::io::page_cache>( + ::experimental::io::page_cache::config{ + .cache_size = cache_size_bytes, .small_size = small_queue_size_bytes})) + , scheduler_( + std::make_unique<::experimental::io::scheduler>(scheduler_num_files)) {} + +} // namespace storage::experimental::mvlog diff --git a/src/v/storage/mvlog/file.h b/src/v/storage/mvlog/file.h new file mode 100644 index 0000000000000..c0ee6882da6ca --- /dev/null +++ b/src/v/storage/mvlog/file.h @@ -0,0 +1,109 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "base/seastarx.h" +#include "base/units.h" +#include "bytes/iobuf.h" +#include "io/persistence.h" + +#include +#include +#include +#include + +#include + +namespace experimental::io { +class page_cache; +class scheduler; +class pager; +} // namespace experimental::io + +namespace storage::experimental::mvlog { + +class file_manager; + +// Wraps a pager, managing IO to a given file. This object is expected to live +// for the duration of the process, or until the underlying file is removed. +class file { +public: + using file_t = ss::shared_ptr<::experimental::io::persistence::file>; + file(file&) = delete; + file(file&&) = delete; + file& operator=(file&) = delete; + file& operator=(file&&) = delete; + ~file(); + + // This file must outlive the returned streams. + ss::input_stream make_stream(uint64_t offset, uint64_t length); + + // Appends the given iobuf to the file. + ss::future<> append(iobuf); + + // TODO(awong): implement. + ss::future<> flush() { return ss::make_ready_future(); } + + // Stops remaing IO to the file and closes the handle. + ss::future<> close(); + + std::filesystem::path filepath() const { return path_; } + size_t size() const; + +private: + friend class file_manager; + file( + std::filesystem::path, + file_t, + std::unique_ptr<::experimental::io::pager>); + + std::filesystem::path path_; + file_t file_; + std::unique_ptr<::experimental::io::pager> pager_; +}; + +// Simple wrapper around paging infrastructure, meant as a stop-gap until the +// io:: public interface is more fleshed out. +// +// TODO: This class doesn't implement any kind of concurrency control or file +// descriptor management. Callers should be wary about concurrent access to a +// single file. +class file_manager { +public: + file_manager(file_manager&) = delete; + file_manager(file_manager&&) = delete; + file_manager& operator=(file_manager&) = delete; + file_manager& operator=(file_manager&&) = delete; + + // NOTE: defaults are for tests only and are not tuned for production. + explicit file_manager( + size_t cache_size_bytes = 2_MiB, + size_t small_queue_size_bytes = 1_MiB, + size_t scheduler_num_files = 100); + ~file_manager(); + + ss::future> create_file(std::filesystem::path); + ss::future> open_file(std::filesystem::path); + ss::future<> remove_file(std::filesystem::path); + +private: + friend class file; + + // Persistence interface, in charge of accessing underlying files (opening, + // appending, etc). + std::unique_ptr<::experimental::io::persistence> storage_; + + // Page cache to buffer appends and cache recent pages. + std::unique_ptr<::experimental::io::page_cache> cache_; + + // Scheduling policy used to dispatch IOs. + std::unique_ptr<::experimental::io::scheduler> scheduler_; +}; + +} // namespace storage::experimental::mvlog diff --git a/src/v/storage/mvlog/tests/CMakeLists.txt b/src/v/storage/mvlog/tests/CMakeLists.txt index 7925c0bed9ce6..fefb70908fbb9 100644 --- a/src/v/storage/mvlog/tests/CMakeLists.txt +++ b/src/v/storage/mvlog/tests/CMakeLists.txt @@ -7,6 +7,7 @@ rp_test( batch_collecting_stream_utils_test.cc batch_collector_test.cc entry_stream_utils_test.cc + file_test.cc segment_appender_test.cc segment_io_test.cc segment_reader_test.cc diff --git a/src/v/storage/mvlog/tests/file_test.cc b/src/v/storage/mvlog/tests/file_test.cc new file mode 100644 index 0000000000000..a88875540f659 --- /dev/null +++ b/src/v/storage/mvlog/tests/file_test.cc @@ -0,0 +1,219 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "bytes/random.h" +#include "container/fragmented_vector.h" +#include "storage/mvlog/file.h" +#include "test_utils/gtest_utils.h" +#include "test_utils/test.h" + +#include +#include +#include +#include + +#include + +#include +#include + +using namespace storage::experimental::mvlog; +using namespace experimental; + +class FileTest : public seastar_test { +public: + FileTest() + : base_dir_(get_test_directory()) + , file_mgr_(std::make_unique()) {} + + ss::future<> SetUpAsync() override { + co_await ss::recursive_touch_directory(base_dir_); + } + ss::future<> TearDownAsync() override { + for (auto& file : files_) { + co_await file->close(); + } + co_await ss::recursive_remove_directory( + std::filesystem::path{base_dir_}); + } + + std::filesystem::path test_path(ss::sstring component) { + return {fmt::format("{}/{}", base_dir_, component)}; + } + + // Creates the file and tracks for close. + ss::future tracked_create_file(std::filesystem::path path) { + auto fut = co_await ss::coroutine::as_future( + file_mgr_->create_file(path)); + if (fut.failed()) { + std::rethrow_exception(fut.get_exception()); + } + auto f = std::move(fut.get()); + auto* ret = f.get(); + files_.emplace_back(std::move(f)); + co_return ret; + } + + // Opens the file and tracks for close. + ss::future tracked_open_file(std::filesystem::path path) { + auto fut = co_await ss::coroutine::as_future( + file_mgr_->open_file(path)); + if (fut.failed()) { + std::rethrow_exception(fut.get_exception()); + } + auto f = std::move(fut.get()); + auto* ret = f.get(); + files_.emplace_back(std::move(f)); + co_return ret; + } + +protected: + const ss::sstring base_dir_; + std::unique_ptr file_mgr_; + chunked_vector> files_; +}; + +TEST_F_CORO(FileTest, TestCreate) { + const auto path = test_path("foo"); + auto created = co_await tracked_create_file(path); + ASSERT_EQ_CORO(created->filepath(), path); + + // Shouldn't be able to create the file if it already exists. + ASSERT_THROW_CORO( + co_await tracked_create_file(path), std::filesystem::filesystem_error); +} + +TEST_F_CORO(FileTest, TestOpen) { + const auto path = test_path("foo"); + ASSERT_THROW_CORO( + co_await tracked_open_file(path), std::filesystem::filesystem_error); + + auto created = co_await tracked_create_file(path); + auto opened = co_await tracked_open_file(path); + + // The file manager isn't smart enough to differentiate multiple handles to + // the same file. + ASSERT_NE_CORO(created, opened); + + // But they should indeed be pointing at the same file. + ASSERT_EQ_CORO(created->filepath(), path); + ASSERT_EQ_CORO(opened->filepath(), path); +} + +TEST_F_CORO(FileTest, TestRemove) { + const auto path = test_path("foo"); + auto created = co_await tracked_create_file(path); + ASSERT_EQ_CORO(created->filepath(), path); + + auto path_str = path.string(); + ASSERT_TRUE_CORO(co_await ss::file_exists(path_str)); + co_await file_mgr_->remove_file(path); + + ASSERT_FALSE_CORO(co_await ss::file_exists(path_str)); +} + +TEST_F_CORO(FileTest, TestAppend) { + const auto path = test_path("foo"); + auto created = co_await tracked_create_file(path); + + iobuf expected_buf; + for (int i = 0; i < 10; i++) { + auto buf = random_generators::make_iobuf(); + co_await created->append(buf.copy()); + expected_buf.append(std::move(buf)); + ASSERT_EQ_CORO(created->size(), expected_buf.size_bytes()); + + // Read the file and check that it matches what has been appended. + iobuf actual_stream_buf; + auto stream = created->make_stream(0, created->size()); + actual_stream_buf.append(co_await stream.read_exactly(created->size())); + + ASSERT_EQ_CORO(actual_stream_buf, expected_buf) << fmt::format( + "{}\nvs\n{}", + actual_stream_buf.hexdump(1024), + expected_buf.hexdump(1024)); + } +} + +TEST_F(FileTest, TestAppendAfterClose) { + const auto path = test_path("foo"); + auto created = file_mgr_->create_file(path).get(); + created->close().get(); + + auto buf = random_generators::make_iobuf(); + EXPECT_DEATH(created->append(buf.copy()).get(), "file has been closed"); +} + +TEST_F(FileTest, TestStreamAfterClose) { + const auto path = test_path("foo"); + auto created = file_mgr_->create_file(path).get(); + auto buf = random_generators::make_iobuf(); + created->append(buf.copy()).get(); + created->close().get(); + EXPECT_DEATH( + created->make_stream(0, created->size()), "file has been closed"); +} + +TEST_F_CORO(FileTest, TestReadStream) { + const auto path = test_path("foo"); + auto file = co_await tracked_create_file(path); + const auto num_appends = 10; + const auto append_len = 10; + const auto file_size = append_len * num_appends; + iobuf expected_full; + for (int i = 0; i < num_appends; i++) { + auto buf = random_generators::make_iobuf(append_len); + co_await file->append(buf.copy()); + expected_full.append(std::move(buf)); + } + ASSERT_EQ_CORO(file->size(), file_size); + + // Do an exhaustive sweep of ranges in the file. + for (int start = 0; start < file_size; start++) { + for (int end = start; end < file_size; end++) { + const auto len = end - start; + auto str = file->make_stream(start, len); + iobuf actual_stream_buf; + actual_stream_buf.append(co_await str.read_exactly(len)); + ASSERT_EQ_CORO(actual_stream_buf.size_bytes(), len); + + auto expected_buf = expected_full.share(start, len); + ASSERT_EQ_CORO(actual_stream_buf, expected_buf) << fmt::format( + "{}\nvs\n{}", + actual_stream_buf.hexdump(1024), + expected_buf.hexdump(1024)); + } + } +} + +TEST_F_CORO(FileTest, TestReadStreamNearEnd) { + const auto path = test_path("foo"); + auto file = co_await tracked_create_file(path); + const auto file_size = 128; + auto buf = random_generators::make_iobuf(file_size); + co_await file->append(buf.copy()); + ASSERT_EQ_CORO(file->size(), file_size); + + for (int i = 0; i < 5; i++) { + // All reads starting past the end should be empty. + auto past_end_str = file->make_stream(file_size, i); + auto read_buf = co_await past_end_str.read(); + ASSERT_TRUE_CORO(read_buf.empty()); + } + + for (int i = 0; i < 5; i++) { + // Reads ending past the end should be truncated. + auto len_past_end = i + 10; + auto past_end_str = file->make_stream(file_size - i, len_past_end); + auto read_buf = co_await past_end_str.read_exactly(len_past_end); + + // The resulting read should only read to the end of what is appended. + ASSERT_EQ_CORO(read_buf.size(), i); + } +}