-
Notifications
You must be signed in to change notification settings - Fork 592
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
mvlog: wrap pager with file abstraction
Adds a barebones abstraction around the io:: paging abstractions that looks more like a file, with the goal in mind to abstract away the details of the underlying io:: subsystem. For now, this file abstraction supports appends, reads, and various file lifecycle operations, which is enough to get started using it in development of the new local log.
- Loading branch information
Showing
5 changed files
with
427 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
#include "storage/mvlog/file.h" | ||
|
||
#include "base/units.h" | ||
#include "io/page_cache.h" | ||
#include "io/pager.h" | ||
#include "io/paging_data_source.h" | ||
#include "io/persistence.h" | ||
#include "io/scheduler.h" | ||
|
||
namespace storage::experimental::mvlog { | ||
file::~file() = default; | ||
|
||
size_t file::size() const { | ||
vassert(pager_, "file has been closed"); | ||
return pager_->size(); | ||
} | ||
|
||
ss::input_stream<char> file::make_stream(uint64_t offset, uint64_t length) { | ||
vassert(pager_, "file has been closed"); | ||
auto max_len = size() - offset; | ||
return ss::input_stream<char>( | ||
ss::data_source(std::make_unique<::experimental::io::paging_data_source>( | ||
pager_.get(), | ||
::experimental::io::paging_data_source::config{ | ||
offset, std::min(max_len, length)}))); | ||
} | ||
|
||
ss::future<> file::append(iobuf buf) { | ||
vassert(pager_, "file has been closed"); | ||
for (auto& io_frag : buf) { | ||
co_await pager_->append(std::move(io_frag).release()); | ||
} | ||
} | ||
|
||
ss::future<> file::close() { | ||
vassert(pager_, "file has been closed"); | ||
auto pager = std::move(pager_); | ||
// First stop IOs to the file. | ||
co_await pager->close(); | ||
pager.reset(); | ||
|
||
// Then close the file handle. | ||
co_await file_->close(); | ||
} | ||
|
||
file::file( | ||
std::filesystem::path path, | ||
file_t f, | ||
std::unique_ptr<::experimental::io::pager> pager) | ||
: path_(std::move(path)) | ||
, file_(std::move(f)) | ||
, pager_(std::move(pager)) {} | ||
|
||
file_manager::~file_manager() = default; | ||
|
||
ss::future<std::unique_ptr<file>> | ||
file_manager::open_file(std::filesystem::path path) { | ||
auto f = co_await storage_->open(path); | ||
auto pager = std::make_unique<::experimental::io::pager>( | ||
path, 0, storage_.get(), cache_.get(), scheduler_.get()); | ||
co_return std::unique_ptr<file>{ | ||
new file(std::move(path), std::move(f), std::move(pager))}; | ||
} | ||
|
||
ss::future<std::unique_ptr<file>> | ||
file_manager::create_file(std::filesystem::path path) { | ||
auto f = co_await storage_->create(path); | ||
auto pager = std::make_unique<::experimental::io::pager>( | ||
path, 0, storage_.get(), cache_.get(), scheduler_.get()); | ||
co_return std::unique_ptr<file>{ | ||
new file(std::move(path), std::move(f), std::move(pager))}; | ||
} | ||
|
||
ss::future<> file_manager::remove_file(std::filesystem::path path) { | ||
co_return co_await ss::remove_file(path.string()); | ||
} | ||
|
||
file_manager::file_manager( | ||
size_t cache_size_bytes, | ||
size_t small_queue_size_bytes, | ||
size_t scheduler_num_files) | ||
: storage_(std::make_unique<::experimental::io::disk_persistence>()) | ||
, cache_(std::make_unique<::experimental::io::page_cache>( | ||
::experimental::io::page_cache::config{ | ||
.cache_size = cache_size_bytes, .small_size = small_queue_size_bytes})) | ||
, scheduler_( | ||
std::make_unique<::experimental::io::scheduler>(scheduler_num_files)) {} | ||
|
||
} // namespace storage::experimental::mvlog |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
// Copyright 2024 Redpanda Data, Inc. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.md | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0 | ||
#pragma once | ||
|
||
#include "base/seastarx.h" | ||
#include "base/units.h" | ||
#include "bytes/iobuf.h" | ||
#include "io/persistence.h" | ||
|
||
#include <seastar/core/file.hh> | ||
#include <seastar/core/future.hh> | ||
#include <seastar/core/iostream.hh> | ||
#include <seastar/core/seastar.hh> | ||
|
||
#include <memory> | ||
|
||
namespace experimental::io { | ||
class page_cache; | ||
class scheduler; | ||
class pager; | ||
} // namespace experimental::io | ||
|
||
namespace storage::experimental::mvlog { | ||
|
||
class file_manager; | ||
|
||
// Wraps a pager, managing IO to a given file. This object is expected to live | ||
// for the duration of the process, or until the underlying file is removed. | ||
class file { | ||
public: | ||
using file_t = ss::shared_ptr<::experimental::io::persistence::file>; | ||
file(file&) = delete; | ||
file(file&&) = delete; | ||
file& operator=(file&) = delete; | ||
file& operator=(file&&) = delete; | ||
~file(); | ||
|
||
// The resulting stream must outlive this file. | ||
ss::input_stream<char> make_stream(uint64_t offset, uint64_t length); | ||
|
||
// Appends the given iobuf to the file. | ||
ss::future<> append(iobuf); | ||
|
||
// TODO(awong): implement. | ||
ss::future<> flush() { return ss::make_ready_future(); } | ||
|
||
// Stops remaing IO to the file and closes the handle. | ||
ss::future<> close(); | ||
|
||
std::filesystem::path filepath() const { return path_; } | ||
size_t size() const; | ||
|
||
private: | ||
friend class file_manager; | ||
file( | ||
std::filesystem::path, | ||
file_t, | ||
std::unique_ptr<::experimental::io::pager>); | ||
|
||
std::filesystem::path path_; | ||
file_t file_; | ||
std::unique_ptr<::experimental::io::pager> pager_; | ||
}; | ||
|
||
// Simple wrapper around paging infrastructure, meant as a stop-gap until the | ||
// io:: public interface is more fleshed out. | ||
// | ||
// TODO: This class doesn't implement any kind of concurrency control or file | ||
// descriptor management. Callers should be wary about concurrent access to a | ||
// single file. | ||
class file_manager { | ||
public: | ||
file_manager(file_manager&) = delete; | ||
file_manager(file_manager&&) = delete; | ||
file_manager& operator=(file_manager&) = delete; | ||
file_manager& operator=(file_manager&&) = delete; | ||
|
||
// NOTE: defaults are for tests only and are not tuned for production. | ||
explicit file_manager( | ||
size_t cache_size_bytes = 2_MiB, | ||
size_t small_queue_size_bytes = 1_MiB, | ||
size_t scheduler_num_files = 100); | ||
~file_manager(); | ||
|
||
ss::future<std::unique_ptr<file>> create_file(std::filesystem::path); | ||
ss::future<std::unique_ptr<file>> open_file(std::filesystem::path); | ||
ss::future<> remove_file(std::filesystem::path); | ||
|
||
private: | ||
friend class file; | ||
|
||
// Persistence interface, in charge of accessing underlying files (opening, | ||
// appending, etc). | ||
std::unique_ptr<::experimental::io::persistence> storage_; | ||
|
||
// Page cache to buffer appends and cache recent pages. | ||
std::unique_ptr<::experimental::io::page_cache> cache_; | ||
|
||
// Scheduling policy used to dispatch IOs. | ||
std::unique_ptr<::experimental::io::scheduler> scheduler_; | ||
}; | ||
|
||
} // namespace storage::experimental::mvlog |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.