This repository has been archived by the owner on Aug 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Support state history log splitting #9277
Merged
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
27ad707
Support state history log splitting
huangminghuang c2e1188
fix namespace problem
huangminghuang 692d80b
Avoid designated initializers
huangminghuang bac6239
Add retained directory
huangminghuang 1e938de
Address some PR comments
huangminghuang 083cf2d
Address PR comment
huangminghuang File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
#pragma once | ||
#include <boost/filesystem/path.hpp> | ||
|
||
namespace eosio { | ||
namespace chain { | ||
|
||
namespace bfs = boost::filesystem; | ||
|
||
struct block_log_config { | ||
bfs::path log_dir; | ||
bfs::path retained_dir; | ||
bfs::path archive_dir; | ||
uint32_t stride = UINT32_MAX; | ||
uint16_t max_retained_files = 10; | ||
bool fix_irreversible_blocks = false; | ||
}; | ||
|
||
} // namespace chain | ||
} // namespace eosio |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,240 @@ | ||
#pragma once | ||
#include <boost/container/flat_map.hpp> | ||
#include <boost/filesystem/path.hpp> | ||
#include <boost/iostreams/device/mapped_file.hpp> | ||
#include <fc/io/cfile.hpp> | ||
#include <fc/io/datastream.hpp> | ||
#include <regex> | ||
|
||
namespace eosio { | ||
namespace chain { | ||
|
||
namespace bfs = boost::filesystem; | ||
|
||
template <typename Lambda> | ||
void for_each_file_in_dir_matches(const bfs::path& dir, std::string pattern, Lambda&& lambda) { | ||
const std::regex my_filter(pattern); | ||
std::smatch what; | ||
bfs::directory_iterator end_itr; // Default ctor yields past-the-end | ||
for (bfs::directory_iterator p(dir); p != end_itr; ++p) { | ||
// Skip if not a file | ||
if (!bfs::is_regular_file(p->status())) | ||
continue; | ||
// skip if it does not match the pattern | ||
if (!std::regex_match(p->path().filename().string(), what, my_filter)) | ||
continue; | ||
lambda(p->path()); | ||
} | ||
} | ||
|
||
struct null_verifier { | ||
template <typename LogData> | ||
void verify(const LogData&, const bfs::path&) {} | ||
}; | ||
|
||
template <typename LogData, typename LogIndex, typename LogVerifier = null_verifier> | ||
struct log_catalog { | ||
using block_num_t = uint32_t; | ||
|
||
struct mapped_type { | ||
block_num_t last_block_num; | ||
bfs::path filename_base; | ||
}; | ||
using collection_t = boost::container::flat_map<block_num_t, mapped_type>; | ||
using size_type = typename collection_t::size_type; | ||
static constexpr size_type npos = std::numeric_limits<size_type>::max(); | ||
|
||
using mapmode = boost::iostreams::mapped_file::mapmode; | ||
|
||
bfs::path retained_dir; | ||
bfs::path archive_dir; | ||
size_type max_retained_files = 10; | ||
collection_t collection; | ||
size_type active_index = npos; | ||
LogData log_data; | ||
LogIndex log_index; | ||
LogVerifier verifier; | ||
|
||
bool empty() const { return collection.empty(); } | ||
|
||
uint32_t first_block_num() const { | ||
if (empty()) | ||
return 0; | ||
return collection.begin()->first; | ||
} | ||
|
||
static bfs::path make_abosolute_dir(const bfs::path& base_dir, bfs::path new_dir) { | ||
if (new_dir.is_relative()) | ||
new_dir = base_dir / new_dir; | ||
|
||
if (!bfs::is_directory(new_dir)) | ||
bfs::create_directories(new_dir); | ||
|
||
return new_dir; | ||
} | ||
|
||
void open(const bfs::path& log_dir, const bfs::path& retained_dir, const bfs::path& archive_dir, const char* name, | ||
const char* suffix_pattern = R"(-\d+-\d+\.log)") { | ||
|
||
this->retained_dir = make_abosolute_dir(log_dir, retained_dir.empty() ? log_dir : retained_dir); | ||
if (!archive_dir.empty()) { | ||
this->archive_dir = make_abosolute_dir(log_dir, archive_dir); | ||
} | ||
|
||
for_each_file_in_dir_matches(this->retained_dir, std::string(name) + suffix_pattern, [this](bfs::path path) { | ||
auto log_path = path; | ||
auto index_path = path.replace_extension("index"); | ||
auto path_without_extension = log_path.parent_path() / log_path.stem().string(); | ||
|
||
LogData log(log_path); | ||
|
||
verifier.verify(log, log_path); | ||
|
||
// check if index file matches the log file | ||
if (!index_matches_data(index_path, log)) | ||
log.construct_index(index_path); | ||
|
||
auto existing_itr = collection.find(log.first_block_num()); | ||
if (existing_itr != collection.end()) { | ||
if (log.last_block_num() <= existing_itr->second.last_block_num) { | ||
wlog("${log_path} contains the overlapping range with ${existing_path}.log, dropping ${log_path} " | ||
"from catalog", | ||
("log_path", log_path.string())("existing_path", existing_itr->second.filename_base.string())); | ||
return; | ||
} else { | ||
wlog( | ||
"${log_path} contains the overlapping range with ${existing_path}.log, droping ${existing_path}.log " | ||
"from catelog", | ||
("log_path", log_path.string())("existing_path", existing_itr->second.filename_base.string())); | ||
} | ||
} | ||
|
||
collection.insert_or_assign(log.first_block_num(), mapped_type{log.last_block_num(), path_without_extension}); | ||
}); | ||
} | ||
|
||
bool index_matches_data(const bfs::path& index_path, const LogData& log) const { | ||
if (!bfs::exists(index_path)) return false; | ||
|
||
auto num_blocks_in_index = bfs::file_size(index_path) / sizeof(uint64_t); | ||
if (num_blocks_in_index != log.num_blocks()) | ||
return false; | ||
|
||
// make sure the last 8 bytes of index and log matches | ||
fc::cfile index_file; | ||
index_file.set_file_path(index_path); | ||
index_file.open("r"); | ||
index_file.seek_end(-sizeof(uint64_t)); | ||
uint64_t pos; | ||
index_file.read(reinterpret_cast<char*>(&pos), sizeof(pos)); | ||
return pos == log.last_block_position(); | ||
} | ||
|
||
std::optional<uint64_t> get_block_position(uint32_t block_num, mapmode mode = mapmode::readonly) { | ||
try { | ||
if (active_index != npos) { | ||
auto active_item = collection.nth(active_index); | ||
if (active_item->first <= block_num && block_num <= active_item->second.last_block_num && | ||
log_data.flags() == mode) { | ||
return log_index.nth_block_position(block_num - log_data.first_block_num()); | ||
} | ||
} | ||
if (collection.empty() || block_num < collection.begin()->first) | ||
return {}; | ||
|
||
auto it = --collection.upper_bound(block_num); | ||
|
||
if (block_num <= it->second.last_block_num) { | ||
auto name = it->second.filename_base; | ||
log_data.open(name.replace_extension("log"), mode); | ||
log_index.open(name.replace_extension("index")); | ||
this->active_index = collection.index_of(it); | ||
return log_index.nth_block_position(block_num - log_data.first_block_num()); | ||
} | ||
return {}; | ||
} catch (...) { | ||
this->active_index = npos; | ||
return {}; | ||
} | ||
} | ||
|
||
std::pair<fc::datastream<const char*>, uint32_t> ro_stream_for_block(uint32_t block_num) { | ||
auto pos = get_block_position(block_num, mapmode::readonly); | ||
if (pos) { | ||
return std::make_pair(log_data.ro_stream_at(*pos), log_data.version()); | ||
} | ||
return {fc::datastream<const char*>(nullptr, 0), static_cast<uint32_t>(0)}; | ||
} | ||
|
||
std::pair<fc::datastream<char*>, uint32_t> rw_stream_for_block(uint32_t block_num) { | ||
auto pos = get_block_position(block_num, mapmode::readwrite); | ||
if (pos) { | ||
return std::make_pair(log_data.rw_stream_at(*pos), log_data.version()); | ||
} | ||
return {fc::datastream<char*>(nullptr, 0), static_cast<uint32_t>(0)}; | ||
} | ||
|
||
std::optional<block_id_type> id_for_block(uint32_t block_num) { | ||
auto pos = get_block_position(block_num, mapmode::readonly); | ||
if (pos) { | ||
return log_data.block_id_at(*pos); | ||
} | ||
return {}; | ||
} | ||
|
||
static void rename_if_not_exists(bfs::path old_name, bfs::path new_name) { | ||
if (!bfs::exists(new_name)) { | ||
bfs::rename(old_name, new_name); | ||
} | ||
else { | ||
bfs::remove(old_name); | ||
wlog("${new_name} already exists, just removing ${old_name}", ("old_name", old_name.string())("new_name", new_name.string())); | ||
} | ||
} | ||
|
||
static void rename_bundle(bfs::path orig_path, bfs::path new_path) { | ||
rename_if_not_exists(orig_path.replace_extension(".log"), new_path.replace_extension(".log")); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it matter to the code after calling this, if the rename fails or succeeds? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It shouldn't matter. The design was to cover the case when the archive or retaining dir is a network shared directory like (NFS mount), where multiple nodeos can write to it. If the file already exists because another nodeos instance has written to it. We should be able to ignore it. |
||
rename_if_not_exists(orig_path.replace_extension(".index"), new_path.replace_extension(".index")); | ||
} | ||
|
||
/// Add a new entry into the catalog. | ||
/// | ||
/// Notice that \c start_block_num must be monotonically increasing between the invocations of this function | ||
/// so that the new entry would be inserted at the end of the flat_map; otherwise, \c active_index would be | ||
/// invalidated and the mapping between the log data their block range would be wrong. This function is only used | ||
/// during the splitting of block log. Using this function for other purpose should make sure if the monotonically | ||
/// increasing block num guarantee can be met. | ||
void add(uint32_t start_block_num, uint32_t end_block_num, const bfs::path& dir, const char* name) { | ||
|
||
const int bufsize = 64; | ||
char buf[bufsize]; | ||
snprintf(buf, bufsize, "%s-%d-%d", name, start_block_num, end_block_num); | ||
bfs::path new_path = retained_dir / buf; | ||
rename_bundle(dir / name, new_path); | ||
|
||
if (this->collection.size() >= max_retained_files) { | ||
const auto items_to_erase = | ||
max_retained_files > 0 ? this->collection.size() - max_retained_files + 1 : this->collection.size(); | ||
for (auto it = this->collection.begin(); it < this->collection.begin() + items_to_erase; ++it) { | ||
auto orig_name = it->second.filename_base; | ||
if (archive_dir.empty()) { | ||
// delete the old files when no backup dir is specified | ||
bfs::remove(orig_name.replace_extension("log")); | ||
bfs::remove(orig_name.replace_extension("index")); | ||
} else { | ||
// move the the archive dir | ||
rename_bundle(orig_name, archive_dir/orig_name.filename() ); | ||
} | ||
} | ||
this->collection.erase(this->collection.begin(), this->collection.begin() + items_to_erase); | ||
this->active_index = this->active_index == npos || this->active_index < items_to_erase | ||
? npos | ||
: this->active_index - items_to_erase; | ||
} | ||
if (max_retained_files > 0) | ||
this->collection.emplace(start_block_num, mapped_type{end_block_num, new_path}); | ||
} | ||
}; | ||
|
||
} // namespace chain | ||
} // namespace eosio |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
#pragma once | ||
#include <boost/iostreams/device/mapped_file.hpp> | ||
|
||
namespace eosio { | ||
namespace chain { | ||
|
||
template <typename T> | ||
T read_buffer(const char* buf) { | ||
T result; | ||
memcpy(&result, buf, sizeof(T)); | ||
return result; | ||
} | ||
|
||
template <typename Derived> | ||
class log_data_base { | ||
protected: | ||
boost::iostreams::mapped_file file; | ||
|
||
const Derived* self() const { return static_cast<const Derived*>(this); } | ||
|
||
public: | ||
using mapmode = boost::iostreams::mapped_file::mapmode; | ||
|
||
log_data_base() = default; | ||
|
||
bool is_open() const { return file.is_open(); } | ||
mapmode flags() const { return file.flags(); } | ||
|
||
const char* data() const { return file.const_data(); } | ||
uint64_t size() const { return file.size(); } | ||
uint32_t last_block_num() const { return self()->block_num_at(last_block_position()); } | ||
uint64_t last_block_position() const { return read_buffer<uint64_t>(data() + size() - sizeof(uint64_t)); } | ||
|
||
uint32_t num_blocks() const { | ||
if (self()->first_block_position() == file.size()) | ||
return 0; | ||
return last_block_num() - self()->first_block_num() + 1; | ||
} | ||
}; | ||
} // namespace chain | ||
} // namespace eosio |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a broad absorption of exceptions. Are there specific exceptions that you expect to get here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is for the validation of log file preamble.