diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
index df40cf6cee3b..73f187600b58 100644
--- a/src/server/CMakeLists.txt
+++ b/src/server/CMakeLists.txt
@@ -49,7 +49,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
             cluster/cluster_family.cc cluster/cluster_slot_migration.cc
             cluster/cluster_shard_migration.cc cluster/outgoing_slot_migration.cc
             acl/user.cc acl/user_registry.cc acl/acl_family.cc
-            acl/validator.cc acl/helpers.cc)
+            acl/validator.cc acl/helpers.cc
+            tiering/disk_storage.cc)
 
 if (DF_ENABLE_MEMORY_TRACKING)
   target_compile_definitions(dragonfly_lib PRIVATE DFLY_ENABLE_MEMORY_TRACKING)
@@ -102,6 +103,7 @@ cxx_test(acl/user_registry_test dfly_test_lib LABELS DFLY)
 cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
 cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
 cxx_test(search/search_family_test dfly_test_lib LABELS DFLY)
+cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
 if (WITH_ASAN OR WITH_USAN)
   target_compile_definitions(stream_family_test PRIVATE SANITIZERS)
   target_compile_definitions(multi_test PRIVATE SANITIZERS)
diff --git a/src/server/io_mgr.cc b/src/server/io_mgr.cc
index e40a1871489b..c4e7a1dce804 100644
--- a/src/server/io_mgr.cc
+++ b/src/server/io_mgr.cc
@@ -33,13 +33,9 @@ constexpr inline size_t alignup(size_t num, size_t align) {
 
 }  // namespace
 
-IoMgr::IoMgr() {
-  flags_val = 0;
-}
-
 constexpr size_t kInitialSize = 1UL << 28;  // 256MB
 
