Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

add merge blocklog feature #10078

Merged
merged 2 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 100 additions & 10 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <eosio/chain/thread_utils.hpp>
#include <fc/bitutil.hpp>
#include <fc/io/raw.hpp>
#include <fc/scoped_exit.hpp>
#include <future>
#include <regex>

Expand Down Expand Up @@ -212,9 +213,15 @@ namespace eosio { namespace chain {

class index_writer {
public:
index_writer(const fc::path& block_index_name, uint32_t blocks_expected)
index_writer(const fc::path& block_index_name, uint32_t blocks_expected, bool create=true)
: current_offset(blocks_expected * sizeof(uint64_t)) {
create_mapped_file(index, block_index_name.generic_string(), current_offset);
if (create)
create_mapped_file(index, block_index_name.generic_string(), current_offset);
else {
auto sz = sizeof(uint64_t)*blocks_expected;
bfs::resize_file(block_index_name.generic_string(), sizeof(uint64_t)*blocks_expected);
index.open(block_index_name.generic_string(), sz);
}
}
void write(uint64_t pos) {
current_offset -= sizeof(pos);
Expand Down Expand Up @@ -449,6 +456,10 @@ namespace eosio { namespace chain {
current_position = read_buffer<uint64_t>(addr()) - sizeof(uint64_t);
return *this;
}

bool done() const {
return current_position <= begin_position;
}
};

template <typename BlockLogData>
Expand All @@ -462,6 +473,16 @@ namespace eosio { namespace chain {
return reverse_block_position_iterator<BlockLogData>(t, first_block_position);
}

void adjust_block_positions(index_writer& index, boost::iostreams::mapped_file_sink& block_file, uint64_t first_block_position, int64_t offset) {
// walk along the block position of each block entry and add its value by offset
for (auto itr = make_reverse_block_position_iterator(block_file, first_block_position);
!itr.done(); ++itr) {
auto new_pos = itr.get_value() + offset;
index.write(new_pos);
itr.set_value(new_pos);
}
}

void block_log_data::construct_index(const fc::path& index_file_path) {
std::string index_file_name = index_file_path.generic_string();
ilog("Will write new blocks.index file ${file}", ("file", index_file_name));
Expand Down Expand Up @@ -1138,14 +1159,7 @@ namespace eosio { namespace chain {
ds.write(log_bundle.log_data.data() + first_kept_block_pos, last_block_pos - first_kept_block_pos);

index_writer index(new_index_filename, num_blocks);

// walk along the block position of each block entry and decrement its value by nbytes_to_trim
for (auto itr = make_reverse_block_position_iterator(new_block_file, block_log_preamble::nbytes_with_chain_id);
itr.get_value() != block_log::npos; ++itr) {
auto new_pos = itr.get_value() - nbytes_to_trim;
index.write(new_pos);
itr.set_value(new_pos);
}
adjust_block_positions(index, new_block_file, block_log_preamble::nbytes_with_chain_id, - nbytes_to_trim);
}

bool block_log::trim_blocklog_front(const fc::path& block_dir, const fc::path& temp_dir, uint32_t truncate_at_block) {
Expand Down Expand Up @@ -1282,5 +1296,81 @@ namespace eosio { namespace chain {
extract_blocklog_i(log_bundle, new_block_filename, new_index_filename, start_block_num, num_blocks);
}
}

inline bfs::path operator+(bfs::path left, bfs::path right){return bfs::path(left)+=right;}

void move_blocklog_files(const bfs::path& src_dir, const fc::path& dest_dir, uint32_t start_block, uint32_t end_block) {
auto [log_filename, index_filename] = blocklog_files(dest_dir, start_block, end_block - start_block + 1);
bfs::rename(src_dir / "blocks.log", log_filename);
bfs::rename(src_dir / "blocks.index", index_filename);
}

uint32_t get_blocklog_version(const bfs::path& blocklog_file) {
uint32_t version;
fc::cfile f;
f.set_file_path(blocklog_file.generic_string());
f.open("r");
f.read((char*)&version, sizeof(uint32_t));
return version;
}

void block_log::merge_blocklogs(const fc::path& blocks_dir, const fc::path& dest_dir) {
block_log_catalog catalog;

catalog.open("", blocks_dir, "", "blocks");
if (catalog.collection.size() <= 1) {
wlog("There's no more than one blocklog files in ${blocks_dir}, skip merge.",
("blocks_dir", blocks_dir));
return;
}

fc::temp_directory temp_dir;
bfs::path temp_path = temp_dir.path();
uint32_t start_block, end_block = 0;
uint32_t version;

bfs::path temp_block_log = temp_path / "blocks.log";
bfs::path temp_block_index = temp_path / "blocks.index";

for (auto const& [first_block_num, val] : catalog.collection ) {
if (bfs::exists(temp_block_log)) {
if (first_block_num == end_block + 1) {
block_log_data log_data;
auto ds = log_data.open(val.filename_base + ".log");

if ((version >= pruned_transaction_version && log_data.version() >= pruned_transaction_version) ||
(version < pruned_transaction_version && log_data.version() < pruned_transaction_version) ) {

auto orig_log_size = bfs::file_size(temp_block_log);
bfs::resize_file(temp_block_log, orig_log_size + ds.remaining());
boost::iostreams::mapped_file_sink file(temp_block_log);
memcpy(file.data() + orig_log_size, ds.pos(), ds.remaining());
end_block = val.last_block_num;
index_writer index(temp_block_index, end_block - start_block + 1, false);
adjust_block_positions(index, file, orig_log_size, orig_log_size - (ds.pos() - log_data.data()));
continue;
}
else
wlog("${file}.log has a block log version ${new_version} which is incompatible with previous block log version ${version}, skip merging.",
("file", val.filename_base.generic_string())("new_version", log_data.version())("version", version));
}
else
wlog("${file}.log cannot be merged with previous block log file because of the discontinuity of blocks, skip merging.",
("file", val.filename_base.generic_string()));
// there is a version or block number gap between the stride files
move_blocklog_files(temp_path, dest_dir, start_block, end_block);
}

bfs::copy(val.filename_base + ".log", temp_block_log);
bfs::copy(val.filename_base + ".index", temp_block_index);
start_block = first_block_num;
end_block = val.last_block_num;
version = get_blocklog_version(temp_block_log);
}

if (bfs::exists(temp_block_log)) {
move_blocklog_files(temp_path, dest_dir, start_block, end_block);
}
}
} // namespace chain
} // namespace eosio
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/block_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ namespace eosio { namespace chain {
static void extract_blocklog(const fc::path& log_filename, const fc::path& index_filename,
const fc::path& dest_dir, uint32_t start_block, uint32_t num_blocks);
static void split_blocklog(const fc::path& block_dir, const fc::path& dest_dir, uint32_t stride);
static void merge_blocklogs(const fc::path& block_dir, const fc::path& dest_dir);

private:
std::unique_ptr<detail::block_log_impl> my;
Expand Down
9 changes: 9 additions & 0 deletions programs/eosio-blocklog/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct blocklog {
bool prune_transactions = false;
bool help = false;
bool extract_blocklog = false;
bool merge_blocklogs = false;
uint32_t blocklog_split_stride = 0;
};

Expand Down Expand Up @@ -213,6 +214,9 @@ void blocklog::set_program_options(options_description& cli)
("extract-blocklog", bpo::bool_switch(&extract_blocklog)->default_value(false),
"Extract blocks from blocks.log and blocks.index and keep the original."
" Must give 'blocks-dir' or 'blocks-filebase','output-dir', 'first' and 'last'.")
("merge-blocklogs", bpo::bool_switch(&merge_blocklogs)->default_value(false),
"Merge block log files in 'blocks-dir' with the file pattern 'blocks-\\d+-\\d+.[log,index]' to 'output-dir' whenever possible."
"The files in 'blocks-dir' will be kept without change. Must give 'blocks-dir' and 'output-dir'.")
("help,h", bpo::bool_switch(&help)->default_value(false), "Print this help message and exit.")
;
}
Expand Down Expand Up @@ -407,6 +411,11 @@ int main(int argc, char** argv) {
return 0;
}

if (blog.merge_blocklogs) {
block_log::merge_blocklogs(blocks_dir, output_dir);
return 0;
}

// else print blocks.log as JSON
blog.initialize(vmap);
blog.read_log();
Expand Down
30 changes: 27 additions & 3 deletions unittests/restart_chain_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,10 +621,9 @@ BOOST_AUTO_TEST_CASE(test_trim_blocklog_front_v3) {
trim_blocklog_front(3);
}

BOOST_AUTO_TEST_CASE(test_split_log_util2) {
BOOST_AUTO_TEST_CASE(test_blocklog_split_then_merge) {
namespace bfs = boost::filesystem;


tester chain;
chain.produce_blocks(160);
chain.close();
Expand All @@ -651,7 +650,32 @@ BOOST_AUTO_TEST_CASE(test_split_log_util2) {
block_log blog({.log_dir = blocks_dir, .retained_dir = retained_dir });

BOOST_CHECK(blog.version() != 0);
BOOST_CHECK(blog.head().get());
BOOST_CHECK_EQUAL(blog.head()->block_num(), 150);

// test blocklog merge
fc::temp_directory dest_dir;
BOOST_CHECK_NO_THROW(block_log::merge_blocklogs(retained_dir, dest_dir.path()));
BOOST_CHECK(bfs::exists( dest_dir.path() / "blocks-50-150.log" ));

if (bfs::exists( dest_dir.path() / "blocks-50-150.log" )) {
bfs::rename(dest_dir.path() / "blocks-50-150.log", dest_dir.path() / "blocks.log");
bfs::rename(dest_dir.path() / "blocks-50-150.index", dest_dir.path() / "blocks.index");
BOOST_CHECK_NO_THROW(block_log::smoke_test(dest_dir.path(), 1));
}

bfs::remove(dest_dir.path() / "blocks.log");

// test blocklog merge with gap
bfs::remove(retained_dir / "blocks-51-100.log");
bfs::remove(retained_dir / "blocks-51-100.index");

BOOST_CHECK_NO_THROW(block_log::merge_blocklogs(retained_dir, dest_dir.path()));
BOOST_CHECK(bfs::exists( dest_dir.path() / "blocks-50-50.log" ));
BOOST_CHECK(bfs::exists( dest_dir.path() / "blocks-50-50.index" ));

BOOST_CHECK(bfs::exists( dest_dir.path() / "blocks-101-150.log" ));
BOOST_CHECK(bfs::exists( dest_dir.path() / "blocks-101-150.index" ));

}

BOOST_AUTO_TEST_SUITE_END()