Skip to content

Commit

Permalink
Merge pull request #1702 from AntelopeIO/GH-1662-thread-hops
Browse files Browse the repository at this point in the history
Reduce the number of thread hops for read-only trxs
  • Loading branch information
heifner authored Oct 4, 2023
2 parents f5e368e + 6a44da5 commit 77b46e7
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 65 deletions.
24 changes: 21 additions & 3 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,31 @@ class priority_queue_executor {
static constexpr uint16_t minimum_runtime_ms = 3;

// inform how many read_threads will be calling read_only/read_exclusive queues
// Currently only used to assert if exec_queue::read_exclusive is used without any read threads
// expected to only be called at program startup, not thread safe, not safe to call after startup
void init_read_threads(size_t num_read_threads) {
pri_queue_.init_read_threads(num_read_threads);
}

// not thread safe, see init_read_threads comment
size_t get_read_threads() const {
return pri_queue_.get_read_threads();
}

// assume application is started on the main thread
std::thread::id get_main_thread_id() const {
return main_thread_id_;
}

template <typename Func>
auto post( int priority, exec_queue q, Func&& func ) {
return boost::asio::post( io_serv_, pri_queue_.wrap( priority, q, --order_, std::forward<Func>(func)));
void post( int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
// no reason to post to io_service which then places this in the read_exclusive_handlers queue.
// read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
pri_queue_.add(priority, q, --order_, std::forward<Func>(func));
} else {
// post to io_service as the main thread may be blocked on io_service.run_one() in application::exec()
boost::asio::post(io_serv_, pri_queue_.wrap(priority, q, --order_, std::forward<Func>(func)));
}
}

// Legacy and deprecated. To be removed after cleaning up its uses in base appbase
Expand Down Expand Up @@ -125,6 +142,7 @@ class priority_queue_executor {

// members are ordered taking into account that the last one is destructed first
private:
std::thread::id main_thread_id_{ std::this_thread::get_id() };
boost::asio::io_service io_serv_;
appbase::exec_pri_queue pri_queue_;
std::atomic<std::size_t> order_{ std::numeric_limits<size_t>::max() }; // to maintain FIFO ordering in all queues within priority
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@ class exec_pri_queue : public boost::asio::execution_context
public:

// inform how many read_threads will be calling read_only/read_exclusive queues
// expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_
void init_read_threads(size_t num_read_threads) {
assert(!lock_enabled_);
num_read_threads_ = num_read_threads;
}

// not strictly thread safe, see init_read_threads comment
size_t get_read_threads() const {
return num_read_threads_;
}

void stop() {
std::lock_guard g( mtx_ );
exiting_blocking_ = true;
Expand All @@ -60,7 +67,7 @@ class exec_pri_queue : public boost::asio::execution_context
assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
prio_queue& que = priority_que(q);
std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(priority, order, std::move(function)));
if (lock_enabled_) {
if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive
std::lock_guard g( mtx_ );
que.push( std::move( handler ) );
if (num_waiting_)
Expand Down
114 changes: 73 additions & 41 deletions libraries/custom_appbase/tests/custom_appbase_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,61 @@ using namespace appbase;

BOOST_AUTO_TEST_SUITE(custom_appbase_tests)

std::thread start_app_thread(appbase::scoped_app& app) {
const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() };
BOOST_CHECK(app->initialize(sizeof(argv) / sizeof(char*), const_cast<char**>(argv)));
app->startup();
std::thread app_thread( [&]() {
app->exec();
} );
return app_thread;
}
class scoped_app_thread {
public:
explicit scoped_app_thread(bool delay_exec = false) {
app_thread_ = start_app_thread();
if (!delay_exec) {
start_exec();
}
}
~scoped_app_thread() { // destroy app instance so next instance gets a clean one
application::reset_app_singleton();
}

scoped_app_thread(const scoped_app_thread&) = delete;
scoped_app_thread& operator=(const scoped_app_thread&) = delete;

appbase::application* operator->() {
return app_;
}
const appbase::application* operator->() const {
return app_;
}

void start_exec() {
start_exec_.set_value();
}

void join() {
app_thread_.join();
}

private:
std::thread start_app_thread() {
std::promise<void> start_complete;
std::thread app_thread( [&]() {
assert(application::null_app_singleton());
app_ = &appbase::app();
const char* argv[] = { boost::unit_test::framework::current_test_case().p_name->c_str() };
BOOST_CHECK(app_->initialize(sizeof(argv) / sizeof(char*), const_cast<char**>(argv)));
app_->startup();
start_complete.set_value();
start_exec_.get_future().get();
app_->exec();
} );
start_complete.get_future().get();
return app_thread;
}

private:
std::thread app_thread_;
std::promise<void> start_exec_;
appbase::application* app_;
};


std::thread start_read_thread(appbase::scoped_app& app) {
std::thread start_read_thread(scoped_app_thread& app) {
static int num = 0;
std::thread read_thread( [&]() {
std::string name ="read-" + std::to_string(num++);
Expand All @@ -37,9 +81,8 @@ std::thread start_read_thread(appbase::scoped_app& app) {

// verify functions from both queues (read_only,read_write) are executed when execution window is not explicitly set
BOOST_AUTO_TEST_CASE( default_exec_window ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// post functions
std::map<int, int> rslts {};
int seq_num = 0;
Expand All @@ -51,19 +94,17 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) {
app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } );
app->executor().post( priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_exclusive, [&]() { rslts[8]=seq_num; ++seq_num; } );
app->executor().post( priority::low, exec_queue::read_exclusive, [&]() { rslts[9]=seq_num; ++seq_num; } );

// Stop app. Use the lowest priority to make sure this function to execute the last
app->executor().post( priority::lowest, exec_queue::read_only, [&]() {
// read_only_queue should only contain the current lambda function,
// and read_write_queue should have executed all its functions
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 2u );
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u );
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -86,9 +127,8 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) {

// verify functions only from read_only queue are processed during read window on the main thread
BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// set to run functions from read_only queue only
app->executor().init_read_threads(1);
app->executor().set_to_read_window([](){return false;});
Expand All @@ -115,7 +155,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 4u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -134,9 +174,8 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) {

// verify no functions are executed during read window if read_only & read_exclusive queue is empty
BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);

scoped_app_thread app;

// set to run functions from read_only & read_exclusive queues only
app->executor().init_read_threads(1);
app->executor().set_to_read_window([](){return false;});
Expand All @@ -163,7 +202,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 10u );
app->quit();
} );
app_thread.join();
app.join();

// all queues are cleared when exiting application::exec()
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand All @@ -176,8 +215,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_only_queue ) {

// verify functions from both queues (read_only, read_write) are processed in write window, but not read_exclusive
BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
appbase::scoped_app app;
auto app_thread = start_app_thread(app);
scoped_app_thread app;

// set to run functions from both queues
app->executor().is_write_window();
Expand All @@ -197,20 +235,17 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {
app->executor().post( priority::lowest, exec_queue::read_only, [&]() { rslts[9]=seq_num; ++seq_num; } );
app->executor().post( priority::low, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } );
app->executor().post( priority::highest,exec_queue::read_exclusive, [&]() { rslts[12]=seq_num; ++seq_num; } );
app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { rslts[13]=seq_num; ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_exclusive, [&]() { rslts[14]=seq_num; ++seq_num; } );

// stop application. Use lowest at the end to make sure this executes the last
app->executor().post( priority::lowest, exec_queue::read_only, [&]() {
// read_queue should have current function and write_queue's functions are all executed
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 1u); // pop()s after execute
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 3u);
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u);
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
app->quit();
} );

