Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

fix producer_plugin watermark tracking - develop #7702

Merged
merged 2 commits into from
Jul 30, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 34 additions & 27 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
std::vector<chain::digest_type> _protocol_features_to_activate;
bool _protocol_features_signaled = false; // to mark whether it has been signaled in start_block

time_point _start_time = fc::time_point::now();

producer_plugin* _self = nullptr;
chain_plugin* chain_plug = nullptr;

Expand All @@ -206,6 +204,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
pending_snapshot_index _pending_snapshot_index;

fc::optional<scoped_connection> _accepted_block_connection;
fc::optional<scoped_connection> _accepted_block_header_connection;
fc::optional<scoped_connection> _irreversible_block_connection;

/*
Expand All @@ -227,20 +226,29 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// path to write the snapshots to
bfs::path _snapshots_dir;

void consider_new_watermark( account_name producer, uint32_t block_num ) {
auto itr = _producer_watermarks.find( producer );
if( itr != _producer_watermarks.end() ) {
itr->second = std::max( itr->second, block_num );
} else if( _producers.count( producer ) > 0 ) {
_producer_watermarks.emplace( producer, block_num );
}
}

std::optional<uint32_t> get_watermark( account_name producer ) const {
auto itr = _producer_watermarks.find( producer );

if( itr == _producer_watermarks.end() ) return {};

return itr->second;
}

void on_block( const block_state_ptr& bsp ) {
_unapplied_transactions.clear_applied( bsp->trxs );
}

if( bsp->header.timestamp <= _start_time ) return;

// simplify handling of watermark in on_block
auto block_producer = bsp->header.producer;
auto watermark_itr = _producer_watermarks.find( block_producer );
if( watermark_itr != _producer_watermarks.end() ) {
watermark_itr->second = bsp->block_num;
} else if( _producers.count( block_producer ) > 0 ) {
_producer_watermarks.emplace( block_producer, bsp->block_num );
}
void on_block_header( const block_state_ptr& bsp ) {
consider_new_watermark( bsp->header.producer, bsp->block_num );
}

void on_irreversible_block( const signed_block_ptr& lib ) {
Expand Down Expand Up @@ -835,6 +843,7 @@ void producer_plugin::plugin_startup()
"node cannot have any producer-name configured because block production is not safe when validation_mode is not \"full\"" );

my->_accepted_block_connection.emplace(chain.accepted_block.connect( [this]( const auto& bsp ){ my->on_block( bsp ); } ));
my->_accepted_block_header_connection.emplace(chain.accepted_block_header.connect( [this]( const auto& bsp ){ my->on_block_header( bsp ); } ));
my->_irreversible_block_connection.emplace(chain.irreversible_block.connect( [this]( const auto& bsp ){ my->on_irreversible_block( bsp->block ); } ));

const auto lib_num = chain.last_irreversible_block_num();
Expand Down Expand Up @@ -872,6 +881,7 @@ void producer_plugin::plugin_shutdown() {
my->_thread_pool->stop();
}
my->_accepted_block_connection.reset();
my->_accepted_block_header_connection.reset();
my->_irreversible_block_connection.reset();
}

Expand Down Expand Up @@ -1226,9 +1236,9 @@ optional<fc::time_point> producer_plugin_impl::calculate_next_block_time(const a
// disqualify this producer for longer but it is assumed they will wake up, determine that they
// are disqualified for longer due to skipped blocks and re-caculate their next block with better
// information then
auto current_watermark_itr = _producer_watermarks.find(producer_name);
if (current_watermark_itr != _producer_watermarks.end()) {
auto watermark = current_watermark_itr->second;
auto current_watermark = get_watermark(producer_name);
if (current_watermark) {
auto watermark = *current_watermark;
auto block_num = chain.head_block_state()->block_num;
if (chain.is_building_block()) {
++block_num;
Expand Down Expand Up @@ -1306,7 +1316,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

// Not our turn
const auto& scheduled_producer = hbs->get_scheduled_producer(block_time);
auto currrent_watermark_itr = _producer_watermarks.find(scheduled_producer.producer_name);
auto current_watermark = get_watermark(scheduled_producer.producer_name);

size_t num_relevant_signatures = 0;
scheduled_producer.for_each_key([&](const public_key_type& key){
Expand Down Expand Up @@ -1336,14 +1346,12 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

if (_pending_block_mode == pending_block_mode::producing) {
// determine if our watermark excludes us from producing at this point
if (currrent_watermark_itr != _producer_watermarks.end()) {
if (currrent_watermark_itr->second >= hbs->block_num + 1) {
elog("Not producing block because \"${producer}\" signed a BFT confirmation OR block at a higher block number (${watermark}) than the current fork's head (${head_block_num})",
("producer", scheduled_producer.producer_name)
("watermark", currrent_watermark_itr->second)
("head_block_num", hbs->block_num));
_pending_block_mode = pending_block_mode::speculating;
}
if (current_watermark && *current_watermark >= hbs->block_num + 1) {
elog("Not producing block because \"${producer}\" signed a block at a higher block number (${watermark}) than the current fork's head (${head_block_num})",
("producer", scheduled_producer.producer_name)
("watermark", *current_watermark)
("head_block_num", hbs->block_num));
_pending_block_mode = pending_block_mode::speculating;
}
}

Expand All @@ -1363,8 +1371,8 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
// confirmations to make sure we don't double sign after a crash TODO: make these watermarks durable?
// 3) if it is a producer on this node where this node knows the last block it produced, safely set it -UNLESS-
// 4) the producer on this node's last watermark is higher (meaning on a different fork)
if (currrent_watermark_itr != _producer_watermarks.end()) {
auto watermark = currrent_watermark_itr->second;
if (current_watermark) {
auto watermark = *current_watermark;
if (watermark < hbs->block_num) {
blocks_to_confirm = (uint16_t)(std::min<uint32_t>(std::numeric_limits<uint16_t>::max(), (uint32_t)(hbs->block_num - watermark)));
}
Expand Down Expand Up @@ -1856,7 +1864,6 @@ void producer_plugin_impl::produce_block() {
chain.commit_block();

block_state_ptr new_bs = chain.head_block_state();
_producer_watermarks[new_bs->header.producer] = chain.head_block_num();

ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} [trxs: ${count}, lib: ${lib}, confirmed: ${confs}]",
("p",new_bs->header.producer)("id",fc::variant(new_bs->id).as_string().substr(0,16))
Expand Down