diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index 4f8ceae55..8f1849e3f 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -135,7 +135,7 @@ - [x] do_on_next - [x] do_on_error - [x] do_on_completed -- [ ] timeout +- [x] timeout ### Connectable diff --git a/src/benchmarks/rpp_benchmark.cpp b/src/benchmarks/rpp_benchmark.cpp index 335801e9b..52fd6eff6 100644 --- a/src/benchmarks/rpp_benchmark.cpp +++ b/src/benchmarks/rpp_benchmark.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -859,6 +860,26 @@ TEST_CASE("skip") }; } +TEST_CASE("timeout") +{ + + BENCHMARK_ADVANCED("timeout construction from observable via dot + subscribe with run_loop")(Catch::Benchmark::Chronometer meter) + { + const auto obs = rpp::observable::create([](const auto& sub) { sub.on_next(1); }); + auto sub = rpp::specific_subscriber{[](const int&) {}}; + rpp::schedulers::run_loop rl{}; + meter.measure([&] { return obs.timeout(std::chrono::days{30}, rl).subscribe(sub); }); + }; + + BENCHMARK_ADVANCED("sending of values from observable via timeout to subscriber with unreachable timeout interval with run_loop")(Catch::Benchmark::Chronometer meter) + { + rpp::schedulers::run_loop rl{}; + rpp::source::create([&](const auto& sub) { meter.measure([&] { sub.on_next(1); }); }) + .timeout(std::chrono::days{30}, rl) + .subscribe([](const auto&) {}); + }; +} + TEST_CASE("chains creation test") { BENCHMARK_ADVANCED("long non-state chain creation + subscribe")(Catch::Benchmark::Chronometer meter) diff --git a/src/benchmarks/rxcpp_benchmark.cpp b/src/benchmarks/rxcpp_benchmark.cpp index dc41a80f2..905de9abd 100644 --- a/src/benchmarks/rxcpp_benchmark.cpp +++ b/src/benchmarks/rxcpp_benchmark.cpp @@ -853,6 +853,25 @@ TEST_CASE("skip") }; } +TEST_CASE("timeout") +{ + BENCHMARK_ADVANCED("timeout construction from observable via dot + subscribe with run_loop")(Catch::Benchmark::Chronometer meter) + { + const auto obs = rxcpp::sources::create([](const auto& sub) { sub.on_next(1); }); + auto sub = rxcpp::make_subscriber([](const int&) {}); + rxcpp::schedulers::run_loop rl{}; + meter.measure([&] { return obs.timeout(std::chrono::days{30}, rxcpp::observe_on_run_loop(rl)).subscribe(sub); }); + }; + + BENCHMARK_ADVANCED("sending of values from observable via timeout to subscriber with unreachable timeout interval with run_loop")(Catch::Benchmark::Chronometer meter) + { + rxcpp::schedulers::run_loop rl{}; + rxcpp::sources::create([&](const auto& sub) { meter.measure([&] { sub.on_next(1); }); }) + .timeout(std::chrono::days{30}, rxcpp::observe_on_run_loop(rl)) + .subscribe([](const auto&) {}); + }; +} + TEST_CASE("chains creation test") { BENCHMARK_ADVANCED("long non-state chain creation + subscribe")(Catch::Benchmark::Chronometer meter) diff --git a/src/examples/doxygen/timeout.cpp b/src/examples/doxygen/timeout.cpp new file mode 100644 index 000000000..e7ef71281 --- /dev/null +++ b/src/examples/doxygen/timeout.cpp @@ -0,0 +1,42 @@ +#include + +#include + +/** + * \example timeout.cpp + **/ +int main() +{ + //! [timeout] + rpp::subjects::publish_subject subj{}; + subj.get_observable() + .timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{}) + .subscribe([](int v) { std::cout << "new value " << v << std::endl; }, + [](std::exception_ptr err) + { + try + { + std::rethrow_exception(err); + } + catch (const std::exception& exc) + { + std::cout << "ERR: " << exc.what() << std::endl; + } + }, + []() { std::cout << "completed" << std::endl; }); + for (int i = 0; i < 10; ++i) + { + std::this_thread::sleep_for(std::chrono::milliseconds{i * 100}); + subj.get_subscriber().on_next(i); + } + + // Output: + // new value 0 + // new value 1 + // new value 2 + // new value 3 + // new value 4 + // ERR : Timeout reached + //! [timeout] + return 0; +} diff --git a/src/rpp/rpp/observables/interface_observable.hpp b/src/rpp/rpp/observables/interface_observable.hpp index a27a9d182..ed276c578 100644 --- a/src/rpp/rpp/observables/interface_observable.hpp +++ b/src/rpp/rpp/observables/interface_observable.hpp @@ -66,6 +66,7 @@ struct RPP_EMPTY_BASES interface_observable , details::member_overload , details::member_overload , details::member_overload + , details::member_overload , details::member_overload , details::member_overload { diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index dbd94e1c4..cb5e97aa2 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -91,6 +91,7 @@ #include #include #include +#include /** * \defgroup connectable_operators Connectable Operators diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index 1aa551061..8fdf700a3 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -42,5 +42,6 @@ #include #include #include +#include #include #include diff --git a/src/rpp/rpp/operators/fwd/timeout.hpp b/src/rpp/rpp/operators/fwd/timeout.hpp new file mode 100644 index 000000000..07dd51214 --- /dev/null +++ b/src/rpp/rpp/operators/fwd/timeout.hpp @@ -0,0 +1,61 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +namespace rpp::details +{ +struct timeout_tag; +} + +namespace rpp::details +{ +template +struct timeout_impl; + +template +struct member_overload +{ + /** + * \brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission) + * + * \marble timeout + { + source observable : +--1-2-3-4------5-| + operator "timeout(4)" : +--1-2-3-4----# + } + * \param period is maximum duration between emitted items before a timeout occurs + * \param scheduler is scheduler used to run timer for timeout + * \return new specific_observable with the timeout operator as most recent operator. + * \warning #include + * + * \par Example + * \snippet timeout.cpp timeout + * + * \ingroup utility_operators + * \see https://reactivex.io/documentation/operators/timeout.html + */ + template + auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included + { + return static_cast(this)->template lift(timeout_impl{period, scheduler}); + } + + template + auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included + { + return std::move(*static_cast(this)).template lift(timeout_impl{period, scheduler}); + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp new file mode 100644 index 000000000..70815a834 --- /dev/null +++ b/src/rpp/rpp/operators/timeout.hpp @@ -0,0 +1,119 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include // required due to operator uses lift +#include +#include +#include +#include +#include +#include + +#include + +#include + +IMPLEMENTATION_FILE(timeout_tag); + +namespace rpp::details +{ +struct timeout_state : early_unsubscribe_state +{ + using early_unsubscribe_state::early_unsubscribe_state; + + std::atomic last_emission_time{}; + + static constexpr schedulers::time_point s_timeout_reached = schedulers::time_point::min(); +}; + +template +struct timeout_on_next +{ + template + void operator()(Value&& v, const auto& subscriber, const std::shared_ptr& state) const + { + if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state::s_timeout_reached) + subscriber.on_next(std::forward(v)); + } +}; + +using timeout_on_error = early_unsubscribe_on_error; +using timeout_on_completed = early_unsubscribe_on_completed; + +struct timeout_state_with_serialized_spinlock : timeout_state +{ + using timeout_state::timeout_state; + + // spinlock because most part of time there is only one thread would be active + utils::spinlock spinlock{}; +}; + +template +struct timeout_impl +{ + schedulers::duration period; + TScheduler scheduler; + + template TSub> + auto operator()(TSub&& in_subscriber) const + { + auto state = std::make_shared(in_subscriber.get_subscription()); + // change subscriber to serialized to avoid manual using of mutex + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), + std::shared_ptr{state, &state->spinlock}); + + const auto worker = scheduler.create_worker(state->children_subscriptions); + state->last_emission_time.store(worker.now(), std::memory_order_relaxed); + + const auto last_emission_time = state->last_emission_time.load(std::memory_order_relaxed); + worker.schedule(last_emission_time + period, + [period = period, prev_emission_time = last_emission_time, subscriber, state]() mutable -> schedulers::optional_duration + { + while (true) + { + // last emission time still same value -> timeout reached, else -> prev_emission_time + // would be update to actual emission time + if (state->last_emission_time.compare_exchange_strong(prev_emission_time, + timeout_state::s_timeout_reached, + std::memory_order_acq_rel)) + return time_is_out(state, subscriber); + + // if we still need to wait a bit more -> let's wait + if (const auto diff_to_schedule = (prev_emission_time + period) - decltype(worker)::now(); + diff_to_schedule > rpp::schedulers::duration{0}) + return diff_to_schedule; + + // okay, we here because: + // 1) last_emission_time was not equal to prev_emission_time + // 2) last_emission_time + period before now -> we are still in timeout state + // 3) prev_emission_time updated to last_emission_time + // So we can return to begin + } + }); + + return create_subscriber_with_state(state->children_subscriptions, + timeout_on_next{}, + timeout_on_error{}, + timeout_on_completed{}, + std::move(subscriber), + std::move(state)); + } + +private: + static schedulers::optional_duration time_is_out(const auto& state, const auto& subscriber) + { + state->children_subscriptions.unsubscribe(); + subscriber.on_error(std::make_exception_ptr(utils::timeout{"Timeout reached"})); + return std::nullopt; + } +}; +} // namespace rpp::details diff --git a/src/rpp/rpp/schedulers/details/worker.hpp b/src/rpp/rpp/schedulers/details/worker.hpp index 47f08f648..3cb76443e 100644 --- a/src/rpp/rpp/schedulers/details/worker.hpp +++ b/src/rpp/rpp/schedulers/details/worker.hpp @@ -75,6 +75,8 @@ class worker final : public details::worker_tag m_strategy.defer_at(time_point, std::forward(fn)); } + static time_point now() { return Strategy::now(); } + private: Strategy m_strategy; }; diff --git a/src/rpp/rpp/utils/exceptions.hpp b/src/rpp/rpp/utils/exceptions.hpp index 2a219c26d..fb45f99a0 100644 --- a/src/rpp/rpp/utils/exceptions.hpp +++ b/src/rpp/rpp/utils/exceptions.hpp @@ -14,10 +14,13 @@ namespace rpp::utils { - struct not_enough_emissions : std::runtime_error { using std::runtime_error::runtime_error; }; +struct timeout : std::runtime_error +{ + using std::runtime_error::runtime_error; +}; } // namespace rpp::utils diff --git a/src/tests/test_scheduler.hpp b/src/tests/test_scheduler.hpp index 8909744e8..6f9abb0c9 100644 --- a/src/tests/test_scheduler.hpp +++ b/src/tests/test_scheduler.hpp @@ -11,7 +11,7 @@ #include -static rpp::schedulers::time_point s_current_time{std::chrono::seconds{0}}; +static rpp::schedulers::time_point s_current_time{std::chrono::seconds{10}}; class test_scheduler final : public rpp::schedulers::details::scheduler_tag { diff --git a/src/tests/test_timeout.cpp b/src/tests/test_timeout.cpp new file mode 100644 index 000000000..95ffee2cd --- /dev/null +++ b/src/tests/test_timeout.cpp @@ -0,0 +1,114 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#include "mock_observer.hpp" +#include "test_scheduler.hpp" + +#include + +#include +#include +#include +#include +#include + +SCENARIO("timeout sends error only on timeout", "[operators][timeout]") +{ + auto mock = mock_observer{}; + auto interval_scheduler = test_scheduler{}; + auto timeout_scheduler = test_scheduler{}; + auto start_time = test_scheduler::worker_strategy::now(); + + GIVEN("interval observable") + { + constexpr rpp::schedulers::duration interval_duration = std::chrono::milliseconds{10}; + auto obs = rpp::source::interval(interval_duration, interval_scheduler); + WHEN("subscribe on it via timeout with period > interval period") + { + constexpr auto timeout_duration = std::chrono::milliseconds{15}; + auto sub = obs.take(5).timeout(timeout_duration, timeout_scheduler).subscribe(mock); + while (sub.is_subscribed()) + { + interval_scheduler.time_advance(std::chrono::milliseconds{1}); + timeout_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_scheduler uses same global time, but each of them drains different queues + } + + THEN("no any timeout events") + { + CHECK(mock.get_received_values() == std::vector {0, 1, 2, 3, 4}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration, + start_time + 4 * interval_duration, + start_time + 5 * interval_duration}); + CHECK(timeout_scheduler.get_executions() == + std::vector{start_time + timeout_duration, + start_time + interval_duration + timeout_duration, + start_time + 2 * interval_duration + timeout_duration, + start_time + 3 * interval_duration + timeout_duration}); + } + } + WHEN("subscribe on it via timeout with period < interval period") + { + constexpr auto timeout_duration = std::chrono::milliseconds{5}; + auto sub = obs.take(5).timeout(timeout_duration, timeout_scheduler).subscribe(mock); + while (sub.is_subscribed()) + { + interval_scheduler.time_advance(std::chrono::milliseconds{1}); + timeout_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_scheduler + // uses same global time, but each of them + // drains different queues + } + + THEN("timeout event happens before interval event") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + CHECK(interval_scheduler.get_executions() == std::vector{}); + CHECK(timeout_scheduler.get_executions() == std::vector{start_time + timeout_duration}); + } + } + WHEN("subscribe on it via timeout with period > interval period, but interval ends without completion") + { + constexpr auto timeout_duration = std::chrono::milliseconds{15}; + auto sub = obs.take(5).concat_with(rpp::source::never()).timeout(timeout_duration, timeout_scheduler).subscribe(mock); + while (sub.is_subscribed()) + { + interval_scheduler.time_advance(std::chrono::milliseconds{1}); + timeout_scheduler.time_advance(rpp::schedulers::duration{}); // empty due to actually test_scheduler + // uses same global time, but each of them + // drains different queues + } + + THEN("timeout event after last emission + timeout duration") + { + CHECK(mock.get_received_values() == std::vector{0, 1, 2, 3, 4}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + CHECK(interval_scheduler.get_executions() == + std::vector{start_time + interval_duration, + start_time + 2 * interval_duration, + start_time + 3 * interval_duration, + start_time + 4 * interval_duration, + start_time + 5 * interval_duration}); + CHECK(timeout_scheduler.get_executions() == + std::vector{start_time + timeout_duration, + start_time + interval_duration + timeout_duration, + start_time + 2 * interval_duration + timeout_duration, + start_time + 3 * interval_duration + timeout_duration, + start_time + 4 * interval_duration + timeout_duration, + start_time + 5 * interval_duration + timeout_duration}); + } + } + } +}