Skip to content

Commit

Permalink
Merge branch 'main' into ptests_to_hightier
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner authored Sep 16, 2022
2 parents 1911080 + 56f858d commit e3addf9
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 46 deletions.
125 changes: 86 additions & 39 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,21 @@ class account_failures {
max_failures_per_account = max_failures;
}

void add_idle_time( const fc::microseconds& idle ) {
block_idle_time += idle;
}

void add_fail_time( const fc::microseconds& fail_time ) {
trx_fail_time += fail_time;
++trx_fail_num;
}

void add_success_time( const fc::microseconds& time ) {
trx_success_time += time;
++trx_success_num;
}


void add( const account_name& n, int64_t exception_code ) {
auto& fa = failed_accounts[n];
++fa.num_failures;
Expand All @@ -162,9 +177,14 @@ class account_failures {
return false;
}

void report() const {
void report( const fc::time_point& idle_trx_time ) {
if( _log.is_enabled( fc::log_level::debug ) ) {
auto now = fc::time_point::now();
add_idle_time( now - idle_trx_time );
fc_dlog( _log, "Block trx idle: ${i}us out of ${t}us, success: ${sn}, ${s}us, fail: ${fn}, ${f}us, other: ${o}us",
("i", block_idle_time)("t", now - clear_time)("sn", trx_success_num)("s", trx_success_time)
("fn", trx_fail_num)("f", trx_fail_time)
("o", (now - clear_time) - block_idle_time - trx_success_time - trx_fail_time) );
for( const auto& e : failed_accounts ) {
std::string reason;
if( e.second.is_deadline() ) reason += "deadline";
Expand All @@ -189,6 +209,9 @@ class account_failures {

void clear() {
failed_accounts.clear();
block_idle_time = trx_fail_time = trx_success_time = fc::microseconds{};
trx_fail_num = trx_success_num = 0;
clear_time = fc::time_point::now();
}

private:
Expand Down Expand Up @@ -226,6 +249,12 @@ class account_failures {

std::map<account_name, account_failure> failed_accounts;
uint32_t max_failures_per_account = 3;
fc::microseconds block_idle_time;
uint32_t trx_success_num = 0;
uint32_t trx_fail_num = 0;
fc::microseconds trx_success_time;
fc::microseconds trx_fail_time;
fc::time_point clear_time{fc::time_point::now()};
const eosio::subjective_billing& subjective_billing;
};

Expand Down Expand Up @@ -291,7 +320,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
bool _disable_subjective_p2p_billing = true;
bool _disable_subjective_api_billing = true;
fc::time_point _irreversible_block_time;
fc::time_point _idle_trx_time;
fc::time_point _idle_trx_time{fc::time_point::now()};

std::vector<chain::digest_type> _protocol_features_to_activate;
bool _protocol_features_signaled = false; // to mark whether it has been signaled in start_block
Expand Down Expand Up @@ -386,8 +415,13 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
void abort_block() {
auto& chain = chain_plug->chain();

if( chain.is_building_block() ) {
_account_fails.report( _idle_trx_time );
}
_unapplied_transactions.add_aborted( chain.abort_block() );
_subjective_billing.abort_block();
_account_fails.clear();
_idle_trx_time = fc::time_point::now();
}

bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id) {
Expand Down Expand Up @@ -453,26 +487,27 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}

const auto& hbs = chain.head_block_state();
if( hbs->header.timestamp.next().to_time_point() >= fc::time_point::now() ) {
auto now = fc::time_point::now();
if( hbs->header.timestamp.next().to_time_point() >= now ) {
_production_enabled = true;
}

if( fc::time_point::now() - block->timestamp < fc::minutes(5) || (blk_num % 1000 == 0) ) {
if( now - block->timestamp < fc::minutes(5) || (blk_num % 1000 == 0) ) {
ilog("Received block ${id}... #${n} @ ${t} signed by ${p} "
"[trxs: ${count}, lib: ${lib}, conf: ${confs}, net: ${net}, cpu: ${cpu}, elapsed: ${elapsed}, time: ${time}, latency: ${latency} ms]",
("p",block->producer)("id",id.str().substr(8,16))("n",blk_num)("t",block->timestamp)
("count",block->transactions.size())("lib",chain.last_irreversible_block_num())
("confs", block->confirmed)("net", br.total_net_usage)("cpu", br.total_cpu_usage_us)
("elapsed", br.total_elapsed_time)("time", br.total_time)
("latency", (fc::time_point::now() - block->timestamp).count()/1000 ) );
("latency", (now - block->timestamp).count()/1000 ) );
if( chain.get_read_mode() != db_read_mode::IRREVERSIBLE && hbs->id != id && hbs->block != nullptr ) { // not applied to head
ilog("Block not applied to head ${id}... #${n} @ ${t} signed by ${p} "
"[trxs: ${count}, dpos: ${dpos}, conf: ${confs}, net: ${net}, cpu: ${cpu}, elapsed: ${elapsed}, time: ${time}, latency: ${latency} ms]",
("p",hbs->block->producer)("id",hbs->id.str().substr(8,16))("n",hbs->block_num)("t",hbs->block->timestamp)
("count",hbs->block->transactions.size())("dpos", hbs->dpos_irreversible_blocknum)
("confs", hbs->block->confirmed)("net", br.total_net_usage)("cpu", br.total_cpu_usage_us)
("elapsed", br.total_elapsed_time)("time", br.total_time)
("latency", (fc::time_point::now() - hbs->block->timestamp).count()/1000 ) );
("latency", (now - hbs->block->timestamp).count()/1000 ) );
}
}

Expand All @@ -481,8 +516,9 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

void restart_speculative_block() {
chain::controller& chain = chain_plug->chain();

// abort the pending block
_unapplied_transactions.add_aborted( chain.abort_block() );
abort_block();

schedule_production_loop();
}
Expand Down Expand Up @@ -521,9 +557,17 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if( future.valid() ) {
future.wait();
app().post( priority::low, [self, future{std::move(future)}, persist_until_expired, next{std::move( next )}, trx{std::move(trx)}, return_failure_traces]() mutable {
auto exception_handler = [self, &next, trx{std::move(trx)}](fc::exception_ptr ex) {
self->log_trx_results( trx, nullptr, ex, 0, fc::time_point::now() );
auto start = fc::time_point::now();
auto idle_time = start - self->_idle_trx_time;
self->_account_fails.add_idle_time( idle_time );
fc_dlog( _trx_successful_trace_log, "Time since last trx: ${t}us", ("t", idle_time) );

auto exception_handler = [self, &next, trx{std::move(trx)}, &start](fc::exception_ptr ex) {
self->_account_fails.add_idle_time( start - self->_idle_trx_time );
self->log_trx_results( trx, nullptr, ex, 0, start );
next( std::move(ex) );
self->_idle_trx_time = fc::time_point::now();
self->_account_fails.add_fail_time(self->_idle_trx_time - start);
};
try {
auto result = future.get();
Expand All @@ -534,6 +578,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
self->restart_speculative_block();
}
}
self->_idle_trx_time = fc::time_point::now();
} CATCH_AND_CALL(exception_handler);
} );
}
Expand All @@ -546,7 +591,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
next_function<transaction_trace_ptr> next) {
bool exhausted = false;
chain::controller& chain = chain_plug->chain();

try {
const auto& id = trx->id();

Expand Down Expand Up @@ -593,7 +637,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
chain_plugin::handle_bad_alloc();
} CATCH_AND_CALL(next);

_idle_trx_time = fc::time_point::now();
return !exhausted;
}

Expand Down Expand Up @@ -1490,7 +1533,6 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
const fc::time_point now = fc::time_point::now();
const fc::time_point block_time = calculate_pending_block_time();
const fc::time_point preprocess_deadline = calculate_block_deadline(block_time);
_idle_trx_time = now;

const pending_block_mode previous_pending_mode = _pending_block_mode;
_pending_block_mode = pending_block_mode::producing;
Expand Down Expand Up @@ -1658,7 +1700,6 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

// limit execution of pending incoming to once per block
auto incoming_itr = _unapplied_transactions.incoming_begin();
_account_fails.clear();

if( !process_unapplied_trxs( preprocess_deadline ) )
return start_block_result::exhausted;
Expand All @@ -1679,12 +1720,13 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
return start_block_result::failed;
if (preprocess_deadline <= fc::time_point::now() || block_is_exhausted()) {
return start_block_result::exhausted;
} else {
if( !process_incoming_trxs( preprocess_deadline, incoming_itr ) )
return start_block_result::exhausted;
return start_block_result::succeeded;
}

if( !process_incoming_trxs( preprocess_deadline, incoming_itr ) )
return start_block_result::exhausted;

return start_block_result::succeeded;

} catch ( const guard_exception& e ) {
chain_plugin::handle_guard_exception(e);
return start_block_result::failed;
Expand Down Expand Up @@ -1872,18 +1914,21 @@ producer_plugin_impl::push_transaction( const fc::time_point& block_deadline,
next_function<transaction_trace_ptr> next )
{
auto start = fc::time_point::now();
fc_dlog( _trx_successful_trace_log, "Time since last trx: ${t}us", ("t", start - _idle_trx_time) );

bool disable_subjective_enforcement = (persist_until_expired && _disable_subjective_api_billing)
|| (!persist_until_expired && _disable_subjective_p2p_billing)
|| trx->read_only;

auto first_auth = trx->packed_trx()->get_transaction().first_authorizer();
if( _pending_block_mode == pending_block_mode::producing && _account_fails.failure_limit( first_auth ) ) {
if( !disable_subjective_enforcement && _account_fails.failure_limit( first_auth ) ) {
if( next ) {
auto except_ptr = std::static_pointer_cast<fc::exception>( std::make_shared<tx_cpu_usage_exceeded>(
FC_LOG_MESSAGE( error, "transaction ${id} exceeded failure limit for account ${a}",
("id", trx->id())( "a", first_auth ) ) ) );
log_trx_results( trx, except_ptr );
next( except_ptr );
}
_idle_trx_time = fc::time_point::now();
_account_fails.add_fail_time(fc::time_point::now() - start);
return push_result{.failed = true};
}

Expand All @@ -1892,9 +1937,7 @@ producer_plugin_impl::push_transaction( const fc::time_point& block_deadline,
if( max_trx_time.count() < 0 ) max_trx_time = fc::microseconds::maximum();

bool disable_subjective_billing = ( _pending_block_mode == pending_block_mode::producing )
|| ( persist_until_expired && _disable_subjective_api_billing )
|| ( !persist_until_expired && _disable_subjective_p2p_billing )
|| trx->read_only;
|| disable_subjective_enforcement;

int64_t sub_bill = 0;
if( !disable_subjective_billing )
Expand All @@ -1910,8 +1953,10 @@ producer_plugin_impl::push_transaction( const fc::time_point& block_deadline,
}

auto trace = chain.push_transaction( trx, block_deadline, max_trx_time, prev_billed_cpu_time_us, false, sub_bill );
auto end = fc::time_point::now();
push_result pr;
if( trace->except ) {
_account_fails.add_fail_time(end - start);
if( exception_is_exhausted( *trace->except ) ) {
if( _pending_block_mode == pending_block_mode::producing ) {
fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
Expand All @@ -1924,20 +1969,19 @@ producer_plugin_impl::push_transaction( const fc::time_point& block_deadline,
} else {
pr.failed = true;
fc_dlog( _trx_failed_trace_log, "Subjective bill for failed ${a}: ${b} elapsed ${t}us, time ${r}us",
("a",first_auth)("b",sub_bill)("t",trace->elapsed)("r", fc::time_point::now() - start));
("a",first_auth)("b",sub_bill)("t",trace->elapsed)("r", end - start));
if (!disable_subjective_billing)
_subjective_billing.subjective_bill_failure( first_auth, trace->elapsed, fc::time_point::now() );

log_trx_results( trx, trace, start );
if( _pending_block_mode == pending_block_mode::producing ) {
auto failure_code = trace->except->code();
if( failure_code != tx_duplicate::code_value ) {
// this failed our configured maximum transaction time, we don't want to replay it
fc_dlog( _log, "Failed ${c} trx, auth: ${a}, prev billed: ${p}us, ran: ${r}us, id: ${id}",
("c", failure_code)("a", first_auth)("p", prev_billed_cpu_time_us)
( "r", fc::time_point::now() - start )( "id", trx->id() ) );
auto failure_code = trace->except->code();
if( failure_code != tx_duplicate::code_value ) {
// this failed our configured maximum transaction time, we don't want to replay it
fc_dlog( _trx_failed_trace_log, "Failed ${c} trx, auth: ${a}, prev billed: ${p}us, ran: ${r}us, id: ${id}",
("c", failure_code)("a", first_auth)("p", prev_billed_cpu_time_us)
( "r", end - start )( "id", trx->id() ) );
if( !disable_subjective_enforcement )
_account_fails.add( first_auth, failure_code );
}
}
if( next ) {
if( return_failure_trace ) {
Expand All @@ -1950,7 +1994,8 @@ producer_plugin_impl::push_transaction( const fc::time_point& block_deadline,
}
} else {
fc_dlog( _trx_successful_trace_log, "Subjective bill for success ${a}: ${b} elapsed ${t}us, time ${r}us",
("a",first_auth)("b",sub_bill)("t",trace->elapsed)("r", fc::time_point::now() - start));
("a",first_auth)("b",sub_bill)("t",trace->elapsed)("r", end - start));
_account_fails.add_success_time(end - start);
log_trx_results( trx, trace, start );
if( persist_until_expired && !_disable_persist_until_expired ) {
// if this trx didn't fail/soft-fail and the persist flag is set
Expand All @@ -1966,7 +2011,6 @@ producer_plugin_impl::push_transaction( const fc::time_point& block_deadline,
if( next ) next( trace );
}

_idle_trx_time = fc::time_point::now();
return pr;
}

Expand Down Expand Up @@ -2063,7 +2107,6 @@ void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p

num_processed++;

_idle_trx_time = fc::time_point::now();
// configurable ratio of incoming txns vs deferred txns
while (incoming_trx_weight >= 1.0 && itr != end ) {
if (deadline <= fc::time_point::now()) {
Expand Down Expand Up @@ -2110,7 +2153,9 @@ void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p
if( max_trx_time.count() < 0 ) max_trx_time = fc::microseconds::maximum();

auto trace = chain.push_scheduled_transaction(trx_id, deadline, max_trx_time, 0, false);
auto end = fc::time_point::now();
if (trace->except) {
_account_fails.add_fail_time(end - start);
if (exception_is_exhausted(*trace->except)) {
if( block_is_exhausted() ) {
exhausted = true;
Expand All @@ -2120,7 +2165,7 @@ void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p
fc_dlog(_trx_failed_trace_log,
"[TRX_TRACE] Block ${block_num} for producer ${prod} is REJECTING scheduled tx: ${txid}, time: ${r}, auth: ${a} : ${details}",
("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
("txid", trx_id)("r", fc::time_point::now() - start)("a", get_first_authorizer(trace))
("txid", trx_id)("r", end - start)("a", get_first_authorizer(trace))
("details", get_detailed_contract_except_info(nullptr, trace, nullptr)));
fc_dlog(_trx_trace_failure_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is REJECTING scheduled tx: ${entire_trace}",
("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
Expand All @@ -2130,10 +2175,11 @@ void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p
num_failed++;
}
} else {
_account_fails.add_success_time(end - start);
fc_dlog(_trx_successful_trace_log,
"[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING scheduled tx: ${txid}, time: ${r}, auth: ${a}, cpu: ${cpu}",
("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
("txid", trx_id)("r", fc::time_point::now() - start)("a", get_first_authorizer(trace))
("txid", trx_id)("r", end - start)("a", get_first_authorizer(trace))
("cpu", trace->receipt ? trace->receipt->cpu_usage_us : 0));
fc_dlog(_trx_trace_success_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING scheduled tx: ${entire_trace}",
("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
Expand All @@ -2160,7 +2206,6 @@ bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline
bool exhausted = false;
auto end = _unapplied_transactions.incoming_end();
if( itr != end ) {
_idle_trx_time = fc::time_point::now();
size_t processed = 0;
fc_dlog( _log, "Processing ${n} pending transactions", ("n", _unapplied_transactions.incoming_size()) );
while( itr != end ) {
Expand Down Expand Up @@ -2214,6 +2259,8 @@ void producer_plugin_impl::schedule_production_loop() {

auto result = start_block();

_idle_trx_time = fc::time_point::now();

if (result == start_block_result::failed) {
elog("Failed to start a pending block, will try again later");
_timer.expires_from_now( boost::posix_time::microseconds( config::block_interval_us / 10 ));
Expand Down Expand Up @@ -2397,7 +2444,7 @@ void producer_plugin_impl::produce_block() {

block_state_ptr new_bs = chain.head_block_state();

_account_fails.report();
_account_fails.report(_idle_trx_time);
_account_fails.clear();

controller::block_report br;
Expand Down
Loading

0 comments on commit e3addf9

Please sign in to comment.