Skip to content

Commit

Permalink
Merge pull request #910 from AntelopeIO/GH-891-interrupt-start-block-main
Browse files Browse the repository at this point in the history

[4.0 -> main] Use block_num for interrupt of start_block & Use single timer for read and write windows
  • Loading branch information
heifner authored Mar 27, 2023
2 parents 49939fe + 23fd711 commit 1c5e133
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 41 deletions.
11 changes: 6 additions & 5 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3225,11 +3225,12 @@ namespace eosio {
return;
}

bool valid_block_header = !!bsp;

if( valid_block_header ) {
uint32_t block_num = bsp ? bsp->block_num : 0;

if( block_num != 0 ) {
fc_dlog( logger, "validated block header, broadcasting immediately, connection ${cid}, blk num = ${num}, id = ${id}",
("cid", cid)("num", bsp->block_num)("id", bsp->id) );
("cid", cid)("num", block_num)("id", bsp->id) );
my_impl->dispatcher->add_peer_block( bsp->id, cid ); // no need to send back to sender
my_impl->dispatcher->bcast_block( bsp->block, bsp->id );
}
Expand All @@ -3238,9 +3239,9 @@ namespace eosio {
c->process_signed_block( id, std::move(ptr), std::move(bsp) );
});

if( valid_block_header ) {
if( block_num != 0 ) {
// ready to process immediately, so signal producer to interrupt start_block
my_impl->producer_plug->received_block();
my_impl->producer_plug->received_block(block_num);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
void register_metrics_listener(metrics_listener listener);

// thread-safe, called when a new block is received
void received_block();
void received_block(uint32_t block_num);

const std::set<account_name>& producer_accounts() const;

Expand Down
74 changes: 39 additions & 35 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
producer_plugin_impl(boost::asio::io_service& io)
:_timer(io)
,_transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
,_ro_write_window_timer(io)
,_ro_read_window_timer(io)
,_ro_timer(io)
{
}

Expand Down Expand Up @@ -370,7 +369,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
named_thread_pool<struct prod> _thread_pool;

std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
std::atomic<bool> _received_block{false}; // modified by net_plugin thread pool and app thread
std::atomic<uint32_t> _received_block{0}; // modified by net_plugin thread pool
fc::microseconds _max_irreversible_block_age_us;
int32_t _produce_time_offset_us = 0;
int32_t _last_block_time_offset_us = 0;
Expand Down Expand Up @@ -486,20 +485,20 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// - all threads would be idle
// - or the net_plugin received a block.
// - or we have reached the read_window_deadline
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, fc::time_point deadline) {
void set_exit_criteria(uint32_t num_tasks, std::atomic<uint32_t>* received_block, uint32_t block_num, fc::time_point deadline) {
std::lock_guard<std::mutex> g( mtx ); // not strictly necessary with current usage from single thread
assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr);
assert(received_block && *received_block == false);
max_waiting = num_tasks;
num_waiting = 0;
received_block_ptr = received_block;
pending_block_num = block_num;
read_window_deadline = deadline;
exiting_read_window = false;
}

private:
bool should_exit() {
return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
return exiting_read_window || fc::time_point::now() >= read_window_deadline || (*received_block_ptr >= pending_block_num);
}

mutable std::mutex mtx;
Expand All @@ -508,7 +507,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
uint32_t num_waiting{0};
uint32_t max_waiting{0};
bool exiting_read_window{false};
std::atomic<bool>* received_block_ptr{nullptr};
std::atomic<uint32_t>* received_block_ptr{nullptr};
uint32_t pending_block_num{0};
fc::time_point read_window_deadline;
};

Expand All @@ -521,8 +521,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
fc::microseconds _ro_read_window_effective_time_us{ 0 }; // calculated during option initialization
std::atomic<int64_t> _ro_all_threads_exec_time_us; // total time spent by all threads executing transactions. use atomic for simplicity and performance
fc::time_point _ro_read_window_start_time;
boost::asio::deadline_timer _ro_write_window_timer;
boost::asio::deadline_timer _ro_read_window_timer;
boost::asio::deadline_timer _ro_timer;
fc::microseconds _ro_max_trx_time_us{ 0 }; // calculated during option initialization
ro_trx_queue_t _ro_trx_queue;
std::atomic<uint32_t> _ro_num_active_trx_exec_tasks{ 0 };
Expand Down Expand Up @@ -881,7 +880,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
exhausted
};

inline bool should_interrupt_start_block( const fc::time_point& deadline ) const;
inline bool should_interrupt_start_block( const fc::time_point& deadline, uint32_t pending_block_num ) const;
start_block_result start_block();

