Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Fix IoScheduler::yield_until and improve tests #533

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 11 additions & 23 deletions cpp/mrc/src/public/coroutines/io_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,42 +179,30 @@ auto IoScheduler::yield_for(std::chrono::milliseconds amount) -> mrc::coroutines
}
else
{
// Yield/timeout tasks are considered live in the scheduler and must be accounted for. Note
// that if the user gives an invalid amount and schedule() is directly called it will account
// for the scheduled task there.
m_size.fetch_add(1, std::memory_order::release);

// Yielding does not requiring setting the timer position on the poll info since
// it doesn't have a corresponding 'event' that can trigger, it always waits for
// the timeout to occur before resuming.

detail::PollInfo pi{};
add_timer_token(clock_t::now() + amount, pi);
co_await pi;

m_size.fetch_sub(1, std::memory_order::release);
co_return co_await yield_until(coroutines::clock_t::now() + amount);
}
co_return;
}

auto IoScheduler::yield_until(time_point_t time) -> mrc::coroutines::Task<void>
{
auto now = clock_t::now();

// If the requested time is in the past (or now!) bail out!
if (time <= now)
if (time <= clock_t::now())
{
co_await schedule();
}
else
{
// Yield/timeout tasks are considered live in the scheduler and must be accounted for. Note
// that if the user gives an invalid amount and schedule() is directly called it will account
// for the scheduled task there.
m_size.fetch_add(1, std::memory_order::release);

auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);

detail::PollInfo pi{};
add_timer_token(now + amount, pi);
co_await pi;
// Yielding does not requiring setting the timer position on the poll info since
// it doesn't have a corresponding 'event' that can trigger, it always waits for
// the timeout to occur before resuming.
detail::PollInfo poll_info{};
add_timer_token(time, poll_info);
co_await poll_info;

m_size.fetch_sub(1, std::memory_order::release);
}
Expand Down
40 changes: 34 additions & 6 deletions cpp/mrc/tests/coroutines/test_io_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <coroutine>
#include <cstdint>
#include <memory>
#include <ratio>
#include <utility>
#include <vector>

Expand All @@ -41,42 +42,69 @@ TEST_F(TestCoroIoScheduler, YieldFor)
{
auto scheduler = coroutines::IoScheduler::get_instance();

static constexpr std::chrono::milliseconds Delay{10};

auto task = [scheduler]() -> coroutines::Task<> {
co_await scheduler->yield_for(10ms);
co_await scheduler->yield_for(Delay);
};

auto start = coroutines::clock_t::now();
coroutines::sync_wait(task());
auto stop = coroutines::clock_t::now();

ASSERT_GE(stop - start, Delay);
}

TEST_F(TestCoroIoScheduler, YieldUntil)
{
auto scheduler = coroutines::IoScheduler::get_instance();

auto task = [scheduler]() -> coroutines::Task<> {
co_await scheduler->yield_until(coroutines::clock_t::now() + 10ms);
coroutines::clock_t::time_point target_time{};

auto task = [scheduler, &target_time]() -> coroutines::Task<> {
target_time = coroutines::clock_t::now() + 10ms;
co_await scheduler->yield_until(target_time);
};

coroutines::sync_wait(task());

auto current_time = coroutines::clock_t::now();

ASSERT_GE(current_time, target_time);
}

TEST_F(TestCoroIoScheduler, Concurrent)
{
auto scheduler = coroutines::IoScheduler::get_instance();

auto per_task_overhead = [&] {
static constexpr std::chrono::milliseconds SmallestDelay{1};
auto start = coroutines::clock_t::now();
coroutines::sync_wait([scheduler]() -> coroutines::Task<> {
co_await scheduler->yield_for(SmallestDelay);
}());
auto stop = coroutines::clock_t::now();
return (stop - start) - SmallestDelay;
}();

static constexpr std::chrono::milliseconds TaskDuration{10};

auto task = [scheduler]() -> coroutines::Task<> {
co_await scheduler->yield_for(10ms);
co_await scheduler->yield_for(TaskDuration);
};

auto start = coroutines::clock_t::now();

std::vector<coroutines::Task<>> tasks;

for (uint32_t i = 0; i < 1000; i++)
const uint32_t NumTasks{1'000};
for (uint32_t i = 0; i < NumTasks; i++)
{
tasks.push_back(task());
}

coroutines::sync_wait(coroutines::when_all(std::move(tasks)));
auto stop = coroutines::clock_t::now();

ASSERT_LT(coroutines::clock_t::now() - start, 20ms);
ASSERT_LT(stop - start, TaskDuration + per_task_overhead * NumTasks);
}