Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #8012 from EOSIO/7939-trim-block-log-v3-support
Browse files Browse the repository at this point in the history
7939 trim block log v3 support
  • Loading branch information
brianjohnson5972 authored Oct 3, 2019
2 parents b65629c + 9de6bbb commit a69c9db
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 300 deletions.
290 changes: 284 additions & 6 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ namespace eosio { namespace chain {
uint64_t previous();
uint32_t version() const { return _version; }
uint32_t first_block_num() const { return _first_block_num; }
constexpr static uint32_t _buf_len = 1U << 24;
private:
void update_buffer();

Expand All @@ -118,9 +119,7 @@ namespace eosio { namespace chain {
std::unique_ptr<char[]> _buffer_ptr;
std::string _block_file_name;
constexpr static int64_t _unset_position = -1;
constexpr static uint32_t _buf_len = 1U << 24;
constexpr static uint64_t _position_size = sizeof(_current_position_in_file);
constexpr static int _blknum_offset_from_pos = 14; //offset from start of block to 4 byte block number, valid for the only allowed versions
};

constexpr uint64_t buffer_location_to_file_location(uint32_t buffer_location) { return buffer_location << 3; }
Expand All @@ -132,6 +131,7 @@ namespace eosio { namespace chain {
void write(uint64_t pos);
void complete();
void update_buffer_position();
constexpr static uint64_t _buffer_bytes = 1U << 22;
private:
void prepare_buffer();
bool shift_buffer();
Expand All @@ -144,9 +144,41 @@ namespace eosio { namespace chain {
int64_t _current_position = 0;
int64_t _start_of_buffer_position = 0;
int64_t _end_of_buffer_position = 0;
constexpr static uint64_t _buffer_bytes = 1U << 22;
constexpr static uint64_t _max_buffer_length = file_location_to_buffer_location(_buffer_bytes);
};

/*
* @brief datastream adapter that adapts FILE* for use with fc unpack
*
* This class supports unpack functionality but not pack.
*/
class fileptr_datastream {
public:
explicit fileptr_datastream( FILE* file, const std::string& filename ) : _file(file), _filename(filename) {}

void skip( size_t s ) {
auto status = fseek(_file, s, SEEK_CUR);
EOS_ASSERT( status == 0, block_log_exception,
"Could not seek past ${bytes} bytes in Block log file at '${blocks_log}'. Returned status: ${status}",
("bytes", s)("blocks_log", _filename)("status", status) );
}

bool read( char* d, size_t s ) {
size_t result = fread( d, 1, s, _file );
EOS_ASSERT( result == s, block_log_exception,
"only able to read ${act} bytes of the expected ${exp} bytes in file: ${file}",
("act",result)("exp",s)("file", _filename) );
return true;
}

bool get( unsigned char& c ) { return get( *(char*)&c ); }

bool get( char& c ) { return read(&c, 1); }

private:
FILE* const _file;
const std::string _filename;
};
}

block_log::block_log(const fc::path& data_dir)
Expand Down Expand Up @@ -342,6 +374,8 @@ namespace eosio { namespace chain {
}

void block_log::reset( const chain_id_type& chain_id, uint32_t first_block_num ) {
EOS_ASSERT( first_block_num > 1, block_log_exception,
"Block log version ${ver} needs to be created with a genesis state if starting from block number 1." );
my->reset(chain_id, signed_block_ptr(), first_block_num);
}

Expand Down Expand Up @@ -531,6 +565,15 @@ namespace eosio { namespace chain {
uint32_t first_block_num = 1;
if (version != 1) {
old_block_stream.read ( (char*)&first_block_num, sizeof(first_block_num) );

// this assert is only here since repair_log is only used for --hard-replay-blockchain, which removes any
// existing state, if another API needs to use it, this can be removed and the check for the first block's
// previous block id will need to accommodate this.
EOS_ASSERT( first_block_num == 1, block_log_exception,
"Block log ${file} must contain a genesis state and start at block number 1. This block log "
"starts at block number ${first_block_num}.",
("file", (backup_dir / "blocks.log").generic_string())("first_block_num", first_block_num));

new_block_stream.write( (char*)&first_block_num, sizeof(first_block_num) );
}

Expand All @@ -547,6 +590,12 @@ namespace eosio { namespace chain {

new_block_stream << chain_id;
}
else {
EOS_THROW( block_log_exception,
"Block log ${file} is not supported. version: ${ver} and first_block_num: ${fbn} does not contain "
"a genesis_state nor a chain_id.",
("file", (backup_dir / "blocks.log").generic_string())("ver", version)("fbn", first_block_num));
}

if (version != 1) {
auto expected_totem = npos;
Expand Down Expand Up @@ -751,10 +800,10 @@ namespace eosio { namespace chain {
uint32_t bnum = 0;
if (block_pos >= _start_of_buffer_position) {
const uint32_t index_of_block = block_pos - _start_of_buffer_position;
bnum = *reinterpret_cast<uint32_t*>(buf + index_of_block + _blknum_offset_from_pos); //block number of previous block (is big endian)
bnum = *reinterpret_cast<uint32_t*>(buf + index_of_block + trim_data::blknum_offset); //block number of previous block (is big endian)
}
else {
const auto blknum_offset_pos = block_pos + _blknum_offset_from_pos;
const auto blknum_offset_pos = block_pos + trim_data::blknum_offset;
auto status = fseek(_file.get(), blknum_offset_pos, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "Could not seek in '${blocks_log}' to position: ${pos}. Returned status: ${status}", ("blocks_log", _block_file_name)("pos", blknum_offset_pos)("status", status) );
auto size = fread((void*)&bnum, sizeof(bnum), 1, _file.get());
Expand Down Expand Up @@ -906,4 +955,233 @@ namespace eosio { namespace chain {
bool block_log::is_supported_version(uint32_t version) {
return std::clamp(version, min_supported_version, max_supported_version) == version;
}
} } /// eosio::chain

bool block_log::trim_blocklog_front(const fc::path& block_dir, const fc::path& temp_dir, uint32_t truncate_at_block) {
using namespace std;
EOS_ASSERT( block_dir != temp_dir, block_log_exception, "block_dir and temp_dir need to be different directories" );
ilog("In directory ${dir} will trim all blocks before block ${n} from blocks.log and blocks.index.",
("dir", block_dir.generic_string())("n", truncate_at_block));
trim_data original_block_log(block_dir);
if (truncate_at_block <= original_block_log.first_block) {
ilog("There are no blocks before block ${n} so do nothing.", ("n", truncate_at_block));
return false;
}
if (truncate_at_block > original_block_log.last_block) {
ilog("All blocks are before block ${n} so do nothing (trim front would delete entire blocks.log).", ("n", truncate_at_block));
return false;
}

// ****** create the new block log file and write out the header for the file
fc::create_directories(temp_dir);
fc::path new_block_filename = temp_dir / "blocks.log";
if (fc::remove(new_block_filename)) {
ilog("Removing old blocks.out file");
}
fc::cfile new_block_file;
new_block_file.set_file_path(new_block_filename);
// need to open as append since the file doesn't already exist, then reopen without append to allow writing the
// file in any order
new_block_file.open( LOG_WRITE_C );
new_block_file.close();
new_block_file.open( LOG_RW_C );

static_assert( block_log::max_supported_version == 3,
"Code was written to support version 3 format, need to update this code for latest format." );
uint32_t version = block_log::max_supported_version;
new_block_file.seek(0);
new_block_file.write((char*)&version, sizeof(version));
new_block_file.write((char*)&truncate_at_block, sizeof(truncate_at_block));

new_block_file << original_block_log.chain_id;

// append a totem to indicate the division between blocks and header
auto totem = block_log::npos;
new_block_file.write((char*)&totem, sizeof(totem));

const auto new_block_file_first_block_pos = new_block_file.tellp();
// ****** end of new block log header

// copy over remainder of block log to new block log
auto buffer = make_unique<char[]>(detail::reverse_iterator::_buf_len);
char* buf = buffer.get();

// offset bytes to shift from old blocklog position to new blocklog position
const uint64_t original_file_block_pos = original_block_log.block_pos(truncate_at_block);
const uint64_t pos_delta = original_file_block_pos - new_block_file_first_block_pos;
auto status = fseek(original_block_log.blk_in, 0, SEEK_END);
EOS_ASSERT( status == 0, block_log_exception, "blocks.log seek failed" );

// all blocks to copy to the new blocklog
const uint64_t to_write = ftell(original_block_log.blk_in) - original_file_block_pos;
const auto pos_size = sizeof(uint64_t);

// start with the last block's position stored at the end of the block
uint64_t original_pos = ftell(original_block_log.blk_in) - pos_size;

const auto num_blocks = original_block_log.last_block - truncate_at_block + 1;

fc::path new_index_filename = temp_dir / "blocks.index";
detail::index_writer index(new_index_filename, num_blocks);

uint64_t read_size = 0;
for(uint64_t to_write_remaining = to_write; to_write_remaining > 0; to_write_remaining -= read_size) {
read_size = to_write_remaining;
if (read_size > detail::reverse_iterator::_buf_len) {
read_size = detail::reverse_iterator::_buf_len;
}

// read in the previous contiguous memory into the read buffer
const auto start_of_blk_buffer_pos = original_file_block_pos + to_write_remaining - read_size;
status = fseek(original_block_log.blk_in, start_of_blk_buffer_pos, SEEK_SET);
const auto num_read = fread(buf, read_size, 1, original_block_log.blk_in);
EOS_ASSERT( num_read == 1, block_log_exception, "blocks.log read failed" );

// walk this memory section to adjust block position to match the adjusted location
// of the block start and store in the new index file
while(original_pos >= start_of_blk_buffer_pos) {
const auto buffer_index = original_pos - start_of_blk_buffer_pos;
uint64_t& pos_content = *(uint64_t*)(buf + buffer_index);
const auto start_of_this_block = pos_content;
pos_content = start_of_this_block - pos_delta;
index.write(pos_content);
original_pos = start_of_this_block - pos_size;
}
new_block_file.seek(new_block_file_first_block_pos + to_write_remaining - read_size);
new_block_file.write(buf, read_size);
}
index.complete();
fclose(original_block_log.blk_in);
original_block_log.blk_in = nullptr;
new_block_file.flush();
new_block_file.close();

fc::path old_log = temp_dir / "old.log";
rename(original_block_log.block_file_name, old_log);
rename(new_block_filename, original_block_log.block_file_name);
fc::path old_ind = temp_dir / "old.index";
rename(original_block_log.index_file_name, old_ind);
rename(new_index_filename, original_block_log.index_file_name);

return true;
}

trim_data::trim_data(fc::path block_dir) {

// code should follow logic in block_log::repair_log

using namespace std;
block_file_name = block_dir / "blocks.log";
index_file_name = block_dir / "blocks.index";
blk_in = FC_FOPEN(block_file_name.generic_string().c_str(), "rb");
EOS_ASSERT( blk_in != nullptr, block_log_not_found, "cannot read file ${file}", ("file",block_file_name.string()) );
ind_in = FC_FOPEN(index_file_name.generic_string().c_str(), "rb");
EOS_ASSERT( ind_in != nullptr, block_log_not_found, "cannot read file ${file}", ("file",index_file_name.string()) );
auto size = fread((void*)&version,sizeof(version), 1, blk_in);
EOS_ASSERT( size == 1, block_log_unsupported_version, "invalid format for file ${file}", ("file",block_file_name.string()));
ilog("block log version= ${version}",("version",version));
EOS_ASSERT( block_log::is_supported_version(version), block_log_unsupported_version, "block log version ${v} is not supported", ("v",version));

detail::fileptr_datastream ds(blk_in, block_file_name.string());
if (version == 1) {
first_block = 1;
genesis_state gs;
fc::raw::unpack(ds, gs);
chain_id = gs.compute_chain_id();
}
else {
size = fread((void *) &first_block, sizeof(first_block), 1, blk_in);
EOS_ASSERT(size == 1, block_log_exception, "invalid format for file ${file}",
("file", block_file_name.string()));
if (block_log::contains_genesis_state(version, first_block)) {
genesis_state gs;
fc::raw::unpack(ds, gs);
chain_id = gs.compute_chain_id();
}
else if (block_log::contains_chain_id(version, first_block)) {
ds >> chain_id;
}
else {
EOS_THROW( block_log_exception,
"Block log ${file} is not supported. version: ${ver} and first_block: ${first_block} does not contain "
"a genesis_state nor a chain_id.",
("file", block_file_name.string())("ver", version)("first_block", first_block));
}

const auto expected_totem = block_log::npos;
std::decay_t<decltype(block_log::npos)> actual_totem;
size = fread ( (char*)&actual_totem, sizeof(actual_totem), 1, blk_in);

EOS_ASSERT(size == 1, block_log_exception,
"Expected to read ${size} bytes, but did not read any bytes", ("size", sizeof(actual_totem)));
EOS_ASSERT(actual_totem == expected_totem, block_log_exception,
"Expected separator between block log header and blocks was not found( expected: ${e}, actual: ${a} )",
("e", fc::to_hex((char*)&expected_totem, sizeof(expected_totem) ))("a", fc::to_hex((char*)&actual_totem, sizeof(actual_totem) )));
}

const uint64_t start_of_blocks = ftell(blk_in);

const auto status = fseek(ind_in, 0, SEEK_END); //get length of blocks.index (gives number of blocks)
EOS_ASSERT( status == 0, block_log_exception, "cannot seek to ${file} end", ("file", index_file_name.string()) );
const uint64_t file_end = ftell(ind_in); //get length of blocks.index (gives number of blocks)
last_block = first_block + file_end/sizeof(uint64_t) - 1;

first_block_pos = block_pos(first_block);
EOS_ASSERT(start_of_blocks == first_block_pos, block_log_exception,
"Block log ${file} was determined to have its first block at ${determined}, but the block index "
"indicates the first block is at ${index}",
("file", block_file_name.string())("determined", start_of_blocks)("index",first_block_pos));
ilog("first block= ${first}",("first",first_block));
ilog("last block= ${last}",("last",last_block));
}

trim_data::~trim_data() {
if (blk_in != nullptr)
fclose(blk_in);
if (ind_in != nullptr)
fclose(ind_in);
}

uint64_t trim_data::block_index(uint32_t n) const {
using namespace std;
EOS_ASSERT( first_block <= n, block_log_exception,
"cannot seek in ${file} to block number ${b}, block number ${first} is the first block",
("file", index_file_name.string())("b",n)("first",first_block) );
EOS_ASSERT( n <= last_block, block_log_exception,
"cannot seek in ${file} to block number ${b}, block number ${last} is the last block",
("file", index_file_name.string())("b",n)("last",last_block) );
return sizeof(uint64_t) * (n - first_block);
}

uint64_t trim_data::block_pos(uint32_t n) {
using namespace std;
// can indicate the location of the block after the last block
if (n == last_block + 1) {
return ftell(blk_in);
}
const uint64_t index_pos = block_index(n);
auto status = fseek(ind_in, index_pos, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "cannot seek to ${file} ${pos} from beginning of file for block ${b}", ("file", index_file_name.string())("pos", index_pos)("b",n) );
const uint64_t pos = ftell(ind_in);
EOS_ASSERT( pos == index_pos, block_log_exception, "cannot seek to ${file} entry for block ${b}", ("file", index_file_name.string())("b",n) );
uint64_t block_n_pos;
auto size = fread((void*)&block_n_pos, sizeof(block_n_pos), 1, ind_in); //filepos of block n
EOS_ASSERT( size == 1, block_log_exception, "cannot read ${file} entry for block ${b}", ("file", index_file_name.string())("b",n) );

//read blocks.log and verify block number n is found at the determined file position
const auto calc_blknum_pos = block_n_pos + blknum_offset;
status = fseek(blk_in, calc_blknum_pos, SEEK_SET);
EOS_ASSERT( status == 0, block_log_exception, "cannot seek to ${file} ${pos} from beginning of file", ("file", block_file_name.string())("pos", calc_blknum_pos) );
const uint64_t block_offset_pos = ftell(blk_in);
EOS_ASSERT( block_offset_pos == calc_blknum_pos, block_log_exception, "cannot seek to ${file} ${pos} from beginning of file", ("file", block_file_name.string())("pos", calc_blknum_pos) );
uint32_t prior_blknum;
size = fread((void*)&prior_blknum, sizeof(prior_blknum), 1, blk_in); //read bigendian block number of prior block
EOS_ASSERT( size == 1, block_log_exception, "cannot read prior block");
const uint32_t bnum = fc::endian_reverse_u32(prior_blknum) + 1; //convert to little endian, add 1 since prior block
EOS_ASSERT( bnum == n, block_log_exception,
"At position ${pos} in ${file} expected to find ${exp_bnum} but found ${act_bnum}",
("pos",block_offset_pos)("file", block_file_name.string())("exp_bnum",n)("act_bnum",bnum) );

return block_n_pos;
}

} } /// eosio::chain
Loading

0 comments on commit a69c9db

Please sign in to comment.