diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index 97f2fcdc2b..54b4c57688 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -335,4 +335,82 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) { BOOST_CHECK_LT( rslts[6], rslts[14] ); } +// verify tasks from both queues (read_only, read_exclusive) are processed in read window +BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) { + appbase::scoped_app app; + + auto app_thread = start_app_thread(app); + std::thread::id app_thread_id = app_thread.get_id(); + + // set to run functions from read_only & read_exclusive queues only + app->executor().set_to_read_window(3, [](){return false;}); + + // post functions + constexpr size_t num_expected = 600u; + std::vector> rslts(num_expected); + std::atomic seq_num = 0; + for (size_t i = 0; i < 200; i+=5) { + app->executor().post( priority::high, exec_queue::read_exclusive, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_only, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i+1)); } ); + } + auto read_thread1 = start_read_thread(app); + std::thread::id read_thread1_id = read_thread1.get_id(); + for (size_t i = 200; i < 400; i+=5) { + app->executor().post( priority::high, exec_queue::read_exclusive, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::low, exec_queue::read_exclusive, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } ); + } + auto read_thread2 = start_read_thread(app); + std::thread::id read_thread2_id = read_thread2.get_id(); + for (size_t i = 400; i < num_expected; i+=5) { + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } ); + app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; } ); + app->executor().post( priority::medium, exec_queue::read_exclusive, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; } ); + } + auto read_thread3 = start_read_thread(app); + std::thread::id read_thread3_id = read_thread3.get_id(); + + // Use lowest at the end to make sure this executes the last + app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() { + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); + } ); + + read_thread1.join(); + read_thread2.join(); + read_thread3.join(); + + size_t num_sleeps = 0; + while (seq_num < num_expected) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + if (++num_sleeps > 10000) + break; + }; + + app->quit(); + app_thread.join(); + + // exactly number of posts processed + BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){ return v != std::thread::id(); }), num_expected ); + + size_t run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; }); + size_t run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; }); + size_t run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; }); + size_t run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; }); + + BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected); + BOOST_CHECK(run_on_1 > 0); + BOOST_CHECK(run_on_2 > 0); + BOOST_CHECK(run_on_3 > 0); + BOOST_CHECK(run_on_main > 0); +} + BOOST_AUTO_TEST_SUITE_END()