Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tiering): DiskStorage #2770

Merged
merged 4 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
cluster/cluster_family.cc cluster/cluster_slot_migration.cc
cluster/cluster_shard_migration.cc cluster/outgoing_slot_migration.cc
acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)
acl/validator.cc acl/helpers.cc
tiering/disk_storage.cc)

if (DF_ENABLE_MEMORY_TRACKING)
target_compile_definitions(dragonfly_lib PRIVATE DFLY_ENABLE_MEMORY_TRACKING)
Expand Down Expand Up @@ -102,6 +103,7 @@ cxx_test(acl/user_registry_test dfly_test_lib LABELS DFLY)
cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
cxx_test(search/search_family_test dfly_test_lib LABELS DFLY)
cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
if (WITH_ASAN OR WITH_USAN)
target_compile_definitions(stream_family_test PRIVATE SANITIZERS)
target_compile_definitions(multi_test PRIVATE SANITIZERS)
Expand Down
47 changes: 38 additions & 9 deletions src/server/io_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@ constexpr inline size_t alignup(size_t num, size_t align) {

} // namespace

IoMgr::IoMgr() {
flags_val = 0;
}

constexpr size_t kInitialSize = 1UL << 28; // 256MB

error_code IoMgr::Open(const string& path) {
error_code IoMgr::Open(std::string_view path) {
CHECK(!backing_file_);

int kFlags = O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC;
Expand Down Expand Up @@ -71,25 +67,44 @@ error_code IoMgr::Open(const string& path) {
return error_code{};
}

// Grows the backing file by `len` bytes and waits for the fallocate request to complete.
//
// Returns:
//   - errc::operation_in_progress if another grow is already pending;
//   - the OS error reported for the fallocate request if it failed;
//   - success otherwise, in which case the tracked file span (sz_) is extended by `len`.
error_code IoMgr::Grow(size_t len) {
  Proactor* proactor = static_cast<Proactor*>(ProactorBase::me());

  // Only one grow may be in flight at a time; claim the flag atomically with the check.
  if (exchange(grow_progress_, true))
    return make_error_code(errc::operation_in_progress);

  fb2::FiberCall fc(proactor);
  fc->PrepFallocate(backing_file_->fd(), 0, sz_, len);
  Proactor::IoResult res = fc.Get();

  grow_progress_ = false;

  if (res != 0) {
    // The io result is treated as a negated errno value, so it belongs to the system
    // category (iostream_category only describes stream errors and would mislabel it).
    return std::error_code(-res, std::system_category());
  }

  sz_ += len;
  return {};
}

// Requests growing the backing file by `len` bytes without waiting for completion.
// Returns errc::operation_in_progress if a grow is already pending. Otherwise prepares an
// fallocate request and returns success; the io result is later passed to `cb`, and the
// tracked file span (sz_) is extended by `len` only when that result is 0.
// NOTE(review): this span appears to interleave pre- and post-change lines of the diff
// (lines using `flags.grow_progress` next to their `grow_progress_` replacements) — confirm
// which lines are live before relying on it.
error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
// len must be a multiple of 1MB.
DCHECK_EQ(0u, len % (1 << 20));

if (flags.grow_progress) {
if (exchange(grow_progress_, true)) {
return make_error_code(errc::operation_in_progress);
}

Proactor* proactor = (Proactor*)ProactorBase::me();

// Completion handler: clears the in-progress marker, accounts for the new size on success
// and forwards the raw io result to the caller's callback.
SubmitEntry entry = proactor->GetSubmitEntry(
[this, len, cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t) {
this->flags.grow_progress = 0;
this->grow_progress_ = false;
sz_ += (res == 0 ? len : 0);
cb(res);
},
0);

// Extend the file starting at the current end (sz_).
entry.PrepFallocate(backing_file_->fd(), 0, sz_, len);
flags.grow_progress = 1;

return error_code{};
}
Expand Down Expand Up @@ -146,8 +161,22 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
return ec;
}

std::error_code IoMgr::ReadAsync(size_t offset, absl::Span<uint8_t> buffer, ReadCb cb) {
DCHECK(!buffer.empty());
VLOG(1) << "Read " << offset << "/" << buffer.size();

Proactor* proactor = (Proactor*)ProactorBase::me();

auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t flags) { cb(res); };

SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
se.PrepRead(backing_file_->fd(), buffer.data(), buffer.size(), offset);

return error_code{};
}