app_thread.join();
app.join();

// queues are emptied after exec
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand Down Expand Up @@ -245,7 +280,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_write_queues ) {

// verify tasks from both queues (read_only, read_exclusive) are processed in read window
BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;
scoped_app_thread app(true);

app->executor().init_read_threads(3);
// set to run functions from read_only & read_exclusive queues only
Expand Down Expand Up @@ -289,7 +324,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

auto app_thread = start_app_thread(app);
app.start_exec();
constexpr int num_expected = 13; // 16 - 3 read_write

auto read_thread1 = start_read_thread(app);
Expand All @@ -307,7 +342,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
};
work.reset();
app->quit();
app_thread.join();
app.join();

// queues are emptied after exec
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true);
Expand Down Expand Up @@ -340,10 +375,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {

// verify tasks from both queues (read_only, read_exclusive) are processed in read window
BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;

auto app_thread = start_app_thread(app);
std::thread::id app_thread_id = app_thread.get_id();
scoped_app_thread app;

// set to run functions from read_only & read_exclusive queues only
app->executor().init_read_threads(3);
Expand Down Expand Up @@ -400,15 +432,15 @@ BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
};

app->quit();
app_thread.join();
app.join();

// exactly number of posts processed
BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){ return v != std::thread::id(); }), num_expected );

const auto run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; });
const auto run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; });
const auto run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; });
const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; });
const auto run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app->executor().get_main_thread_id(); });

BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected);
BOOST_CHECK(run_on_1 > 0);
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_api_plugin/chain_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ void chain_api_plugin::plugin_startup() {
CHAIN_RO_CALL(get_required_keys, 200, http_params_types::params_required),
CHAIN_RO_CALL(get_transaction_id, 200, http_params_types::params_required),
// transaction related APIs will be posted to read_write queue after keys are recovered, they are safe to run in parallel until they post to the read_write queue
CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required),
CHAIN_RO_CALL_ASYNC(compute_transaction, chain_apis::read_only::compute_transaction_results, 200, http_params_types::params_required),
CHAIN_RW_CALL_ASYNC(push_transaction, chain_apis::read_write::push_transaction_results, 202, http_params_types::params_required),
CHAIN_RW_CALL_ASYNC(push_transactions, chain_apis::read_write::push_transactions_results, 202, http_params_types::params_required),
Expand All @@ -170,6 +169,8 @@ void chain_api_plugin::plugin_startup() {
}

_http_plugin.add_async_api({
// chain_plugin send_read_only_transaction will post to read_exclusive queue
CHAIN_RO_CALL_ASYNC(send_read_only_transaction, chain_apis::read_only::send_read_only_transaction_results, 200, http_params_types::params_required),
CHAIN_RO_CALL_WITH_400(get_raw_block, 200, http_params_types::params_required),
CHAIN_RO_CALL_WITH_400(get_block_header, 200, http_params_types::params_required)
});
Expand Down
10 changes: 9 additions & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,7 @@ void read_write::push_transactions(const read_write::push_transactions_params& p
} CATCH_AND_CALL(next);
}

// called from read-exclusive thread for read-only
template<class API, class Result>
void api_base::send_transaction_gen(API &api, send_transaction_params_t params, next_function<Result> next) {
try {
Expand Down Expand Up @@ -2567,12 +2568,19 @@ void read_only::compute_transaction(compute_transaction_params params, next_func
}

void read_only::send_read_only_transaction(send_read_only_transaction_params params, next_function<send_read_only_transaction_results> next) {
static bool read_only_enabled = app().executor().get_read_threads() > 0;
EOS_ASSERT( read_only_enabled, unsupported_feature,
"read-only transactions execution not enabled on API node. Set read-only-threads > 0" );

send_transaction_params_t gen_params { .return_failure_trace = false,
.retry_trx = false,
.retry_trx_num_blocks = std::nullopt,
.trx_type = transaction_metadata::trx_type::read_only,
.transaction = std::move(params.transaction) };
return send_transaction_gen(*this, std::move(gen_params), std::move(next));
// run read-only trx exclusively on read-only threads
app().executor().post(priority::low, exec_queue::read_exclusive, [this, gen_params{std::move(gen_params)}, next{std::move(next)}]() mutable {
send_transaction_gen(*this, std::move(gen_params), std::move(next));
});
}

read_only::get_transaction_id_result read_only::get_transaction_id( const read_only::get_transaction_id_params& params, const fc::time_point& ) const {
Expand Down
16 changes: 8 additions & 8 deletions plugins/http_plugin/include/eosio/http_plugin/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,18 @@ struct http_plugin_state {
*/
inline auto make_http_response_handler(http_plugin_state& plugin_state, detail::abstract_conn_ptr session_ptr, http_content_type content_type) {
return [&plugin_state,
session_ptr{std::move(session_ptr)}, content_type](int code, std::optional<fc::variant> response) {
session_ptr{std::move(session_ptr)}, content_type](int code, std::optional<fc::variant> response) mutable {
auto payload_size = detail::in_flight_sizeof(response);
if(auto error_str = session_ptr->verify_max_bytes_in_flight(payload_size); !error_str.empty()) {
session_ptr->send_busy_response(std::move(error_str));
return;
}

plugin_state.bytes_in_flight += payload_size;

// post back to an HTTP thread to allow the response handler to be called from any thread
boost::asio::post(plugin_state.thread_pool.get_executor(),
[&plugin_state, session_ptr, code, payload_size, response = std::move(response), content_type]() {
boost::asio::dispatch(plugin_state.thread_pool.get_executor(),
[&plugin_state, session_ptr{std::move(session_ptr)}, code, payload_size, response = std::move(response), content_type]() {
if(auto error_str = session_ptr->verify_max_bytes_in_flight(0); !error_str.empty()) {
session_ptr->send_busy_response(std::move(error_str));
return;
}

try {
plugin_state.bytes_in_flight -= payload_size;
if (response.has_value()) {
Expand Down
Loading

0 comments on commit 77b46e7

Please sign in to comment.