diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 33ef7d16b..b50e1f850 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -16,66 +16,138 @@ #include // create_subscriber_with_state #include // own forwarding #include // constraint::subscriber_of_type +#include + +#include IMPLEMENTATION_FILE(delay_tag); namespace rpp::details { -/** - * Functor (type-erasure) of "delay" for on_next operator. - */ -struct delay_on_next +struct completion {}; + +template +class queue_based_worker final : public std::enable_shared_from_this> { - schedulers::duration delay; +public: + queue_based_worker(schedulers::duration delay, Worker&& worker, const Subscriber& subscriber) + : m_delay{delay} + , m_worker{std::move(worker)} + , m_subscriber{subscriber} {} - void operator()(auto&& value, const auto& subscriber, const auto& worker) const + queue_based_worker(schedulers::duration delay, Worker&& worker, Subscriber&& subscriber) + : m_delay{delay} + , m_worker{std::move(worker)} + , m_subscriber{std::move(subscriber)} {} + + struct on_next { - worker.schedule(delay, - [value = std::forward(value), subscriber]() - { - subscriber.on_next(std::move(value)); - return schedulers::optional_duration{}; - }); - } -}; + void operator()(auto&& value, const std::shared_ptr>& state) const + { + state->emplace(std::forward(value)); + } + }; -/** - * Functor (type-erasure) of "delay" for on_error operator. - */ -struct delay_on_error -{ - void operator()(const std::exception_ptr& err, const auto& subscriber, const auto& worker) const + struct on_error { - // on-error must be delivered as soon as possible - worker.schedule([err, subscriber]() - { - subscriber.on_error(err); - return schedulers::optional_duration{}; - }); + void operator()(const std::exception_ptr& err, const std::shared_ptr>& state) const + { + state->emplace(err); + } + }; + + struct on_completed + { + void operator()(const std::shared_ptr>& state) const + { + state->emplace(completion{}); + } + }; + +private: + template + void emplace(TT&& item) + { + if (const auto timepoint = emplace_safe(std::forward(item))) + { + m_worker.schedule(timepoint.value(), + [state = this->shared_from_this()]()-> schedulers::optional_duration + { + return state->drain_queue(); + }); + } } -}; -/** - * Functor (type-erasure) of "delay" for on_completed operator. - */ -struct delay_on_completed -{ - schedulers::duration delay; + template + std::optional emplace_safe(TT&& item) + { + std::lock_guard lock{m_mutex}; + const auto delay = std::is_same_v> ? schedulers::duration{0} : m_delay; + m_queue.emplace(++m_current_id, m_worker.now()+delay, std::forward(item)); + if (!m_active && m_queue.size() == 1) + { + m_active = true; + return m_queue.top().time; + } + return {}; + } - void operator()(const auto& subscriber, const auto& worker) const + schedulers::optional_duration drain_queue() { - worker.schedule(delay, - [subscriber]() - { - subscriber.on_completed(); - return schedulers::optional_duration{}; - }); + while (true) + { + std::unique_lock lock{m_mutex}; + if (m_queue.empty()) + { + m_active = false; + return {}; + } + + auto& top = m_queue.top(); + const auto now = m_worker.now(); + if (top.time > now) + return top.time - now; + + auto item = std::move(top.item); + m_queue.pop(); + lock.unlock(); + + std::visit(utils::overloaded + { + [&](T&& v) { m_subscriber.on_next(std::move(v)); }, + [&](const std::exception_ptr& err) { m_subscriber.on_error(err); }, + [&](completion) { m_subscriber.on_completed(); } + }, + std::move(item)); + } } + +private: + struct emission + { + template + emission(size_t id, schedulers::time_point time, TT&& item) + : id{id} + , time{std::move(time)} + , item{std::forward(item)} {} + + size_t id{}; + schedulers::time_point time{}; + std::variant item{}; + + bool operator<(const emission& other) const { return std::tie(time, id) >= std::tie(other.time, other.id); } + }; + + schedulers::duration m_delay; + Worker m_worker; + Subscriber m_subscriber; + std::mutex m_mutex{}; + size_t m_current_id{}; + std::priority_queue m_queue{}; + bool m_active{}; }; -/** - * \brief Functor of OperatorFn for "combine_latest" operator (used by "lift"). - */ + template struct delay_impl { @@ -86,14 +158,16 @@ struct delay_impl auto operator()(TSub&& subscriber) const { auto worker = scheduler.create_worker(subscriber.get_subscription()); - auto subscription = subscriber.get_subscription().make_child(); + + using state_t = queue_based_worker, std::decay_t>; + auto state = std::make_shared(delay, std::move(worker), std::forward(subscriber)); + return create_subscriber_with_state(std::move(subscription), - delay_on_next{delay}, - delay_on_error{}, - delay_on_completed{delay}, - std::forward(subscriber), - std::move(worker)); + typename state_t::on_next{}, + typename state_t::on_error{}, + typename state_t::on_completed{}, + std::move(state)); } }; } // namespace rpp::details diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index dfafa9ae7..bdd78ea7c 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -13,17 +13,18 @@ #include #include #include +#include #include #include #include SCENARIO("delay mirrors both source observable and trigger observable", "[delay]") { + auto mock = mock_observer{}; std::chrono::milliseconds delay_duration{300}; GIVEN("observable of -1-|") { - auto mock = mock_observer{}; const auto now = rpp::schedulers::clock_type::now(); rpp::source::just(1) @@ -60,7 +61,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] GIVEN("observable of -x") { - auto mock = mock_observer{}; const auto now = rpp::schedulers::clock_type::now(); rpp::source::error(std::make_exception_ptr(std::runtime_error{""})) @@ -87,7 +87,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] GIVEN("observable of -|") { - auto mock = mock_observer{}; const auto now = rpp::schedulers::clock_type::now(); rpp::source::empty() @@ -111,4 +110,43 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay] CHECK(mock.get_on_error_count() == 0); } } + + GIVEN("subject with items") + { + auto subj = rpp::subjects::publish_subject{}; + + WHEN("subscribe on subject via delay and doing recursive submit from another thread") + { + THEN("all values obtained in the same thread") + { + auto current_thread = std::this_thread::get_id(); + + auto sub = subj.get_observable() + .delay(delay_duration, rpp::schedulers::trampoline{}) + .subscribe([&](int v) + { + CHECK(std::this_thread::get_id() == current_thread); + + mock.on_next(v); + + if (v == 1) + { + std::thread{[&]{subj.get_subscriber().on_next(2);}}.join(); + + THEN("no recursive on_next calls") + { + CHECK(mock.get_received_values() == std::vector{1}); + } + } + }); + + subj.get_subscriber().on_next(1); + + AND_THEN("all values obtained") + { + CHECK(mock.get_received_values() == std::vector{ 1, 2 }); + } + } + } + } } diff --git a/src/tests/rpp/test_observe_on.cpp b/src/tests/rpp/test_observe_on.cpp index 2ce5d90df..8b8480b1a 100644 --- a/src/tests/rpp/test_observe_on.cpp +++ b/src/tests/rpp/test_observe_on.cpp @@ -1,10 +1,10 @@ // ReactivePlusPlus library -// +// // Copyright Aleksey Loginov 2022 - present. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // https://www.boost.org/LICENSE_1_0.txt) -// +// // Project home: https://github.com/victimsnino/ReactivePlusPlus #include "copy_count_tracker.hpp" @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -39,8 +40,8 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" CHECK(mock.get_received_values() == vals); CHECK(mock.get_on_completed_count() == 1); - CHECK(scheduler.get_schedulings() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed - CHECK(scheduler.get_executions() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed + CHECK(scheduler.get_schedulings() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed + CHECK(scheduler.get_executions() == std::vector{ initial_time, initial_time, initial_time });//2 items + on_completed } } } @@ -85,34 +86,73 @@ SCENARIO("observe_on transfers emssions to scheduler", "[operators][observe_on]" } } } -} -SCENARIO("observe_on with immediate doesn't produce a lot of copies", "[operators][observe_on][track_copy]") -{ - GIVEN("observable with value by copy") + GIVEN("subject with items") { - auto tracker = copy_count_tracker{}; - WHEN("subscribe on it via scheduler") + auto int_mock = mock_observer{}; + auto subj = rpp::subjects::publish_subject{}; + WHEN("subscribe on subject via observe_on and doing recursive submit") { - tracker.get_observable().observe_on(rpp::schedulers::immediate{}).subscribe(); - THEN("only 1 extra copy") + auto sub = subj.get_observable() + .observe_on(scheduler) + .subscribe([&](int v) + { + int_mock.on_next(v); + + if (v == 1) + { + subj.get_subscriber().on_next(2); + THEN("no direct schedule to scheduler after recursive on_next") + { + CHECK(scheduler.get_schedulings() == std::vector{ initial_time }); + CHECK(scheduler.get_executions() == std::vector{ initial_time }); + CHECK(int_mock.get_received_values() == std::vector{1}); + } + } + }); + + subj.get_subscriber().on_next(1); + + THEN("second job executed without extra schedule") { - CHECK(tracker.get_copy_count() == 1); - CHECK(tracker.get_move_count() == 0); + CHECK(scheduler.get_schedulings() == std::vector{ initial_time }); + CHECK(scheduler.get_executions() == std::vector{ initial_time }); + CHECK(int_mock.get_received_values() == std::vector{ 1, 2 }); } } - } - GIVEN("observable with value by move") - { - auto tracker = copy_count_tracker{}; - WHEN("subscribe on it via scheduler") + + WHEN("subscribe on subject via observe_on trampoline and doing recursive submit from another thread") { - tracker.get_observable_for_move().observe_on(rpp::schedulers::immediate{}).subscribe(); - THEN("only 1 extra move") + THEN("all values obtained in the same thread") { - CHECK(tracker.get_copy_count() == 0); - CHECK(tracker.get_move_count() == 1); + auto current_thread = std::this_thread::get_id(); + + auto sub = subj.get_observable() + .observe_on(rpp::schedulers::trampoline{}) + .subscribe([&](int v) + { + CHECK(std::this_thread::get_id() == current_thread); + + int_mock.on_next(v); + + if (v == 1) + { + std::thread{[&]{subj.get_subscriber().on_next(2);}}.join(); + + THEN("no recursive on_next calls") + { + CHECK(int_mock.get_received_values() == std::vector{1}); + } + } + }); + + subj.get_subscriber().on_next(1); + + AND_THEN("all values obtained") + { + CHECK(int_mock.get_received_values() == std::vector{ 1, 2 }); + } } } } -} +} \ No newline at end of file diff --git a/src/tests/rppqt/test_main_thread_scheduler.cpp b/src/tests/rppqt/test_main_thread_scheduler.cpp index ff18d3987..2210202de 100644 --- a/src/tests/rppqt/test_main_thread_scheduler.cpp +++ b/src/tests/rppqt/test_main_thread_scheduler.cpp @@ -9,6 +9,7 @@ #include #include +#include #include