fc::time_point calculate_pending_block_time() const;
Expand Down Expand Up @@ -1870,12 +1869,12 @@ fc::time_point producer_plugin_impl::calculate_block_deadline( const fc::time_po
}
}

bool producer_plugin_impl::should_interrupt_start_block( const fc::time_point& deadline ) const {
bool producer_plugin_impl::should_interrupt_start_block( const fc::time_point& deadline, uint32_t pending_block_num ) const {
if( _pending_block_mode == pending_block_mode::producing ) {
return deadline <= fc::time_point::now();
}
// if we can produce then honor deadline so production starts on time
return (!_producers.empty() && deadline <= fc::time_point::now()) || _received_block;
return (!_producers.empty() && deadline <= fc::time_point::now()) || (_received_block >= pending_block_num);
}

producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
Expand All @@ -1895,6 +1894,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

const fc::time_point now = fc::time_point::now();
const fc::time_point block_time = calculate_pending_block_time();
const uint32_t pending_block_num = hbs->block_num + 1;
const fc::time_point preprocess_deadline = calculate_block_deadline(block_time);

const pending_block_mode previous_pending_mode = _pending_block_mode;
Expand Down Expand Up @@ -1960,7 +1960,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
if (_pending_block_mode == pending_block_mode::producing) {
const auto start_block_time = block_time - fc::microseconds( config::block_interval_us );
if( now < start_block_time ) {
fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", hbs->block_num + 1)("bt", block_time) );
fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", pending_block_num)("bt", block_time) );
// start_block_time instead of block_time because schedule_delayed_production_loop calculates next block time from given time
schedule_delayed_production_loop(weak_from_this(), calculate_producer_wake_up_time(start_block_time));
return start_block_result::waiting_for_production;
Expand All @@ -1974,7 +1974,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
}

fc_dlog(_log, "Starting block #${n} at ${time} producer ${p}",
("n", hbs->block_num + 1)("time", now)("p", scheduled_producer.producer_name));
("n", pending_block_num)("time", now)("p", scheduled_producer.producer_name));

try {
uint16_t blocks_to_confirm = 0;
Expand Down Expand Up @@ -2038,7 +2038,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
std::swap( features_to_activate, protocol_features_to_activate );
_protocol_features_signaled = true;
ilog( "signaling activation of the following protocol features in block ${num}: ${features_to_activate}",
("num", hbs->block_num + 1)("features_to_activate", features_to_activate) );
("num", pending_block_num)("features_to_activate", features_to_activate) );
}
}

Expand All @@ -2064,7 +2064,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
if( !remove_expired_blacklisted_trxs( preprocess_deadline ) )
return start_block_result::exhausted;
if( !_subjective_billing.remove_expired( _log, chain.pending_block_time(), fc::time_point::now(),
[&](){ return should_interrupt_start_block( preprocess_deadline ); } ) ) {
[&](){ return should_interrupt_start_block( preprocess_deadline, pending_block_num ); } ) ) {
return start_block_result::exhausted;
}