-error_code IoMgr::Open(const string& path) {
+error_code IoMgr::Open(std::string_view path) {
   CHECK(!backing_file_);
 
   int kFlags = O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC;
@@ -71,10 +67,30 @@ error_code IoMgr::Open(const string& path) {
   return error_code{};
 }
 
+error_code IoMgr::Grow(size_t len) {
+  Proactor* proactor = (Proactor*)ProactorBase::me();
+
+  if (exchange(grow_progress_, true))
+    return make_error_code(errc::operation_in_progress);
+
+  fb2::FiberCall fc(proactor);
+  fc->PrepFallocate(backing_file_->fd(), 0, sz_, len);
+  Proactor::IoResult res = fc.Get();
+
+  grow_progress_ = false;
+
+  if (res == 0) {
+    sz_ += len;
+    return {};
+  } else {
+    return std::error_code(-res, std::system_category());  // -res is an errno value
+  }
+}
+
 error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
   DCHECK_EQ(0u, len % (1 << 20));
 
-  if (flags.grow_progress) {
+  if (exchange(grow_progress_, true)) {
     return make_error_code(errc::operation_in_progress);
   }
 
@@ -82,14 +98,13 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
 
   SubmitEntry entry = proactor->GetSubmitEntry(
       [this, len, cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t) {
-        this->flags.grow_progress = 0;
+        this->grow_progress_ = false;
         sz_ += (res == 0 ? len : 0);
         cb(res);
       },
       0);
 
   entry.PrepFallocate(backing_file_->fd(), 0, sz_, len);
-  flags.grow_progress = 1;
 
   return error_code{};
 }
@@ -146,8 +161,22 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
   return ec;
 }
 
+std::error_code IoMgr::ReadAsync(size_t offset, absl::Span<uint8_t> buffer, ReadCb cb) {
+  DCHECK(!buffer.empty());
+  VLOG(1) << "Read " << offset << "/" << buffer.size();
+
+  Proactor* proactor = (Proactor*)ProactorBase::me();
+
+  auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t) { cb(res); };
+
+  SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
+  se.PrepRead(backing_file_->fd(), buffer.data(), buffer.size(), offset);
+
+  return error_code{};
+}
+
 void IoMgr::Shutdown() {
-  while (flags_val) {
+  while (grow_progress_) {
     ThisFiber::SleepFor(200us);  // TODO: hacky for now.
   }
 }
diff --git a/src/server/io_mgr.h b/src/server/io_mgr.h
index 41fca04f693f..24965973f493 100644
--- a/src/server/io_mgr.h
+++ b/src/server/io_mgr.h
@@ -22,12 +22,15 @@ class IoMgr {
   // (io_res, )
   using GrowCb = std::function<void(int)>;
 
-  IoMgr();
+  using ReadCb = std::function<void(int)>;
 
   // blocks until all the pending requests are finished.
   void Shutdown();
 
-  std::error_code Open(const std::string& path);
+  std::error_code Open(std::string_view path);
+
+  // Grow file by len synchronously. Fails if a grow is already pending or if fallocate failed.
+  std::error_code Grow(size_t len);
 
   // Grows file by that length. len must be divided by 1MB.
   // passing other values will check-fail.
@@ -36,15 +39,20 @@ class IoMgr {
   // Returns error if submission failed. Otherwise - returns the io result
   // via cb. A caller must make sure that the blob exists until cb is called.
   std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
+
+  // Read synchronously into dest
   std::error_code Read(size_t offset, io::MutableBytes dest);
 
+  // Read into dest and call cb with the io result. dest must stay valid until cb is called.
+  std::error_code ReadAsync(size_t offset, io::MutableBytes dest, ReadCb cb);
+
   // Total file span
   size_t Span() const {
     return sz_;
   }
 
   bool grow_pending() const {
-    return flags.grow_progress;
+    return grow_progress_;
   }
 
   const IoMgrStats& GetStats() const {
@@ -55,13 +63,7 @@ class IoMgr {
   std::unique_ptr backing_file_;
   size_t sz_ = 0;
 
-  union {
-    uint8_t flags_val;
-    struct {
-      uint8_t grow_progress : 1;
-    } flags;
-  };
-
+  bool grow_progress_ = false;
   IoMgrStats stats_;
 };
 
diff --git a/src/server/tiering/disk_storage.cc b/src/server/tiering/disk_storage.cc
new file mode 100644
index 000000000000..2d48609964d9
--- /dev/null
+++ b/src/server/tiering/disk_storage.cc
@@ -0,0 +1,73 @@
+// Copyright 2024, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#include "server/tiering/disk_storage.h"
+
+#include "base/io_buf.h"
+#include "base/logging.h"
+#include "server/error.h"
+
+namespace dfly::tiering {
+
+std::error_code DiskStorage::Open(std::string_view path) {
+  RETURN_ON_ERR(io_mgr_.Open(path));
+  alloc_.AddStorage(0, io_mgr_.Span());  // the whole initial file span is allocatable
+  return {};
+}
+
+void DiskStorage::Close() {
+  io_mgr_.Shutdown();
+}
+
+void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
+  DCHECK_GT(segment.length, 0u);
+  DCHECK_EQ(segment.offset % 4096, 0u);
+
+  // TODO: use registered buffers (UringProactor::RegisterBuffers)
+  uint8_t* buf = new uint8_t[segment.length];
+  auto io_cb = [cb, buf, segment](int res) {  // a failed read (res < 0) yields an empty value
+    cb(std::string_view{reinterpret_cast<char*>(buf), res < 0 ? 0 : segment.length});
+    delete[] buf;  // raw new[]/delete[]: std::function must be copyable, so no unique_ptr
+  };
+  io_mgr_.ReadAsync(segment.offset, {buf, segment.length}, std::move(io_cb));
+}
+
+void DiskStorage::MarkAsFree(DiskSegment segment) {
+  DCHECK_GT(segment.length, 0u);
+  DCHECK_EQ(segment.offset % 4096, 0u);
+
+  alloc_.Free(segment.offset, segment.length);
+}
+
+std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
+  DCHECK_GT(bytes.length(), 0u);
+
+  int64_t offset = alloc_.Malloc(bytes.size());
+
+  // If we've run out of space, block and grow as much as needed
+  if (offset < 0) {
+    size_t start = io_mgr_.Span();
+    size_t grow_size = -offset;  // a negative Malloc result is used as the size to grow by
+    RETURN_ON_ERR(io_mgr_.Grow(grow_size));
+
+    alloc_.AddStorage(start, grow_size);
+    offset = alloc_.Malloc(bytes.size());
+
+    if (offset < 0)  // we can't fit it even after resizing
+      return std::make_error_code(std::errc::file_too_large);
+  }
+
+  auto io_cb = [this, cb, offset, bytes, len = bytes.size()](int io_res) {
+    if (io_res < 0) {
+      MarkAsFree({size_t(offset), len});  // give the space back, nothing was stored
+      cb({});
+    } else {
+      cb({size_t(offset), len});
+    }
+  };
+
+  return io_mgr_.WriteAsync(offset, io::View(bytes), std::move(io_cb));
+}
+
+}  // namespace dfly::tiering
diff --git a/src/server/tiering/disk_storage.h b/src/server/tiering/disk_storage.h
new file mode 100644
index 000000000000..185533af1562
--- /dev/null
+++ b/src/server/tiering/disk_storage.h
@@ -0,0 +1,53 @@
+// Copyright 2024, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#pragma once
+
+#include <functional>
+#include <string_view>
+#include <system_error>
+
+#include "core/external_alloc.h"
+#include "io/io.h"
+#include "server/io_mgr.h"
+
+namespace dfly::tiering {
+
+// Location of a value inside the backing file.
+struct DiskSegment {
+  // Measured in bytes, offset should be aligned to page boundaries (4kb)
+  size_t offset, length;
+};
+
+// Disk storage controlled by asynchronous operations.
+class DiskStorage {
+ public:
+  using ReadCb = std::function<void(std::string_view)>;
+  using StashCb = std::function<void(DiskSegment)>;
+
+  // Open backing file at path and hand its initial span to the allocator.
+  std::error_code Open(std::string_view path);
+
+  // Shut down the underlying io manager.
+  void Close();
+
+  // Request read for segment, cb will be called on completion with read value. The value is only
+  // valid during the call to cb and is empty if the read failed.
+  void Read(DiskSegment segment, ReadCb cb);
+
+  // Mark segment as free, performed immediately
+  void MarkAsFree(DiskSegment segment);
+
+  // Request bytes to be stored, cb will be called with assigned segment on completion. Can block to
+  // grow backing file. Returns error code if operation failed immediately (most likely it failed
+  // to grow the backing file) or passes an empty segment if the final write operation failed.
+  // The caller must keep bytes alive until cb is called.
+  std::error_code Stash(io::Bytes bytes, StashCb cb);
+
+ private:
+  IoMgr io_mgr_;
+  ExternalAllocator alloc_;
+};
+
+}  // namespace dfly::tiering
diff --git a/src/server/tiering/disk_storage_test.cc b/src/server/tiering/disk_storage_test.cc
new file mode 100644
index 000000000000..2d404eb48ef0
--- /dev/null
+++ b/src/server/tiering/disk_storage_test.cc
@@ -0,0 +1,132 @@
+// Copyright 2024, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#include "server/tiering/disk_storage.h"
+
+#include <memory>
+
+#include "base/gtest.h"
+#include "base/logging.h"
+#include "util/fibers/fibers.h"
+#include "util/fibers/pool.h"
+
+namespace dfly::tiering {
+
+using namespace std;
+using namespace std::string_literals;
+
+class PoolTestBase : public testing::Test {
+ protected:
+  static void SetUpTestSuite();
+  static void TearDownTestSuite();
+
+  static unique_ptr<util::ProactorPool> pp_;
+};
+
+unique_ptr<util::ProactorPool> PoolTestBase::pp_ = nullptr;
+
+void PoolTestBase::SetUpTestSuite() {
+  pp_.reset(util::fb2::Pool::IOUring(16, 2));
+  pp_->Run();
+}
+
+void PoolTestBase::TearDownTestSuite() {
+  pp_->Stop();
+  pp_.reset();
+}
+
+struct DiskStorageTest : public PoolTestBase {
+  ~DiskStorageTest() {
+    EXPECT_EQ(pending_ops_, 0);  // every issued request must have completed
+  }
+
+  void Open() {
+    storage_ = make_unique<DiskStorage>();
+    EXPECT_FALSE(storage_->Open("disk_storage_test_backing"));
+  }
+
+  void Close() {
+    storage_->Close();
+    storage_.reset();
+    unlink("disk_storage_test_backing");
+  }
+
+  void Stash(size_t index, string value) {
+    pending_ops_++;
+    auto buf = make_shared<string>(value);  // captured below to outlive the async write
+    EXPECT_FALSE(storage_->Stash(io::Buffer(*buf), [this, index, buf](DiskSegment segment) {
+      segments_[index] = segment;
+      pending_ops_--;
+    }));
+  }
+
+  void Read(size_t index) {
+    pending_ops_++;
+    storage_->Read(segments_[index], [this, index](string_view value) {
+      last_reads_[index] = value;  // copy, value is only valid inside the callback
+      pending_ops_--;
+    });
+  }
+
+  void Delete(size_t index) {
+    storage_->MarkAsFree(segments_[index]);
+    segments_.erase(index);
+    last_reads_.erase(index);
+  }
+
+  void Wait() {
+    while (pending_ops_ > 0) {
+      ::util::ThisFiber::SleepFor(1ms);
+    }
+  }
+
+ protected:
+  int pending_ops_ = 0;
+
+  std::unordered_map<size_t, std::string> last_reads_;
+  std::unordered_map<size_t, DiskSegment> segments_;
+  std::unique_ptr<DiskStorage> storage_;
+};
+
+TEST_F(DiskStorageTest, Basic) {
+  pp_->at(0)->Await([this] {
+    // Write 100 values
+    Open();
+    for (size_t i = 0; i < 100; i++)
+      Stash(i, absl::StrCat("value", i));
+    Wait();
+    EXPECT_EQ(segments_.size(), 100u);
+
+    // Read all 100 values
+    for (size_t i = 0; i < 100; i++)
+      Read(i);
+    Wait();
+
+    // Expect them to be equal to written
+    for (size_t i = 0; i < 100; i++)
+      EXPECT_EQ(last_reads_[i], absl::StrCat("value", i));
+
+    Close();
+  });
+}
+
+TEST_F(DiskStorageTest, ReUse) {
+  pp_->at(0)->Await([this] {
+    Open();
+
+    Stash(0, "value1");
+    Wait();
+    EXPECT_EQ(segments_[0].offset, 0u);
+
+    Delete(0);
+
+    Stash(1, "value2");  // must land in the segment freed above
+    Wait();
+    EXPECT_EQ(segments_[1].offset, 0u);
+
+    Close();
+  });
+}
+
+}  // namespace dfly::tiering