Skip to content

Commit

Permalink
GH-1639 Additional test
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Sep 21, 2023
1 parent e6bbaaf commit a6465e6
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions libraries/custom_appbase/tests/custom_appbase_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,4 +335,82 @@ BOOST_AUTO_TEST_CASE( execute_from_read_only_and_read_exclusive_queues ) {
BOOST_CHECK_LT( rslts[6], rslts[14] );
}

// verify tasks from both queues (read_only, read_exclusive) are processed in read window
BOOST_AUTO_TEST_CASE( execute_many_from_read_only_and_read_exclusive_queues ) {
appbase::scoped_app app;

auto app_thread = start_app_thread(app);
std::thread::id app_thread_id = app_thread.get_id();

// set to run functions from read_only & read_exclusive queues only
app->executor().set_to_read_window(3, [](){return false;});

// post functions
constexpr size_t num_expected = 600u;
std::vector<std::atomic<std::thread::id>> rslts(num_expected);
std::atomic<int> seq_num = 0;
for (size_t i = 0; i < 200; i+=5) {
app->executor().post( priority::high, exec_queue::read_exclusive, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } );
app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; } );
app->executor().post( priority::low, exec_queue::read_exclusive, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_only, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i+1)); } );
}
auto read_thread1 = start_read_thread(app);
std::thread::id read_thread1_id = read_thread1.get_id();
for (size_t i = 200; i < 400; i+=5) {
app->executor().post( priority::high, exec_queue::read_exclusive, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } );
app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } );
app->executor().post( priority::low, exec_queue::read_exclusive, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } );
app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } );
app->executor().post( priority::medium, exec_queue::read_exclusive, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(i)); } );
}
auto read_thread2 = start_read_thread(app);
std::thread::id read_thread2_id = read_thread2.get_id();
for (size_t i = 400; i < num_expected; i+=5) {
app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } );
app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+1) = std::this_thread::get_id(); ++seq_num; std::this_thread::sleep_for(std::chrono::microseconds(10)); } );
app->executor().post( priority::low, exec_queue::read_only, [&,i]() { rslts.at(i+2) = std::this_thread::get_id(); ++seq_num; } );
app->executor().post( priority::high, exec_queue::read_only, [&,i]() { rslts.at(i+3) = std::this_thread::get_id(); ++seq_num; } );
app->executor().post( priority::medium, exec_queue::read_exclusive, [&,i]() { rslts.at(i+4) = std::this_thread::get_id(); ++seq_num; } );
}
auto read_thread3 = start_read_thread(app);
std::thread::id read_thread3_id = read_thread3.get_id();

// Use lowest at the end to make sure this executes the last
app->executor().post( priority::lowest, exec_queue::read_exclusive, [&]() {
BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute
BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u);
BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u );
} );

read_thread1.join();
read_thread2.join();
read_thread3.join();

size_t num_sleeps = 0;
while (seq_num < num_expected) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (++num_sleeps > 10000)
break;
};

app->quit();
app_thread.join();

// exactly number of posts processed
BOOST_REQUIRE_EQUAL( std::count_if(rslts.cbegin(), rslts.cend(), [](const auto& v){ return v != std::thread::id(); }), num_expected );

size_t run_on_1 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread1_id; });
size_t run_on_2 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread2_id; });
size_t run_on_3 = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == read_thread3_id; });
size_t run_on_main = std::count_if(rslts.cbegin(), rslts.cend(), [&](const auto& v){ return v == app_thread_id; });

BOOST_REQUIRE_EQUAL(run_on_1+run_on_2+run_on_3+run_on_main, num_expected);
BOOST_CHECK(run_on_1 > 0);
BOOST_CHECK(run_on_2 > 0);
BOOST_CHECK(run_on_3 > 0);
BOOST_CHECK(run_on_main > 0);
}

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit a6465e6

Please sign in to comment.