diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index f404f06b1..f882ebc2f 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -655,6 +655,24 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); }); } + SECTION("create(on_error())+retry(1)+subscribe") + { + TEST_RPP([&]() { + rpp::source::create([&](auto&& observer) { + observer.on_error({}); + }) + | rpp::operators::retry(1) + | rpp::operators::subscribe([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); }); + }); + + TEST_RXCPP([&]() { + rxcpp::observable<>::create([&](auto&& observer) { + observer.on_error({}); + }) + | rxcpp::operators::retry(1) + | rxcpp::operators::subscribe([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); }); + }); + } } // BENCHMARK("Error Handling Operators") BENCHMARK("Subjects") diff --git a/src/examples/rpp/doxygen/retry.cpp b/src/examples/rpp/doxygen/retry.cpp new file mode 100644 index 000000000..4daecab4f --- /dev/null +++ b/src/examples/rpp/doxygen/retry.cpp @@ -0,0 +1,30 @@ +#include + +#include +#include + +/** + * @example retry.cpp + **/ +int main() +{ + //! [retry] + rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error({})) + | rpp::operators::retry(2) + | rpp::operators::subscribe([](int v) { std::cout << v << " "; }, + [](const std::exception_ptr&) { std::cout << "error" << std::endl; }, + []() { std::cout << "completed" << std::endl; }); + // Output: 1 2 3 1 2 3 1 2 3 error + //! [retry] + + //! [retry_infinitely] + rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error({})) + | rpp::operators::retry() + | rpp::operators::take(10) + | rpp::operators::subscribe([](int v) { std::cout << v << " "; }, + [](const std::exception_ptr&) { std::cout << "error" << std::endl; }, + []() { std::cout << "completed" << std::endl; }); + // Output: 1 2 3 1 2 3 1 2 3 1 completed + //! [retry_infinitely] + return 0; +} diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index bc7c75bee..53b225d0a 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -129,3 +129,4 @@ */ #include +#include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index a8d73803e..8651ee668 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -106,6 +106,10 @@ namespace rpp::operators auto repeat(); + auto retry(size_t count); + + auto retry(); + template requires (!utils::is_not_template_callable || std::same_as, std::invoke_result_t &&, rpp::utils::convertible_to_any>>) auto scan(InitialValue&& initial_value, Fn&& accumulator); diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 6d064cadc..78a4d6d3f 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -60,7 +60,7 @@ namespace rpp::operators::details disposable.dispose(); } - void set_upstream(const disposable_wrapper& d) + void set_upstream(const disposable_wrapper& d) const { disposable.add(d); } diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp new file mode 100644 index 000000000..4362f2ea4 --- /dev/null +++ b/src/rpp/rpp/operators/retry.hpp @@ -0,0 +1,182 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +namespace rpp::operators::details +{ + template + struct retry_state_t final : public rpp::composite_disposable + { + retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional count) + : count{count} + , observer(std::move(in_observer)) + , observable(observable) + + { + } + + std::optional count; + std::atomic is_inside_drain{}; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS Observable observable; + }; + + template + void drain(const std::shared_ptr>& state); + + template + struct retry_observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr> state; + mutable bool locally_disposed{}; + + template + void on_next(T&& v) const + { + state->observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + locally_disposed = true; + if (state->count == 0) + { + state->observer.on_error(err); + state->dispose(); + return; + } + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + + drain(state); + } + + void on_completed() const + { + locally_disposed = true; + state->observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) const + { + state->add(d); + } + + bool is_disposed() const { return locally_disposed || state->is_disposed(); } + }; + + template + void drain(const std::shared_ptr>& state) + { + while (!state->is_disposed()) + { + if (state->count) + --state->count.value(); + state->clear(); + state->is_inside_drain.store(true, std::memory_order::seq_cst); + try + { + using value_type = rpp::utils::extract_observer_type_t; + state->observable.subscribe(observer>{state}); + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + } + catch (...) + { + state->observer.on_error(std::current_exception()); + return; + } + } + } + + struct retry_t + { + const std::optional count{}; + + template + struct operator_traits + { + using result_type = T; + }; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + void subscribe(TObserver&& observer, TObservable&& observble) const + { + const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); + drain(ptr); + } + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief The retry operator attempts to resubscribe to the observable when an error occurs, up to the specified number of retries. + * + * @marble retry + { + source observable : +-1-x + operator "retry:(2)" : +-1-1-1-x + } + * + * @param count is the number of retries + * + * @warning #include + * + * @par Examples: + * @snippet retry.cpp retry + * + * @ingroup error_handling_operators + * @see https://reactivex.io/documentation/operators/retry.html + */ + inline auto retry(size_t count) + { + return details::retry_t{count}; + } + + /** + * @brief The infinite retry operator continuously attempts to resubscribe to the observable upon error, without a retry limit. + * + * @marble infinite_retry + { + source observable : +-1-x + operator "retry:()" : +-1-1-1-1-1-1-1-1-1-1-1-> + } + * + * @warning #include + * + * @par Examples: + * @snippet retry.cpp retry_infinitely + * + * @ingroup error_handling_operators + * @see https://reactivex.io/documentation/operators/retry.html + */ + inline auto retry() + { + return details::retry_t{}; + } +} // namespace rpp::operators diff --git a/src/tests/rpp/test_repeat.cpp b/src/tests/rpp/test_repeat.cpp index 9d1635d13..b7ddaf35f 100644 --- a/src/tests/rpp/test_repeat.cpp +++ b/src/tests/rpp/test_repeat.cpp @@ -18,6 +18,7 @@ #include "copy_count_tracker.hpp" #include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" TEST_CASE("repeat resubscribes") @@ -135,3 +136,22 @@ TEST_CASE("repeat satisfies disposable contracts") { test_operator_with_disposable(rpp::ops::repeat()); } + + +TEST_CASE("repeat handles stack overflow") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + constexpr size_t count = 500000; + + REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + rpp::source::create([](const auto& obs) { + obs.on_next(1); + obs.on_completed(); + }) + | rpp::operators::repeat(count) + | rpp::operators::subscribe(mock); +} diff --git a/src/tests/rpp/test_retry.cpp b/src/tests/rpp/test_retry.cpp new file mode 100644 index 000000000..450f5a549 --- /dev/null +++ b/src/tests/rpp/test_retry.cpp @@ -0,0 +1,178 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" + +TEST_CASE("retry handles errors properly") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + SECTION("observable 1-x-2") + { + const auto observable = rpp::source::concat(rpp::source::just(1), rpp::source::error({}), rpp::source::just(2)); + + SECTION("retry(0)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(0) | rpp::operators::subscribe(mock); + } + + SECTION("retry(1)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(1) | rpp::operators::subscribe(mock); + } + + SECTION("retry(2)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); + } + + SECTION("retry()") + { + auto d = rpp::composite_disposable_wrapper::make(); + + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).LR_SIDE_EFFECT(d.dispose()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry() | rpp::operators::subscribe(d, mock); + } + } + SECTION("observable 1-|") + { + const auto observable = rpp::source::just(1); + + SECTION("retry(0)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(0) | rpp::operators::subscribe(mock); + } + + SECTION("retry(2)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); + } + + SECTION("retry()") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable | rpp::operators::retry() | rpp::operators::subscribe(mock); + } + } + SECTION("observable 1->") + { + const auto observable = rpp::source::concat(rpp::source::just(1), rpp::source::never()); + + SECTION("retry(0)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(0) | rpp::operators::subscribe(mock); + } + + SECTION("retry(2)") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); + } + + SECTION("retry()") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry() | rpp::operators::subscribe(mock); + } + } +} + +TEST_CASE("retry handles stack overflow") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + constexpr size_t count = 500000; + + REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count + 1).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + rpp::source::create([](const auto& obs) { + obs.on_next(1); + obs.on_error({}); + }) + | rpp::operators::retry(count) + | rpp::operators::subscribe(mock); +} + +TEST_CASE("retry doesn't produce extra copies") +{ + SECTION("retry(2)") + { + copy_count_tracker::test_operator(rpp::ops::retry(2), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }); + } + SECTION("retry()") + { + copy_count_tracker::test_operator(rpp::ops::retry(), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }); + } +} + +TEST_CASE("retry satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::retry(1)); + + test_operator_over_observable_with_disposable([](const auto& observable) { + return rpp::source::concat(observable, rpp::source::error({})) | rpp::ops::retry(10); + }); +}