Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mvlog: wrap pager with file abstraction #18457

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/storage/mvlog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ v_cc_library(
entry.cc
entry_stream.cc
entry_stream_utils.cc
file.cc
logger.cc
readable_segment.cc
segment_appender.cc
Expand Down
97 changes: 97 additions & 0 deletions src/v/storage/mvlog/file.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#include "storage/mvlog/file.h"

#include "base/units.h"
#include "io/page_cache.h"
#include "io/pager.h"
#include "io/paging_data_source.h"
#include "io/persistence.h"
#include "io/scheduler.h"

namespace storage::experimental::mvlog {
file::~file() = default;

size_t file::size() const {
vassert(pager_, "file has been closed");
return pager_->size();
}

ss::input_stream<char> file::make_stream(uint64_t offset, uint64_t length) {
vassert(pager_, "file has been closed");
auto max_len = size() - offset;
return ss::input_stream<char>(
ss::data_source(std::make_unique<::experimental::io::paging_data_source>(
pager_.get(),
::experimental::io::paging_data_source::config{
offset, std::min(max_len, length)})));
}

ss::future<> file::append(iobuf buf) {
vassert(pager_, "file has been closed");
for (auto& io_frag : buf) {
co_await pager_->append(std::move(io_frag).release());
}
}

ss::future<> file::close() {
vassert(pager_, "file has been closed");
auto pager = std::move(pager_);
// First stop IOs to the file.
co_await pager->close();
pager.reset();

// Then close the file handle.
co_await file_->close();
}

file::file(
std::filesystem::path path,
file_t f,
std::unique_ptr<::experimental::io::pager> pager)
: path_(std::move(path))
, file_(std::move(f))
, pager_(std::move(pager)) {}

file_manager::~file_manager() = default;

ss::future<std::unique_ptr<file>>
file_manager::open_file(std::filesystem::path path) {
auto f = co_await storage_->open(path);
auto pager = std::make_unique<::experimental::io::pager>(
path, 0, storage_.get(), cache_.get(), scheduler_.get());
co_return std::unique_ptr<file>{
new file(std::move(path), std::move(f), std::move(pager))};
}

ss::future<std::unique_ptr<file>>
file_manager::create_file(std::filesystem::path path) {
auto f = co_await storage_->create(path);
auto pager = std::make_unique<::experimental::io::pager>(
path, 0, storage_.get(), cache_.get(), scheduler_.get());
co_return std::unique_ptr<file>{
new file(std::move(path), std::move(f), std::move(pager))};
}

ss::future<> file_manager::remove_file(std::filesystem::path path) {
co_return co_await ss::remove_file(path.string());
}

file_manager::file_manager(
size_t cache_size_bytes,
size_t small_queue_size_bytes,
size_t scheduler_num_files)
: storage_(std::make_unique<::experimental::io::disk_persistence>())
, cache_(std::make_unique<::experimental::io::page_cache>(
::experimental::io::page_cache::config{
.cache_size = cache_size_bytes, .small_size = small_queue_size_bytes}))
, scheduler_(
std::make_unique<::experimental::io::scheduler>(scheduler_num_files)) {}

} // namespace storage::experimental::mvlog
109 changes: 109 additions & 0 deletions src/v/storage/mvlog/file.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "base/seastarx.h"
#include "base/units.h"
#include "bytes/iobuf.h"
#include "io/persistence.h"

#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/seastar.hh>

#include <memory>

namespace experimental::io {
class page_cache;
class scheduler;
class pager;
} // namespace experimental::io

namespace storage::experimental::mvlog {

class file_manager;

// Wraps a pager, managing IO to a given file. This object is expected to live
// for the duration of the process, or until the underlying file is removed.
class file {
public:
using file_t = ss::shared_ptr<::experimental::io::persistence::file>;
file(file&) = delete;
file(file&&) = delete;
file& operator=(file&) = delete;
file& operator=(file&&) = delete;
~file();

// This file must outlive the returned streams.
ss::input_stream<char> make_stream(uint64_t offset, uint64_t length);

// Appends the given iobuf to the file.
ss::future<> append(iobuf);

// TODO(awong): implement.
ss::future<> flush() { return ss::make_ready_future(); }

// Stops remaing IO to the file and closes the handle.
ss::future<> close();

std::filesystem::path filepath() const { return path_; }
size_t size() const;

private:
friend class file_manager;
file(
std::filesystem::path,
file_t,
std::unique_ptr<::experimental::io::pager>);

std::filesystem::path path_;
file_t file_;
std::unique_ptr<::experimental::io::pager> pager_;
};

// Simple wrapper around paging infrastructure, meant as a stop-gap until the
// io:: public interface is more fleshed out.
//
// TODO: This class doesn't implement any kind of concurrency control or file
// descriptor management. Callers should be wary about concurrent access to a
// single file.
class file_manager {
public:
file_manager(file_manager&) = delete;
file_manager(file_manager&&) = delete;
file_manager& operator=(file_manager&) = delete;
file_manager& operator=(file_manager&&) = delete;

// NOTE: defaults are for tests only and are not tuned for production.
explicit file_manager(
size_t cache_size_bytes = 2_MiB,
size_t small_queue_size_bytes = 1_MiB,
size_t scheduler_num_files = 100);
~file_manager();

ss::future<std::unique_ptr<file>> create_file(std::filesystem::path);
ss::future<std::unique_ptr<file>> open_file(std::filesystem::path);
ss::future<> remove_file(std::filesystem::path);

private:
friend class file;

// Persistence interface, in charge of accessing underlying files (opening,
// appending, etc).
std::unique_ptr<::experimental::io::persistence> storage_;

// Page cache to buffer appends and cache recent pages.
std::unique_ptr<::experimental::io::page_cache> cache_;

// Scheduling policy used to dispatch IOs.
std::unique_ptr<::experimental::io::scheduler> scheduler_;
};

} // namespace storage::experimental::mvlog
1 change: 1 addition & 0 deletions src/v/storage/mvlog/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ rp_test(
batch_collecting_stream_utils_test.cc
batch_collector_test.cc
entry_stream_utils_test.cc
file_test.cc
segment_appender_test.cc
segment_io_test.cc
segment_reader_test.cc
Expand Down
Loading
Loading