Skip to content

Commit

Permalink
generate pbft watermark from forkdb.
Browse files Browse the repository at this point in the history
  • Loading branch information
oldcold authored and VincentOCL committed May 22, 2019
1 parent cf75956 commit 3c45326
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 102 deletions.
30 changes: 5 additions & 25 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ struct controller_impl {
bool pbft_upgrading = false;
optional<block_id_type> pending_pbft_lib;
optional<block_id_type> pending_pbft_checkpoint;
vector<block_num_type> proposed_schedule_blocks;
vector<block_num_type> promoted_schedule_blocks;
block_state_ptr pbft_prepared;
block_state_ptr my_prepare;
block_state_ptr head;
Expand Down Expand Up @@ -1390,14 +1388,8 @@ struct controller_impl {
auto lscb_num = pending->_pending_block_state->pbft_stable_checkpoint_blocknum;

if (pbft_enabled && gpo.proposed_schedule_block_num) {
proposed_schedule_blocks.emplace_back(*gpo.proposed_schedule_block_num);
for ( auto itr = proposed_schedule_blocks.begin(); itr != proposed_schedule_blocks.end();) {
if ((*itr) < lscb_num) {
itr = proposed_schedule_blocks.erase(itr);
} else {
++itr;
}
}
auto bs = fork_db.get_block_in_current_chain_by_num(*gpo.proposed_schedule_block_num);
if (bs) fork_db.mark_as_pbft_watermark(bs);
}

bool should_promote_pending_schedule = false;
Expand Down Expand Up @@ -1429,14 +1421,7 @@ struct controller_impl {
pending->_pending_block_state->set_new_producers(gpo.proposed_schedule);

if (pbft_enabled) {
promoted_schedule_blocks.emplace_back(pending->_pending_block_state->block_num);
for ( auto itr = promoted_schedule_blocks.begin(); itr != promoted_schedule_blocks.end();) {
if ((*itr) < lscb_num) {
itr = promoted_schedule_blocks.erase(itr);
} else {
++itr;
}
}
pending->_pending_block_state->pbft_watermark = true;
}
}
db.modify( gpo, [&]( auto& gp ) {
Expand Down Expand Up @@ -2343,13 +2328,8 @@ block_id_type controller::last_stable_checkpoint_block_id() const {
return block_id_type{};
}


vector<uint32_t> controller::proposed_schedule_block_nums() const {
return my->proposed_schedule_blocks;
}

vector<uint32_t> controller::promoted_schedule_block_nums() const {
return my->promoted_schedule_blocks;
vector<uint32_t> controller::get_watermarks() const {
return my->fork_db.get_watermarks_in_forkdb();
}

bool controller::is_replaying() const {
Expand Down
39 changes: 32 additions & 7 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace eosio { namespace chain {
struct by_block_id;
struct by_block_num;
struct by_lib_block_num;
struct by_watermark;
struct by_prev;
typedef multi_index_container<
block_state_ptr,
Expand All @@ -32,13 +33,20 @@ namespace eosio { namespace chain {
>,
ordered_non_unique< tag<by_lib_block_num>,
composite_key< block_state,
member<block_header_state,uint32_t,&block_header_state::dpos_irreversible_blocknum>,
member<block_header_state,uint32_t,&block_header_state::bft_irreversible_blocknum>,
member<block_state,bool,&block_state::pbft_prepared>,
member<block_state,bool,&block_state::pbft_my_prepare>,
member<block_header_state,uint32_t,&block_header_state::block_num>
member<block_header_state,uint32_t,&block_header_state::dpos_irreversible_blocknum>,
member<block_header_state,uint32_t,&block_header_state::bft_irreversible_blocknum>,
member<block_state,bool,&block_state::pbft_prepared>,
member<block_state,bool,&block_state::pbft_my_prepare>,
member<block_header_state,uint32_t,&block_header_state::block_num>
>,
composite_key_compare< std::greater<uint32_t>, std::greater<uint32_t>, std::greater<bool>, std::greater<bool>, std::greater<uint32_t> >
>,
ordered_non_unique< tag<by_watermark>,
composite_key< block_state,
member<block_state,bool,&block_state::pbft_watermark>,
member<block_header_state,uint32_t,&block_header_state::block_num>
>,
composite_key_compare< std::greater<>, std::greater<uint32_t> >
>
>
> fork_multi_index_type;
Expand Down Expand Up @@ -202,7 +210,6 @@ namespace eosio { namespace chain {

auto prior = my->index.find( n->block->previous );

//TODO: to be optimised.
if (prior != my->index.end()) {
if ((*prior)->pbft_prepared) mark_pbft_prepared_fork(*prior);
if ((*prior)->pbft_my_prepare) mark_pbft_my_prepare_fork(*prior);
Expand Down Expand Up @@ -601,5 +608,23 @@ namespace eosio { namespace chain {
}
}

vector<block_num_type> fork_database::get_watermarks_in_forkdb() {
vector<block_num_type> watermarks;
auto& pidx = my->index.get<by_watermark>();
auto pitr = pidx.begin();
while (pitr != pidx.end() && (*pitr)->pbft_watermark) {
watermarks.emplace_back((*pitr)->block_num);
}
return watermarks;
}

void fork_database::mark_as_pbft_watermark( const block_state_ptr& h) {
auto& by_id_idx = my->index.get<by_block_id>();
auto itr = by_id_idx.find( h->id );
EOS_ASSERT( itr != by_id_idx.end(), fork_db_block_not_found, "could not find block in fork database" );
by_id_idx.modify( itr, [&]( auto& bsp ) { bsp->pbft_watermark = true; });
}



} } /// eosio::chain
} } /// eosio::chain
3 changes: 2 additions & 1 deletion libraries/chain/include/eosio/chain/block_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace eosio { namespace chain {
bool in_current_chain = false;
bool pbft_prepared = false;
bool pbft_my_prepare = false;
bool pbft_watermark = false;

/// this data is redundant with the data stored in block, but facilitates
/// recapturing transactions when we pop a block
Expand All @@ -33,4 +34,4 @@ namespace eosio { namespace chain {

} } /// namespace eosio::chain

FC_REFLECT_DERIVED( eosio::chain::block_state, (eosio::chain::block_header_state), (block)(validated)(in_current_chain)(pbft_prepared)(pbft_my_prepare) )
FC_REFLECT_DERIVED( eosio::chain::block_state, (eosio::chain::block_header_state), (block)(validated)(in_current_chain)(pbft_prepared)(pbft_my_prepare)(pbft_watermark) )
3 changes: 1 addition & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ namespace eosio { namespace chain {

bool pending_pbft_lib();

vector<uint32_t> proposed_schedule_block_nums()const;
vector<uint32_t> promoted_schedule_block_nums()const;
vector<uint32_t> get_watermarks() const;

void set_pbft_latest_checkpoint( const block_id_type& id );
uint32_t last_stable_checkpoint_block_num()const;
Expand Down
4 changes: 4 additions & 0 deletions libraries/chain/include/eosio/chain/fork_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ namespace eosio { namespace chain {

void remove_pbft_prepared_fork();

vector<block_num_type> get_watermarks_in_forkdb();

void mark_as_pbft_watermark( const block_state_ptr& h);

private:
unique_ptr<fork_database_impl> my;
};
Expand Down
3 changes: 3 additions & 0 deletions libraries/chain/include/eosio/chain/pbft_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ namespace eosio {

block_num_type get_current_pbft_watermark();

void update_fork_schedules();

private:
controller &ctrl;
pbft_state_multi_index_type pbft_state_index;
Expand All @@ -663,6 +665,7 @@ namespace eosio {
fc::path checkpoints_dir;
boost::uuids::random_generator uuid_generator;
vector<block_num_type> prepare_watermarks;
flat_map<public_key_type, uint32_t> fork_schedules;

bool is_valid_prepared_certificate(const pbft_prepared_certificate &certificate);

Expand Down
121 changes: 54 additions & 67 deletions libraries/chain/pbft_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ namespace eosio {
set(std::make_shared<pbft_state>(move(s)));
}

unsigned_int watermarks_size;
fc::raw::unpack(ds, watermarks_size);
for (uint32_t i = 0, n = watermarks_size.value; i < n; ++i) {
block_num_type h;
fc::raw::unpack(ds, h);
prepare_watermarks.emplace_back(h);
}
sort(prepare_watermarks.begin(), prepare_watermarks.end());

} else {
pbft_state_index = pbft_state_multi_index_type{};
}
Expand Down Expand Up @@ -72,7 +63,6 @@ namespace eosio {

void pbft_database::close() {


fc::path checkpoints_db = checkpoints_dir / config::checkpoints_filename;
std::ofstream c_out(checkpoints_db.generic_string().c_str(),
std::ios::out | std::ios::binary | std::ofstream::trunc);
Expand All @@ -94,24 +84,14 @@ namespace eosio {
fc::raw::pack(out, *s);
}


uint32_t watermarks_size = prepare_watermarks.size();
fc::raw::pack(out, unsigned_int{watermarks_size});

for (const auto &n: prepare_watermarks) {
fc::raw::pack(out, n);
}

pbft_state_index.clear();
checkpoint_index.clear();
prepare_watermarks.clear();
}

pbft_database::~pbft_database() {
close();
}


void pbft_database::add_pbft_prepare(pbft_prepare &p) {

if (!is_valid_prepare(p)) return;
Expand Down Expand Up @@ -701,6 +681,7 @@ namespace eosio {
ccb.emplace_back(highest_committed_block_num);
}

update_fork_schedules();
for (auto i = 0; i < prepare_watermarks.size() && prepare_watermarks[i] < highest_committed_block_num; ++i) {
//adding committed cert on every water mark.
ccb.emplace_back(prepare_watermarks[i]);
Expand Down Expand Up @@ -1157,7 +1138,6 @@ namespace eosio {

if (lscb_num == 0) {
const auto& ucb = ctrl.get_upgrade_properties().upgrade_complete_block_num;
// if (ucb == 0) return lscb_info;
if (ucb == 0) {
current_schedule = ctrl.initial_schedule();
new_schedule = ctrl.initial_schedule();
Expand Down Expand Up @@ -1209,6 +1189,7 @@ namespace eosio {

for (auto i = psp->block_num;
i > std::max(ctrl.last_stable_checkpoint_block_num(), static_cast<uint32_t>(1)); --i) {
update_fork_schedules();
if (checkpoint(i)) {
my_latest_checkpoint = max(i, my_latest_checkpoint);
auto &by_block = checkpoint_index.get<by_block_id>();
Expand Down Expand Up @@ -1410,33 +1391,21 @@ namespace eosio {

bool pbft_database::should_send_pbft_msg() {

//use last_stable_checkpoint producer schedule

auto as = lscb_active_producers();
auto my_sp = ctrl.my_signature_providers();

for (auto i: prepare_watermarks) {
for (auto const &bp: as.producers) {
for (auto const &my: my_sp) {
if (bp.block_signing_key == my.first) return true;
}
update_fork_schedules();
for (auto const &bp: fork_schedules) {
for (auto const &my: my_sp) {
if (bp.first == my.first) return true;
}
auto bs = ctrl.fetch_block_state_by_number(i);
if (bs && bs->active_schedule != as) as = bs->active_schedule;
}
return false;
}

bool pbft_database::should_recv_pbft_msg(const public_key_type &pub_key) {

auto as = lscb_active_producers();

for (auto i: prepare_watermarks) {
for (auto const &bp: as.producers) {
if (bp.block_signing_key == pub_key) return true;
}
auto bs = ctrl.fetch_block_state_by_number(i);
if (bs && bs->active_schedule != as) as = bs->active_schedule;
update_fork_schedules();
for (auto const &bp: fork_schedules) {
if (bp.first == pub_key) return true;
}
return false;
}
Expand Down Expand Up @@ -1470,34 +1439,8 @@ namespace eosio {
}

block_num_type pbft_database::get_current_pbft_watermark() {
auto unique_merge = [&](vector<block_num_type> &v1, vector<block_num_type> &v2)
{
std::sort(v1.begin(), v1.end());
v1.reserve(v1.size() + v2.size());
v1.insert(v1.end(), v2.begin(), v2.end());

sort( v1.begin(), v1.end() );
v1.erase( unique( v1.begin(), v1.end() ), v1.end() );
};

update_fork_schedules();
auto lib = ctrl.last_irreversible_block_num();
auto lscb = ctrl.last_stable_checkpoint_block_num();

auto proposed_schedule_blocks = ctrl.proposed_schedule_block_nums();
auto promoted_schedule_blocks = ctrl.promoted_schedule_block_nums();
unique_merge(prepare_watermarks, proposed_schedule_blocks);
unique_merge(prepare_watermarks, promoted_schedule_blocks);


for ( auto itr = prepare_watermarks.begin(); itr != prepare_watermarks.end();) {
if ((*itr) <= lscb) {
itr = prepare_watermarks.erase(itr);
} else {
++itr;
}
}
std::sort(prepare_watermarks.begin(), prepare_watermarks.end());


if (prepare_watermarks.empty()) return 0;

Expand All @@ -1506,6 +1449,50 @@ namespace eosio {
if (cw > lib) return cw; else return 0;
}

void pbft_database::update_fork_schedules() {

auto vector_diff = [&](vector<block_num_type> &v1, vector<block_num_type> &v2)
{
vector<block_num_type> diff;
std::set_difference(v1.begin(), v1.end(), v2.begin(), v2.end(),
std::inserter(diff, diff.begin()));
return diff;
};

auto watermarks = ctrl.get_watermarks();

if (watermarks != prepare_watermarks) {
auto prev = prepare_watermarks;
prepare_watermarks = watermarks;
std::sort(prepare_watermarks.begin(), prepare_watermarks.end());
auto added = vector_diff(prepare_watermarks, prev);
auto removed = vector_diff(prev, prepare_watermarks);
for (auto i: added) {
if (auto bs = ctrl.fetch_block_state_by_number(i)) {
auto as = bs->active_schedule.producers;
for (auto &bp: as) {
auto key = bp.block_signing_key;
if (fork_schedules.find(key) == fork_schedules.end()) {
fork_schedules[key] = i;
} else if ( i > fork_schedules[key]) {
fork_schedules[key] = i;
}
}
}
}
if (!removed.empty()) {
auto pruned_num = *max_element(removed.begin(), removed.end());
for (auto itr = fork_schedules.begin(); itr != fork_schedules.end();) {
if ((*itr).second <= pruned_num) {
itr = fork_schedules.erase(itr);
} else {
++itr;
}
}
}
}
}

pbft_state_ptr pbft_database::get_pbft_state_by_id(const block_id_type& id) const {

auto &by_block_id_index = pbft_state_index.get<by_block_id>();
Expand Down

0 comments on commit 3c45326

Please sign in to comment.