Skip to content

Commit

Permalink
Merge pull request #18457 from andrwng/io-mvlog-file
Browse files Browse the repository at this point in the history
mvlog: wrap pager with file abstraction
  • Loading branch information
andrwng authored May 14, 2024
2 parents c714906 + b32e65d commit 209f78d
Show file tree
Hide file tree
Showing 5 changed files with 427 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/storage/mvlog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ v_cc_library(
entry.cc
entry_stream.cc
entry_stream_utils.cc
file.cc
logger.cc
readable_segment.cc
segment_appender.cc
Expand Down
97 changes: 97 additions & 0 deletions src/v/storage/mvlog/file.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#include "storage/mvlog/file.h"

#include "base/units.h"
#include "io/page_cache.h"
#include "io/pager.h"
#include "io/paging_data_source.h"
#include "io/persistence.h"
#include "io/scheduler.h"

namespace storage::experimental::mvlog {
file::~file() = default;

size_t file::size() const {
vassert(pager_, "file has been closed");
return pager_->size();
}

ss::input_stream<char> file::make_stream(uint64_t offset, uint64_t length) {
vassert(pager_, "file has been closed");
auto max_len = size() - offset;
return ss::input_stream<char>(
ss::data_source(std::make_unique<::experimental::io::paging_data_source>(
pager_.get(),
::experimental::io::paging_data_source::config{
offset, std::min(max_len, length)})));
}

ss::future<> file::append(iobuf buf) {
vassert(pager_, "file has been closed");
for (auto& io_frag : buf) {
co_await pager_->append(std::move(io_frag).release());
}
}

ss::future<> file::close() {
vassert(pager_, "file has been closed");
auto pager = std::move(pager_);
// First stop IOs to the file.
co_await pager->close();
pager.reset();

// Then close the file handle.
co_await file_->close();
}

file::file(
std::filesystem::path path,
file_t f,
std::unique_ptr<::experimental::io::pager> pager)
: path_(std::move(path))
, file_(std::move(f))
, pager_(std::move(pager)) {}

file_manager::~file_manager() = default;

ss::future<std::unique_ptr<file>>
file_manager::open_file(std::filesystem::path path) {
auto f = co_await storage_->open(path);
auto pager = std::make_unique<::experimental::io::pager>(
path, 0, storage_.get(), cache_.get(), scheduler_.get());
co_return std::unique_ptr<file>{
new file(std::move(path), std::move(f), std::move(pager))};
}

ss::future<std::unique_ptr<file>>
file_manager::create_file(std::filesystem::path path) {
auto f = co_await storage_->create(path);
auto pager = std::make_unique<::experimental::io::pager>(
path, 0, storage_.get(), cache_.get(), scheduler_.get());
co_return std::unique_ptr<file>{
new file(std::move(path), std::move(f), std::move(pager))};
}

ss::future<> file_manager::remove_file(std::filesystem::path path) {
co_return co_await ss::remove_file(path.string());
}

file_manager::file_manager(
size_t cache_size_bytes,
size_t small_queue_size_bytes,
size_t scheduler_num_files)
: storage_(std::make_unique<::experimental::io::disk_persistence>())
, cache_(std::make_unique<::experimental::io::page_cache>(
::experimental::io::page_cache::config{
.cache_size = cache_size_bytes, .small_size = small_queue_size_bytes}))
, scheduler_(
std::make_unique<::experimental::io::scheduler>(scheduler_num_files)) {}

} // namespace storage::experimental::mvlog
109 changes: 109 additions & 0 deletions src/v/storage/mvlog/file.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
#pragma once

#include "base/seastarx.h"
#include "base/units.h"
#include "bytes/iobuf.h"
#include "io/persistence.h"

#include <seastar/core/file.hh>
#include <seastar/core/future.hh>
#include <seastar/core/iostream.hh>
#include <seastar/core/seastar.hh>

#include <memory>

namespace experimental::io {
class page_cache;
class scheduler;
class pager;
} // namespace experimental::io

namespace storage::experimental::mvlog {

class file_manager;

// Wraps a pager, managing IO to a given file. This object is expected to live
// for the duration of the process, or until the underlying file is removed.
class file {
public:
using file_t = ss::shared_ptr<::experimental::io::persistence::file>;
file(file&) = delete;
file(file&&) = delete;
file& operator=(file&) = delete;
file& operator=(file&&) = delete;
~file();

// This file must outlive the returned streams.
ss::input_stream<char> make_stream(uint64_t offset, uint64_t length);

// Appends the given iobuf to the file.
ss::future<> append(iobuf);

// TODO(awong): implement.
ss::future<> flush() { return ss::make_ready_future(); }

// Stops remaing IO to the file and closes the handle.
ss::future<> close();

std::filesystem::path filepath() const { return path_; }
size_t size() const;

private:
friend class file_manager;
file(
std::filesystem::path,
file_t,
std::unique_ptr<::experimental::io::pager>);

std::filesystem::path path_;
file_t file_;
std::unique_ptr<::experimental::io::pager> pager_;
};

// Simple wrapper around paging infrastructure, meant as a stop-gap until the
// io:: public interface is more fleshed out.
//
// TODO: This class doesn't implement any kind of concurrency control or file
// descriptor management. Callers should be wary about concurrent access to a
// single file.
class file_manager {
public:
file_manager(file_manager&) = delete;
file_manager(file_manager&&) = delete;
file_manager& operator=(file_manager&) = delete;
file_manager& operator=(file_manager&&) = delete;

// NOTE: defaults are for tests only and are not tuned for production.
explicit file_manager(
size_t cache_size_bytes = 2_MiB,
size_t small_queue_size_bytes = 1_MiB,
size_t scheduler_num_files = 100);
~file_manager();

ss::future<std::unique_ptr<file>> create_file(std::filesystem::path);
ss::future<std::unique_ptr<file>> open_file(std::filesystem::path);
ss::future<> remove_file(std::filesystem::path);

private:
friend class file;

// Persistence interface, in charge of accessing underlying files (opening,
// appending, etc).
std::unique_ptr<::experimental::io::persistence> storage_;

// Page cache to buffer appends and cache recent pages.
std::unique_ptr<::experimental::io::page_cache> cache_;

// Scheduling policy used to dispatch IOs.
std::unique_ptr<::experimental::io::scheduler> scheduler_;
};

} // namespace storage::experimental::mvlog
1 change: 1 addition & 0 deletions src/v/storage/mvlog/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ rp_test(
batch_collecting_stream_utils_test.cc
batch_collector_test.cc
entry_stream_utils_test.cc
file_test.cc
segment_appender_test.cc
segment_io_test.cc
segment_reader_test.cc
Expand Down
Loading

0 comments on commit 209f78d

Please sign in to comment.