Expand All @@ -2089,7 +2089,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit()
return start_block_result::failed;
if ( should_interrupt_start_block( preprocess_deadline ) || block_is_exhausted() ) {
if ( should_interrupt_start_block( preprocess_deadline, pending_block_num ) || block_is_exhausted() ) {
return start_block_result::exhausted;
}

Expand All @@ -2116,11 +2116,12 @@ bool producer_plugin_impl::remove_expired_trxs( const fc::time_point& deadline )
{
chain::controller& chain = chain_plug->chain();
auto pending_block_time = chain.pending_block_time();
auto pending_block_num = chain.pending_block_num();

// remove all expired transactions
size_t num_expired = 0;
size_t orig_count = _unapplied_transactions.size();
bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline); },
bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline, pending_block_num); },
[&num_expired]( const packed_transaction_ptr& packed_trx_ptr, trx_enum_type trx_type ) {
// expired exception is logged as part of next() call
++num_expired;
Expand All @@ -2144,12 +2145,13 @@ bool producer_plugin_impl::remove_expired_blacklisted_trxs( const fc::time_point
if(!blacklist_by_expiry.empty()) {
const chain::controller& chain = chain_plug->chain();
const auto lib_time = chain.last_irreversible_block_time();
const auto pending_block_num = chain.pending_block_num();

int num_expired = 0;
int orig_count = _blacklisted_transactions.size();

while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= lib_time) {
if ( should_interrupt_start_block( deadline ) ) {
if ( should_interrupt_start_block( deadline, pending_block_num ) ) {
exhausted = true;
break;
}
Expand Down Expand Up @@ -2416,12 +2418,14 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin
{
bool exhausted = false;
if( !_unapplied_transactions.empty() ) {
const chain::controller& chain = chain_plug->chain();
const auto pending_block_num = chain.pending_block_num();
int num_applied = 0, num_failed = 0, num_processed = 0;
auto unapplied_trxs_size = _unapplied_transactions.size();
auto itr = _unapplied_transactions.unapplied_begin();
auto end_itr = _unapplied_transactions.unapplied_end();
while( itr != end_itr ) {
if( should_interrupt_start_block( deadline ) ) {
if( should_interrupt_start_block( deadline, pending_block_num ) ) {
exhausted = true;
break;
}
Expand Down Expand Up @@ -2596,8 +2600,10 @@ bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline
if( itr != end ) {
size_t processed = 0;
fc_dlog( _log, "Processing ${n} pending transactions", ("n", _unapplied_transactions.incoming_size()) );
const chain::controller& chain = chain_plug->chain();
const auto pending_block_num = chain.pending_block_num();
while( itr != end ) {
if ( should_interrupt_start_block( deadline ) ) {
if ( should_interrupt_start_block( deadline, pending_block_num ) ) {
exhausted = true;
break;
}
Expand Down Expand Up @@ -2640,7 +2646,6 @@ bool producer_plugin_impl::block_is_exhausted() const {
// -> Idle
// --> Start block B (block time y.000) at time x.500
void producer_plugin_impl::schedule_production_loop() {
_received_block = false;
_timer.cancel();

auto result = start_block();
Expand Down Expand Up @@ -2848,8 +2853,8 @@ void producer_plugin_impl::produce_block() {
("confs", new_bs->header.confirmed));
}

void producer_plugin::received_block() {
my->_received_block = true;
void producer_plugin::received_block(uint32_t block_num) {
my->_received_block = block_num;
}

void producer_plugin::log_failed_transaction(const transaction_id_type& trx_id, const packed_transaction_ptr& packed_trx_ptr, const char* reason) const {
Expand Down Expand Up @@ -2878,8 +2883,7 @@ void producer_plugin_impl::switch_to_write_window() {
}

EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "no read-only tasks should be running before switching to write window");
_ro_read_window_timer.cancel();
_ro_write_window_timer.cancel();
_ro_timer.cancel();

start_write_window();
}
Expand All @@ -2893,8 +2897,8 @@ void producer_plugin_impl::start_write_window() {
_idle_trx_time = fc::time_point::now();

auto expire_time = boost::posix_time::microseconds(_ro_write_window_time_us.count());
_ro_write_window_timer.expires_from_now( expire_time );
_ro_write_window_timer.async_wait( app().executor().wrap( // stay on app thread
_ro_timer.expires_from_now( expire_time );
_ro_timer.async_wait( app().executor().wrap( // stay on app thread
priority::high,
exec_queue::read_only_trx_safe, // placed in read_only_trx_safe queue so it is ensured to be executed in either window
[weak_this = weak_from_this()]( const boost::system::error_code& ec ) {
Expand All @@ -2910,8 +2914,7 @@ void producer_plugin_impl::switch_to_read_window() {
EOS_ASSERT(app().executor().is_write_window(), producer_exception, "expected to be in write window");
EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "_ro_trx_exec_tasks_fut expected to be empty" );

_ro_write_window_timer.cancel();
_ro_read_window_timer.cancel();
_ro_timer.cancel();
_time_tracker.add_idle_time( fc::time_point::now() - _idle_trx_time );

// we are in write window, so no read-only trx threads are processing transactions.
Expand All @@ -2921,25 +2924,26 @@ void producer_plugin_impl::switch_to_read_window() {
return;
}

auto& chain = chain_plug->chain();
uint32_t pending_block_num = chain.head_block_num() + 1;
app().executor().set_to_read_window();
chain_plug->chain().set_db_read_only_mode();
_received_block = false;
chain.set_db_read_only_mode();
_ro_read_window_start_time = fc::time_point::now();
_ro_all_threads_exec_time_us = 0;

// start a read-only transaction execution task in each thread in the thread pool
_ro_num_active_trx_exec_tasks = _ro_thread_pool_size;
auto start_time = fc::time_point::now();
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us);
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, pending_block_num, start_time + _ro_read_window_effective_time_us);
for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
_ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () {
return read_only_trx_execution_task(start_time);
}) );
}

auto expire_time = boost::posix_time::microseconds(_ro_read_window_time_us.count());
_ro_read_window_timer.expires_from_now( expire_time );
_ro_read_window_timer.async_wait( app().executor().wrap( // stay on app thread
_ro_timer.expires_from_now( expire_time );
_ro_timer.async_wait( app().executor().wrap( // stay on app thread
priority::high,
exec_queue::read_only_trx_safe,
[weak_this = weak_from_this()]( const boost::system::error_code& ec ) {
Expand Down

0 comments on commit 1c5e133

Please sign in to comment.