// Waits until no grow operation is in progress, sleeping 200us between checks.
// NOTE(review): the two loop headers below look like the pre-change (`flags_val`) and
// post-change (`grow_progress_`) lines of the same diff hunk — confirm which one is live.
void IoMgr::Shutdown() {
while (flags_val) {
while (grow_progress_) {
ThisFiber::SleepFor(200us); // TODO: hacky for now.
}
}
Expand Down
22 changes: 12 additions & 10 deletions src/server/io_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ class IoMgr {
// (io_res, )
using GrowCb = std::function<void(int)>;

IoMgr();
using ReadCb = std::function<void(int)>;

// blocks until all the pending requests are finished.
void Shutdown();

std::error_code Open(const std::string& path);
std::error_code Open(std::string_view path);

// Try growing file by that length. Return error if growth failed.
std::error_code Grow(size_t len);
romange marked this conversation as resolved.
Show resolved Hide resolved

// Grows file by that length. len must be a multiple of 1MB;
// passing other values will check-fail.
Expand All @@ -36,15 +39,20 @@ class IoMgr {
// Returns error if submission failed. Otherwise - returns the io result
// via cb. A caller must make sure that the blob exists until cb is called.
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);

// Read synchronously into dest
std::error_code Read(size_t offset, io::MutableBytes dest);

// Read into dest and call cb once read
std::error_code ReadAsync(size_t offset, io::MutableBytes dest, ReadCb cb);

// Total file span
size_t Span() const {
return sz_;
}

bool grow_pending() const {
return flags.grow_progress;
return grow_progress_;
}

const IoMgrStats& GetStats() const {
Expand All @@ -55,13 +63,7 @@ class IoMgr {
std::unique_ptr<util::fb2::LinuxFile> backing_file_;
size_t sz_ = 0;

union {
uint8_t flags_val;
struct {
uint8_t grow_progress : 1;
} flags;
};

bool grow_progress_ = false;
IoMgrStats stats_;
};

Expand Down
73 changes: 73 additions & 0 deletions src/server/tiering/disk_storage.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "server/tiering/disk_storage.h"
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

#include "base/io_buf.h"
#include "base/logging.h"
#include "server/error.h"

namespace dfly::tiering {

// Opens the backing file at `path` and hands its whole initial span to the allocator.
// Returns the error from IoMgr::Open if opening fails.
std::error_code DiskStorage::Open(std::string_view path) {
  RETURN_ON_ERR(io_mgr_.Open(path));

  // Everything from offset 0 up to the current file span is available for allocation.
  const size_t initial_span = io_mgr_.Span();
  alloc_.AddStorage(0, initial_span);

  return std::error_code{};
}

// Shuts down the underlying IoMgr; IoMgr::Shutdown waits while a grow is in progress.
void DiskStorage::Close() {
io_mgr_.Shutdown();
}

// Requests an asynchronous read of `segment` from the backing file.
//
// `cb` is invoked on completion with a view over the bytes read. The view points into a
// temporary buffer that is released right after `cb` returns, so the callee must copy whatever
// it wants to keep. If the read fails, `cb` receives an empty view.
// segment.length must be non-zero and segment.offset must be 4096-aligned.
void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
  DCHECK_GT(segment.length, 0u);
  DCHECK_EQ(segment.offset % 4096, 0u);

  // TODO: use registered buffers (UringProactor::RegisterBuffers)
  // A raw pointer is used because std::function needs to be copyable, so a unique_ptr can't be
  // captured by the callback.
  uint8_t* buf = new uint8_t[segment.length];

  auto io_cb = [cb = std::move(cb), buf, segment](int res) {
    if (res < 0) {
      // The buffer contents are unspecified after a failed read; never expose them.
      LOG(ERROR) << "Read failed with result " << res << " for segment at offset "
                 << segment.offset;
      cb(std::string_view{});
    } else {
      cb(std::string_view{reinterpret_cast<char*>(buf), segment.length});
    }
    delete[] buf;  // allocated with new[], so it must be released with delete[]
  };

  // NOTE(review): the returned error code is not checked; confirm ReadAsync cannot fail at
  // submission, otherwise the buffer leaks and `cb` is never called.
  io_mgr_.ReadAsync(segment.offset, {buf, segment.length}, std::move(io_cb));
}

// Returns `segment` to the allocator immediately.
// The segment must be non-empty and start at a 4096-aligned offset.
void DiskStorage::MarkAsFree(DiskSegment segment) {
  const auto [offset, length] = segment;

  DCHECK_GT(length, 0u);
  DCHECK_EQ(offset % 4096, 0u);

  alloc_.Free(offset, length);
}

// Allocates space for `bytes` in the backing file and requests an asynchronous write.
//
// `cb` is invoked on completion with the assigned segment, or with an empty segment if the
// write failed (the space is returned to the allocator in that case).
// Returns an error, without invoking `cb`, if the backing file could not be grown, if the value
// does not fit even after growing, or if the write could not be submitted.
// May block while growing the backing file. `bytes` must be non-empty.
std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
  DCHECK_GT(bytes.length(), 0u);

  // NOTE(review): a DCHECK on 4k alignment of bytes.length() was requested during review; the
  // author questioned its benefit given that the interface works with byte ranges. No such
  // check is enforced here.

  int64_t offset = alloc_.Malloc(bytes.size());

  // If we've run out of space, block and grow as much as needed
  if (offset < 0) {
    size_t start = io_mgr_.Span();
    size_t grow_size = -offset;  // a negative result is used as the size to grow by
    RETURN_ON_ERR(io_mgr_.Grow(grow_size));

    alloc_.AddStorage(start, grow_size);
    offset = alloc_.Malloc(bytes.size());

    if (offset < 0)  // we can't fit it even after resizing
      return std::make_error_code(std::errc::file_too_large);
  }

  const DiskSegment segment{size_t(offset), bytes.size()};

  auto io_cb = [this, cb = std::move(cb), segment](int io_res) {
    if (io_res < 0) {
      MarkAsFree(segment);
      cb({});
    } else {
      cb(segment);
    }
  };

  std::error_code ec = io_mgr_.WriteAsync(offset, io::View(bytes), std::move(io_cb));
  if (ec) {
    // Submission failed, so the completion callback will never run: release the segment here,
    // otherwise the space stays allocated forever.
    MarkAsFree(segment);
  }
  return ec;
}

} // namespace dfly::tiering
46 changes: 46 additions & 0 deletions src/server/tiering/disk_storage.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <string>
#include <system_error>

#include "core/external_alloc.h"
#include "io/io.h"
#include "server/io_mgr.h"

namespace dfly::tiering {

// A contiguous byte range of the backing file.
struct DiskSegment {
  // Start of the range, measured in bytes; should be aligned to page boundaries (4kb).
  size_t offset;
  // Size of the range, measured in bytes.
  size_t length;
};

// Disk storage controlled by asynchronous operations.
// Pairs an IoMgr, which performs the I/O on the backing file, with an ExternalAllocator, which
// tracks the byte ranges handed out by Stash() and returned by MarkAsFree().
class DiskStorage {
public:
// Called with the bytes read for a requested segment. The view is backed by a temporary
// buffer that is released after the callback returns, so copy what needs to be kept.
using ReadCb = std::function<void(std::string_view)>;
// Called with the segment assigned to stashed bytes, or an empty segment if the write failed.
using StashCb = std::function<void(DiskSegment)>;

// Open the backing file at path and make its initial span available for allocation.
std::error_code Open(std::string_view path);
// Shut down the underlying IoMgr.
void Close();

// Request read for segment, cb will be called on completion with read value
void Read(DiskSegment segment, ReadCb cb);

// Mark segment as free, performed immediately
void MarkAsFree(DiskSegment segment);

// Request bytes to be stored, cb will be called with assigned segment on completion. Can block to
// grow backing file. Returns error code if operation failed immediately (most likely it failed
// to grow the backing file) or passes an empty segment if the final write operation failed.
std::error_code Stash(io::Bytes bytes, StashCb cb);

private:
// Performs reads, writes and growth of the backing file.
IoMgr io_mgr_;
// Allocates and frees offsets within the backing file.
ExternalAllocator alloc_;
};

}; // namespace dfly::tiering
Loading
Loading