diff --git a/src/rpp/rpp/schedulers/new_thread.hpp b/src/rpp/rpp/schedulers/new_thread.hpp index 4aa176dec..6ceabe5b5 100644 --- a/src/rpp/rpp/schedulers/new_thread.hpp +++ b/src/rpp/rpp/schedulers/new_thread.hpp @@ -40,25 +40,9 @@ namespace rpp::schedulers }; } - ~disposable() override - { - if (!m_thread.joinable()) - return; - - { - std::lock_guard lock{m_state->mutex}; - m_state->is_destroying.store(true, std::memory_order::relaxed); - } - m_state->cv.notify_all(); - m_thread.detach(); - } - template Fn> void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { - if (is_disposed()) - return; - std::lock_guard lock{m_state->mutex}; // guarded by lock if (const auto queue = m_state->queue_ptr.load(std::memory_order::seq_cst)) @@ -87,7 +71,6 @@ namespace rpp::schedulers { std::atomic*> queue_ptr{}; std::atomic_bool is_disposed{}; - std::atomic_bool is_destroying{}; }; static void data_thread(std::shared_ptr state) @@ -95,16 +78,16 @@ namespace rpp::schedulers auto& queue = current_thread::s_queue; state->queue_ptr.store(&queue.emplace(state), std::memory_order::seq_cst); - while (!state->is_disposed.load(std::memory_order::seq_cst)) + while (true) { std::unique_lock lock{state->mutex}; - if (state->is_destroying.load(std::memory_order::seq_cst) && queue->is_empty()) + if (queue->is_empty() && state->is_disposed.load(std::memory_order::seq_cst)) break; - state->cv.wait(lock, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || !queue->is_empty() || state->is_destroying.load(std::memory_order::seq_cst); }); + state->cv.wait(lock, [&] { return !queue->is_empty() || state->is_disposed.load(std::memory_order::seq_cst); }); - if (state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst)) + if (queue->is_empty()) break; if (queue->top()->is_disposed()) @@ -117,7 +100,7 @@ namespace rpp::schedulers { if (const auto now = worker_strategy::now(); now < queue->top()->get_timepoint()) { - state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return state->is_disposed.load(std::memory_order::seq_cst) || state->is_destroying.load(std::memory_order::seq_cst) || worker_strategy::now() >= queue->top()->get_timepoint(); }); + state->cv.wait_for(lock, queue->top()->get_timepoint() - now, [&] { return queue->top()->is_disposed() || worker_strategy::now() >= queue->top()->get_timepoint(); }); continue; } } @@ -126,7 +109,8 @@ namespace rpp::schedulers lock.unlock(); if (const auto timepoint = (*top)()) - queue->emplace(timepoint.value(), std::move(top)); + if (!top->is_disposed()) + queue->emplace(timepoint.value(), std::move(top)); } std::unique_lock lock{state->mutex}; diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index bd0d07d37..3e40eeec8 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -895,4 +895,54 @@ TEST_CASE("different delaying strategies") CHECK(scheduler.get_schedulings() == std::vector{now, now + delay}); CHECK(scheduler.get_executions() == std::vector{now}); } +} + +TEST_CASE("current_thread inside new_thread") +{ + auto worker = std::optional{rpp::schedulers::new_thread{}.create_worker()}; + auto d = rpp::composite_disposable_wrapper::make(); + auto obs = std::optional{mock_observer_strategy{}.get_observer(d).as_dynamic()}; + auto started = std::make_shared(); + auto done = std::make_shared(); + + worker->schedule([&](const auto&) { + thread_local rpp::utils::finally_action th{[done] { + done->store(true); + }}; + return rpp::schedulers::optional_delay_from_now{}; + }, + obs.value()); + + auto current_thread_invoked = std::make_shared(); + + worker->schedule([&](const auto& obs) { + worker->get_disposable().dispose(); + + rpp::schedulers::current_thread{}.create_worker().schedule([current_thread_invoked](const auto&) { + current_thread_invoked->store(true); + return rpp::schedulers::optional_delay_from_now{}; + }, + obs); + + if (current_thread_invoked->load()) + throw std::runtime_error{"current_thread was invoked"}; + + started->store(true); + + return rpp::schedulers::optional_delay_from_now{}; + }, + obs.value()); + + while (!started->load()) + { + } + + worker.reset(); + obs.reset(); + d = rpp::composite_disposable_wrapper::empty(); + + std::this_thread::sleep_for(std::chrono::seconds{1}); + + REQUIRE(done->load()); + CHECK(current_thread_invoked->load()); } \ No newline at end of file