Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-654] Add gc callback for PageStorage #319

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions dbms/src/Storages/Page/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ namespace PageMetaFormat
using WBSize = UInt32;
using PageFileVersion = PageFile::Version;
// TODO: we should align these aliases with the types in PageCache
using PageTag = UInt64;
using IsPut = std::underlying_type<WriteBatch::WriteType>::type;
using PageOffset = UInt64;
using Checksum = UInt64;
using PageTag = UInt64;
using IsPut = std::underlying_type<WriteBatch::WriteType>::type;
using PageOffset = UInt64;
using Checksum = UInt64;

static const size_t PAGE_META_SIZE = sizeof(PageId) + sizeof(PageTag) + sizeof(PageOffset) + sizeof(PageSize) + sizeof(Checksum);

Expand Down Expand Up @@ -96,7 +96,8 @@ std::pair<ByteBuffer, ByteBuffer> genWriteData( //
case WriteBatch::WriteType::PUT:
case WriteBatch::WriteType::MOVE_NORMAL_PAGE:
{
write.read_buffer->readStrict(data_pos, write.size);
if (write.read_buffer) // In case read_buffer is nullptr
write.read_buffer->readStrict(data_pos, write.size);
Checksum page_checksum = CityHash_v1_0_2::CityHash64(data_pos, write.size);
data_pos += write.size;

Expand Down Expand Up @@ -177,7 +178,7 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //
// this field is always true now
const auto version = PageUtil::get<PageFileVersion>(pos);
if (version != PageFile::CURRENT_VERSION)
throw Exception("Version not match", ErrorCodes::LOGICAL_ERROR);
throw Exception("Version not match, version: " + DB::toString(version), ErrorCodes::LOGICAL_ERROR);

// check the checksum of WriteBatch
const auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum);
Expand Down Expand Up @@ -226,6 +227,7 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //
const auto ref_id = PageUtil::get<PageId>(pos);
const auto page_id = PageUtil::get<PageId>(pos);
edit.ref(ref_id, page_id);
break;
}
}
}
Expand Down Expand Up @@ -584,6 +586,8 @@ bool PageFile::isExist() const

/// Size in bytes of this PageFile's data file, as reported by Poco::File for `dataPath()`.
/// A PageFile whose `type` is `Type::Legacy` reports 0 without querying the filesystem.
UInt64 PageFile::getDataFileSize() const
{
    if (type != Type::Legacy)
        return Poco::File(dataPath()).getSize();
    return 0;
}
Expand Down
47 changes: 43 additions & 4 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ PageId PageStorage::getMaxId()
return versioned_page_entries.getSnapshot()->version()->maxId();
}

/// Resolve `page_id` against a snapshot and return the id of the normal page behind it.
/// When `snapshot` is null, a snapshot is first obtained through `getSnapshot()`.
/// If the snapshot's version reports `page_id` as a ref id, the normal page id it
/// reports is returned; otherwise `page_id` itself is returned unchanged.
PageId PageStorage::getNormalPageId(PageId page_id, SnapshotPtr snapshot)
{
    if (!snapshot)
        snapshot = this->getSnapshot();

    auto [is_ref, resolved_id] = snapshot->version()->isRefId(page_id);
    if (is_ref)
        return resolved_id;
    return page_id;
}

PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
{
if (!snapshot)
Expand Down Expand Up @@ -348,11 +359,32 @@ void PageStorage::traversePageEntries( //
#endif
}

/// Store the pair of callbacks that `gc()` consults for external pages.
/// `scanner` is kept as `external_pages_scanner` and `remover` as `external_pages_remover`.
/// Both are expected to be non-empty; this is only enforced through `assert`.
void PageStorage::registerExternalPagesCallbacks(ExternalPagesScanner scanner, ExternalPagesRemover remover)
{
    assert(scanner != nullptr);
    assert(remover != nullptr);

    this->external_pages_scanner = scanner;
    this->external_pages_remover = remover;
}

bool PageStorage::gc()
{
std::lock_guard<std::mutex> gc_lock(gc_mutex);
// get all PageFiles
// If another thread is running gc, just return;
bool v = false;
if (!gc_is_running.compare_exchange_strong(v, true))
return false;

SCOPE_EXIT({
bool is_running = true;
gc_is_running.compare_exchange_strong(is_running, false);
});

/// Get all pending external pages and PageFiles. Note that we should get external pages before PageFiles.
std::set<PageId> external_pages;
if (external_pages_scanner)
{
external_pages = external_pages_scanner();
}
auto page_files = PageStorage::listAllPageFiles(storage_path, /* remove_tmp_file */ true, /* ignore_legacy */ true, page_file_log);
if (page_files.empty())
{
Expand Down Expand Up @@ -420,9 +452,8 @@ bool PageStorage::gc()
gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files, migrate_page_count);
}

std::set<PageFileIdAndLevel> live_files;
/// Here we have to apply the edit to versioned_page_entries and generate a new version, then return all files that are in use
live_files = versioned_page_entries.gcApply(gc_file_entries_edit);
auto [live_files, live_normal_pages] = versioned_page_entries.gcApply(gc_file_entries_edit);

{
// Remove obsolete files' reader cache that are not used by any version
Expand All @@ -444,6 +475,12 @@ bool PageStorage::gc()

// Delete obsolete files that are not used by any version, without lock
gcRemoveObsoleteData(page_files, writing_file_id_level, live_files);

// Invoke callback with valid normal page id after gc.
if (external_pages_remover)
{
external_pages_remover(external_pages, live_normal_pages);
}
return true;
}

Expand Down Expand Up @@ -495,6 +532,8 @@ PageEntriesEdit PageStorage::gcMigratePages(const SnapshotPtr & snapshot,
const size_t migrate_page_count) const
{
PageEntriesEdit gc_file_edit;
if (merge_files.empty())
return gc_file_edit;

// merge `merge_files` to PageFile which PageId = max of all `merge_files` and level = level + 1
auto [largest_file_id, level] = *(merge_files.rbegin());
Expand Down
18 changes: 17 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class PageStorage
using ReaderPtr = std::shared_ptr<PageFile::Reader>;
using OpenReadFiles = std::map<PageFileIdAndLevel, ReaderPtr>;

using ExternalPagesScanner = std::function<std::set<PageId>()>;
/// Callback type for cleaning up external pages.
/// `pending_external_pages`: a set of external page ids (gc() passes the set returned by the scanner).
/// `valid_normal_pages`: a set of normal page ids (gc() passes the ids still alive after applying its edit).
using ExternalPagesRemover
    = std::function<void(const std::set<PageId> & pending_external_pages, const std::set<PageId> & valid_normal_pages)>;

public:
PageStorage(String name, const String & storage_path, const Config & config_);

Expand All @@ -75,6 +79,13 @@ class PageStorage
void traversePageEntries(const std::function<void(PageId page_id, const PageEntry & page)> & acceptor, SnapshotPtr snapshot);
bool gc();

PageId getNormalPageId(PageId page_id, SnapshotPtr snapshot = {});

// Register two callbacks:
// `scanner` for scanning available external page ids.
// `remover` will be called with living normal page ids after gc runs a round.
void registerExternalPagesCallbacks(ExternalPagesScanner scanner, ExternalPagesRemover remover);

static std::set<PageFile, PageFile::Comparator>
listAllPageFiles(const String & storage_path, bool remove_tmp_file, bool ignore_legacy, Poco::Logger * page_file_log);

Expand Down Expand Up @@ -115,7 +126,11 @@ class PageStorage
VersionedPageEntries versioned_page_entries;

std::mutex write_mutex;
std::mutex gc_mutex; // A mutex used to protect gc

std::atomic<bool> gc_is_running = false;

ExternalPagesScanner external_pages_scanner = nullptr;
ExternalPagesRemover external_pages_remover = nullptr;
};

class PageReader
Expand All @@ -131,6 +146,7 @@ class PageReader
PageMap read(const std::vector<PageId> & page_ids) const { return storage.read(page_ids, snap); }
void read(const std::vector<PageId> & page_ids, PageHandler & handler) const { storage.read(page_ids, handler, snap); };

// Forward to `storage.getNormalPageId`, passing `snap` as the snapshot to resolve `page_id` against.
PageId getNormalPageId(PageId page_id) const { return storage.getNormalPageId(page_id, snap); }
UInt64 getPageChecksum(PageId page_id) const { return storage.getEntry(page_id, snap).checksum; }

private:
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/PageUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ void writeFile(int fd, UInt64 offset, const char * data, size_t to_write, const

void readFile(int fd, const off_t offset, const char * buf, size_t expected_bytes, const std::string & path)
{
if (unlikely(expected_bytes == 0))
return;

ProfileEvents::increment(ProfileEvents::PSMReadCalls);

size_t bytes_read = 0;
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Storages/Page/VersionSet/PageEntriesVersionSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace DB
{

std::set<PageFileIdAndLevel> PageEntriesVersionSet::gcApply(PageEntriesEdit & edit)
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>> PageEntriesVersionSet::gcApply(PageEntriesEdit & edit)
{
std::unique_lock lock(read_mutex);

Expand All @@ -20,18 +20,21 @@ std::set<PageFileIdAndLevel> PageEntriesVersionSet::gcApply(PageEntriesEdit & ed
return listAllLiveFiles(lock);
}

std::set<PageFileIdAndLevel> PageEntriesVersionSet::listAllLiveFiles(const std::unique_lock<std::shared_mutex> & lock) const
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>>
PageEntriesVersionSet::listAllLiveFiles(const std::unique_lock<std::shared_mutex> & lock) const
{
(void)lock;
std::set<PageFileIdAndLevel> liveFiles;
std::set<PageFileIdAndLevel> live_files;
std::set<PageId> live_normal_pages;
for (PageEntries * v = placeholder_node.next; v != &placeholder_node; v = v->next)
{
for (auto it = v->pages_cbegin(); it != v->pages_cend(); ++it)
{
liveFiles.insert(it->second.fileIdLevel());
live_normal_pages.insert(it->first);
live_files.insert(it->second.fileIdLevel());
}
}
return liveFiles;
return {live_files, live_normal_pages};
}


Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/VersionSet/PageEntriesVersionSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ class PageEntriesVersionSet : public ::DB::MVCC::VersionSet<PageEntries, PageEnt

/// `gcApply` only accept PageEntry's `PUT` changes and will discard changes if PageEntry is invalid
/// append new version to version-list
std::set<PageFileIdAndLevel> gcApply(PageEntriesEdit & edit);
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>> gcApply(PageEntriesEdit & edit);

/// List all PageFile that are used by any version
std::set<PageFileIdAndLevel> listAllLiveFiles(const std::unique_lock<std::shared_mutex> &) const;
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>> listAllLiveFiles(const std::unique_lock<std::shared_mutex> &) const;
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace DB
// PageEntriesVersionSetWithDelta
//==========================================================================================

std::set<PageFileIdAndLevel> PageEntriesVersionSetWithDelta::gcApply(PageEntriesEdit & edit)
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>> PageEntriesVersionSetWithDelta::gcApply(PageEntriesEdit & edit)
{
std::unique_lock lock(read_mutex);
if (!edit.empty())
Expand All @@ -36,40 +36,38 @@ std::set<PageFileIdAndLevel> PageEntriesVersionSetWithDelta::gcApply(PageEntries
return listAllLiveFiles(lock);
}

std::set<PageFileIdAndLevel> PageEntriesVersionSetWithDelta::listAllLiveFiles(const std::unique_lock<std::shared_mutex> & lock) const
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>>
PageEntriesVersionSetWithDelta::listAllLiveFiles(const std::unique_lock<std::shared_mutex> & lock) const
{
(void)lock; // Note read_mutex must be hold.
std::set<PageFileIdAndLevel> liveFiles;
std::set<VersionPtr> visitedVersions; // avoid to access same version multiple time

/// TODO: this is costly, maybe we should find a better way not to block generating other snapshot.
std::set<PageFileIdAndLevel> live_files;
std::set<PageId> live_normal_pages;
// Iterate over all snapshots to collect all PageFiles in use.
for (auto s = snapshots->next; s != snapshots.get(); s = s->next)
{
collectLiveFilesFromVersionList(s->version()->getSharedTailVersion(), visitedVersions, liveFiles);
collectLiveFilesFromVersionList(*(s->version()), live_files, live_normal_pages);
}
// Iterate over `current`
collectLiveFilesFromVersionList(current, visitedVersions, liveFiles);
return liveFiles;
PageEntriesView latest_view(current);
collectLiveFilesFromVersionList(latest_view, live_files, live_normal_pages);
return {live_files, live_normal_pages};
}

void PageEntriesVersionSetWithDelta::collectLiveFilesFromVersionList( //
VersionPtr v,
std::set<VersionPtr> & visited,
std::set<PageFileIdAndLevel> & liveFiles) const
const PageEntriesView & view,
std::set<PageFileIdAndLevel> & live_files,
std::set<PageId> & live_normal_pages) const
{
for (; v != nullptr; v = v->prev)
std::set<PageId> normal_pages_this_snapshot = view.validNormalPageIds();
for (auto normal_page_id : normal_pages_this_snapshot)
{
// If this version has been visited, all previous version has been collected.
if (visited.count(v) > 0)
break;
for (auto it = v->pages_cbegin(); it != v->pages_cend(); ++it)
live_normal_pages.insert(normal_page_id);
if (auto entry = view.findNormalPageEntry(normal_page_id); entry && !entry->isTombstone())
{
// ignore if it is a tombstone entry
if (it->second.ref != 0)
{
liveFiles.insert(it->second.fileIdLevel());
}
live_files.insert(entry->fileIdLevel());
}
visited.insert(v);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,22 @@ class PageEntriesVersionSetWithDelta : public ::DB::MVCC::VersionSetWithDelta< /
using VersionPtr = BaseType::VersionPtr;

public:
explicit PageEntriesVersionSetWithDelta(const ::DB::MVCC::VersionSetConfig & config_, Poco::Logger * log_)
: BaseType(config_, log_)
{
}
explicit PageEntriesVersionSetWithDelta(const ::DB::MVCC::VersionSetConfig & config_, Poco::Logger * log_) : BaseType(config_, log_) {}

public:
std::set<PageFileIdAndLevel> gcApply(PageEntriesEdit & edit);
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>> gcApply(PageEntriesEdit & edit);

/// List all PageFile that are used by any version
std::set<PageFileIdAndLevel> listAllLiveFiles(const std::unique_lock<std::shared_mutex> &) const;
std::pair<std::set<PageFileIdAndLevel>, std::set<PageId>> listAllLiveFiles(const std::unique_lock<std::shared_mutex> &) const;

VersionPtr compactDeltas(const VersionPtr & tail) const override;

VersionPtr compactDeltaAndBase(const VersionPtr & old_base, const VersionPtr & delta) const override;

private:
void collectLiveFilesFromVersionList(VersionPtr tail, std::set<VersionPtr> & visited, std::set<PageFileIdAndLevel> & liveFiles) const;
void collectLiveFilesFromVersionList(const PageEntriesView & view,
std::set<PageFileIdAndLevel> & live_files,
std::set<PageId> & live_normal_pages) const;
};

/// Read old entries state from `view_` and apply new edit to `view_->tail`
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/Page/WriteBatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ class WriteBatch
writes.emplace_back(w);
}

/// Append a PUT write for an external page.
/// External page data is not managed by PageStorage, so the write carries
/// a null read buffer and a size of 0.
void putExternal(PageId page_id, UInt64 tag)
{
    writes.emplace_back(Write{WriteType::PUT, page_id, tag, nullptr, 0, 0});
}

void gcMovePage(PageId page_id, UInt64 tag, const ReadBufferPtr & read_buffer, UInt32 size)
{
Write w = {WriteType::MOVE_NORMAL_PAGE, page_id, tag, read_buffer, size, 0};
Expand Down
Loading