Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tiering): DiskStorage #2770

Merged
merged 4 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
cluster/cluster_family.cc cluster/cluster_slot_migration.cc
cluster/cluster_shard_migration.cc cluster/outgoing_slot_migration.cc
acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)
acl/validator.cc acl/helpers.cc
tiering/disk_storage.cc)

if (DF_ENABLE_MEMORY_TRACKING)
target_compile_definitions(dragonfly_lib PRIVATE DFLY_ENABLE_MEMORY_TRACKING)
Expand Down Expand Up @@ -102,6 +103,7 @@ cxx_test(acl/user_registry_test dfly_test_lib LABELS DFLY)
cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
cxx_test(search/search_family_test dfly_test_lib LABELS DFLY)
cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
if (WITH_ASAN OR WITH_USAN)
target_compile_definitions(stream_family_test PRIVATE SANITIZERS)
target_compile_definitions(multi_test PRIVATE SANITIZERS)
Expand Down
36 changes: 35 additions & 1 deletion src/server/io_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ IoMgr::IoMgr() {

constexpr size_t kInitialSize = 1UL << 28; // 256MB

error_code IoMgr::Open(const string& path) {
error_code IoMgr::Open(std::string_view path) {
CHECK(!backing_file_);

int kFlags = O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC;
Expand Down Expand Up @@ -71,6 +71,26 @@ error_code IoMgr::Open(const string& path) {
return error_code{};
}

error_code IoMgr::Grow(size_t len) {
Proactor* proactor = (Proactor*)ProactorBase::me();

DCHECK_EQ(flags.grow_progress, false);
flags.grow_progress = true;

fb2::FiberCall fc(proactor);
fc->PrepFallocate(backing_file_->fd(), 0, sz_, len);
Proactor::IoResult res = fc.Get();

flags.grow_progress = false;

if (res == 0) {
sz_ += len;
return {};
} else {
return std::error_code(res, std::iostream_category());
}
}

error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
DCHECK_EQ(0u, len % (1 << 20));

Expand Down Expand Up @@ -146,6 +166,20 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
return ec;
}

std::error_code IoMgr::ReadAsync(size_t offset, absl::Span<uint8_t> buffer, ReadCb cb) {
DCHECK(!buffer.empty());
VLOG(1) << "Read " << offset << "/" << buffer.size();

Proactor* proactor = (Proactor*)ProactorBase::me();

auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t flags) { cb(res); };

SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
se.PrepRead(backing_file_->fd(), buffer.data(), buffer.size(), offset);

return error_code{};
}

void IoMgr::Shutdown() {
while (flags_val) {
ThisFiber::SleepFor(200us); // TODO: hacky for now.
Expand Down
8 changes: 7 additions & 1 deletion src/server/io_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ class IoMgr {
// (io_res, )
using GrowCb = std::function<void(int)>;

using ReadCb = std::function<void(int)>;

IoMgr();

// blocks until all the pending requests are finished.
void Shutdown();

std::error_code Open(const std::string& path);
std::error_code Open(std::string_view path);

std::error_code Grow(size_t len);
romange marked this conversation as resolved.
Show resolved Hide resolved

// Grows file by that length. len must be divided by 1MB.
// passing other values will check-fail.
Expand All @@ -38,6 +42,8 @@ class IoMgr {
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
std::error_code Read(size_t offset, io::MutableBytes dest);

std::error_code ReadAsync(size_t offset, io::MutableBytes dest, ReadCb cb);

// Total file span
size_t Span() const {
return sz_;
Expand Down
62 changes: 62 additions & 0 deletions src/server/tiering/disk_storage.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "server/tiering/disk_storage.h"
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

#include "base/io_buf.h"
#include "base/logging.h"
#include "server/error.h"

namespace dfly::tiering {

std::error_code DiskStorage::Open(std::string_view path) {
RETURN_ON_ERR(io_mgr_.Open(path));
alloc_.AddStorage(0, io_mgr_.Span());
return {};
}

void DiskStorage::Close() {
io_mgr_.Shutdown();
}

void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
DCHECK_GT(segment.length, 0u);

uint8_t* buf = new uint8_t[segment.length];
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add TODO: use registered buffers (UringProactor::RegisterBuffers)

auto io_cb = [cb, buf, segment](int res) {
cb(std::string_view{reinterpret_cast<char*>(buf), segment.length});
delete buf; // because std::function needs to be copyable, unique_ptr can't be used
};
io_mgr_.ReadAsync(segment.offset, {buf, segment.length}, std::move(io_cb));
}

void DiskStorage::MarkAsFree(DiskSegment segment) {
DCHECK_GT(segment.length, 0u);
alloc_.Free(segment.offset, segment.length);
}

std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
DCHECK_GT(bytes.length(), 0u);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please dcheck on length 4k alignment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean offset

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i meant bytes.length()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i meant bytes.length()

Didn't we discuss above what the potential benefits of 4k alignment would be if our abstractions both ways work with byte ranges? If it's always 4k bytes, then why not count pages instead 😆

int64_t offset = alloc_.Malloc(bytes.size());

// If we've run out of space, block and grow as much as needed
if (offset < 0) {
size_t start = io_mgr_.Span();
size_t grow_size = -offset;
RETURN_ON_ERR(io_mgr_.Grow(grow_size));

alloc_.AddStorage(start, grow_size);
return Stash(bytes, std::move(cb));
romange marked this conversation as resolved.
Show resolved Hide resolved
}

auto io_cb = [this, cb, offset, bytes, len = bytes.size()](int io_res) {
if (io_res < 0) {
MarkAsFree({size_t(offset), len});
cb({});
} else {
cb({size_t(offset), len});
}
};

return io_mgr_.WriteAsync(offset, io::View(bytes), std::move(io_cb));
}

} // namespace dfly::tiering
46 changes: 46 additions & 0 deletions src/server/tiering/disk_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <string>
#include <system_error>

#include "core/external_alloc.h"
#include "io/io.h"
#include "server/io_mgr.h"

namespace dfly::tiering {

struct DiskSegment {
size_t offset, length;
};
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

// Disk storage controlled by asynchronous operations.
class DiskStorage {
public:
using ReadCb = std::function<void(std::string_view)>;
using StashCb = std::function<void(DiskSegment)>;

std::error_code Open(std::string_view path);
void Close();

// Request read for segment, cb will be called on completion with read value
void Read(DiskSegment segment, ReadCb cb);

// Mark segment as free, performed immediately
void MarkAsFree(DiskSegment segment);

// Request bytes to be stored, cb will be called with assigned segment on completion. Can block to
// grow backing file, thus safe to call only from single fiber.
// Returns error code if operation failed immediately (most likely it failed to grow the backing
// file) or passes an empty segment if the final write operation failed.
std::error_code Stash(io::Bytes bytes, StashCb cb);
romange marked this conversation as resolved.
Show resolved Hide resolved

private:
IoMgr io_mgr_;
ExternalAllocator alloc_;
};

}; // namespace dfly::tiering
131 changes: 131 additions & 0 deletions src/server/tiering/disk_storage_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/tiering/disk_storage.h"

#include <memory>

#include "base/gtest.h"
#include "base/logging.h"
#include "util/fibers/fibers.h"
#include "util/fibers/pool.h"

namespace dfly::tiering {

using namespace std;
using namespace std::string_literals;

class PoolTestBase : public testing::Test {
protected:
static void SetUpTestSuite();
static void TearDownTestSuite();

static unique_ptr<util::ProactorPool> pp_;
};

unique_ptr<util::ProactorPool> PoolTestBase::pp_ = nullptr;

void PoolTestBase::SetUpTestSuite() {
pp_.reset(util::fb2::Pool::IOUring(16, 2));
pp_->Run();
}

void PoolTestBase::TearDownTestSuite() {
pp_->Stop();
pp_.reset();
}

struct DiskStorageTest : public PoolTestBase {
~DiskStorageTest() {
EXPECT_EQ(pending_ops_, 0);
}

void Open(std::string_view filename) {
storage_ = make_unique<DiskStorage>();
storage_->Open(filename);
}

void Close() {
storage_->Close();
storage_.reset();
}

void Stash(size_t index, string value) {
pending_ops_++;
auto buf = make_shared<string>(value);
storage_->Stash(io::Buffer(*buf), [this, index, buf](DiskSegment segment) {
segments_[index] = segment;
pending_ops_--;
});
}

void Read(size_t index) {
pending_ops_++;
storage_->Read(segments_[index], [this, index](string_view value) {
last_reads_[index] = value;
pending_ops_--;
});
}

void Delete(size_t index) {
storage_->MarkAsFree(segments_[index]);
segments_.erase(index);
last_reads_.erase(index);
}

void Wait() {
while (pending_ops_ > 0) {
::util::ThisFiber::SleepFor(1ms);
}
}

protected:
int pending_ops_ = 0;

std::unordered_map<size_t, string> last_reads_;
std::unordered_map<size_t, DiskSegment> segments_;
std::unique_ptr<DiskStorage> storage_;
};

TEST_F(DiskStorageTest, Basic) {
pp_->at(0)->Await([this] {
// Write 100 values
Open("testfile");
for (size_t i = 0; i < 100; i++)
Stash(i, absl::StrCat("value", i));
Wait();
EXPECT_EQ(segments_.size(), 100);

// Read all 100 values
for (size_t i = 0; i < 100; i++)
Read(i);
Wait();

// Expect them to be equal to written
for (size_t i = 0; i < 100; i++)
EXPECT_EQ(last_reads_[i], absl::StrCat("value", i));

Close();
});
}

TEST_F(DiskStorageTest, ReUse) {
pp_->at(0)->Await([this] {
Open("testfile");

Stash(0, "value1");
Wait();
EXPECT_EQ(segments_[0].offset, 0u);

Delete(0);

Stash(1, "value2");
Wait();
EXPECT_EQ(segments_[1].offset, 0u);

Close();
});
}

} // namespace dfly::tiering
Loading