From e661682c62768eb4e4eaf88dd3bb603d9054a8db Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sun, 18 Aug 2024 21:15:11 +0200 Subject: [PATCH 01/12] retry_when operator --- src/rpp/rpp/operators.hpp | 1 + src/rpp/rpp/operators/fwd.hpp | 4 + src/rpp/rpp/operators/retry_when.hpp | 158 +++++++++++++++++++++++++ src/tests/rpp/test_retry_when.cpp | 171 +++++++++++++++++++++++++++ 4 files changed, 334 insertions(+) create mode 100644 src/rpp/rpp/operators/retry_when.hpp create mode 100644 src/tests/rpp/test_retry_when.cpp diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index bc7c75bee..cd2cc3da2 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -129,3 +129,4 @@ */ #include +#include diff --git a/src/rpp/rpp/operators/fwd.hpp b/src/rpp/rpp/operators/fwd.hpp index a8d73803e..cf3d4947f 100644 --- a/src/rpp/rpp/operators/fwd.hpp +++ b/src/rpp/rpp/operators/fwd.hpp @@ -182,6 +182,10 @@ namespace rpp::operators requires rpp::constraint::observable> auto on_error_resume_next(Selector&& selector); + template + requires rpp::constraint::observable> + auto retry_when(Notifier&& notifier); + template requires (!rpp::constraint::observable && (!utils::is_not_template_callable || std::invocable, utils::extract_observable_type_t...>)) auto with_latest_from(TSelector&& selector, TObservable&& observable, TObservables&&... observables); diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp new file mode 100644 index 000000000..b4266cdcf --- /dev/null +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -0,0 +1,158 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace rpp::operators::details +{ + template + struct retry_when_impl_inner_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; + RPP_NO_UNIQUE_ADDRESS TObservable observable; + RPP_NO_UNIQUE_ADDRESS Notifier notifier; + + template + void on_next(T&&) const + { + observable.subscribe(std::move(observer)); + } + + void on_error(const std::exception_ptr& err) const + { + observer.on_error(err); + } + + void on_completed() const + { + observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } + }; + + template + struct retry_when_impl_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; + RPP_NO_UNIQUE_ADDRESS TObservable observable; + RPP_NO_UNIQUE_ADDRESS Notifier notifier; + + template + void on_next(T&& v) const + { + observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + std::optional> notifier_obs; + try + { + notifier_obs.emplace(notifier(err)); + } + catch (...) + { + observer.on_error(std::current_exception()); + } + if (notifier_obs.has_value()) + { + std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy{std::move(observer), std::move(observable), std::move(notifier)}); + } + } + + void on_completed() const + { + observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } + }; + + template + struct retry_when_impl_t : lift_operator, TObservable, Notifier> + { + using lift_operator, TObservable, Notifier>::lift_operator; + + template + struct operator_traits + { + using result_type = T; + + template TObserver> + using observer_strategy = retry_when_impl_strategy; + }; + + template + using updated_disposable_strategy = Prev; + }; + + template + struct retry_when_t + { + RPP_NO_UNIQUE_ADDRESS Notifier notifier; + + template + auto operator()(TObservable&& observable) const & + { + return std::forward(observable) + | retry_when_impl_t, std::decay_t>{observable, notifier}; + } + + template + auto operator()(TObservable&& observable) && + { + return std::forward(observable) + | retry_when_impl_t, std::decay_t>{observable, std::forward(notifier)}; + } + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief If an error occurs, invoke @p notifier and when returned observable emits a value + * resubscribe to the source observable. If the notifier throws or returns an error/empty + * observable, then error/completed emission is forwarded to original subscription. + * + * @param notifier callable taking a std::exception_ptr and returning observable notifying when to resubscribe + * + * @warning #include + * + * @ingroup error_handling_operators + * @see https://reactivex.io/documentation/operators/retry.html + */ + template + requires rpp::constraint::observable> + auto retry_when(Notifier&& notifier) + { + return details::retry_when_t>{std::forward(notifier)}; + } +} // namespace rpp::operators \ No newline at end of file diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp new file mode 100644 index 000000000..b0aa20a7b --- /dev/null +++ b/src/tests/rpp/test_retry_when.cpp @@ -0,0 +1,171 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" + + +TEST_CASE("retry_when resubscribes on notifier emission") +{ + auto observer = mock_observer_strategy(); + SECTION("observable without error emission") + { + size_t subscribe_count = 0; + auto observable = rpp::source::create([&subscribe_count](const auto& sub) { + ++subscribe_count; + sub.on_next(std::string{"1"}); + sub.on_completed(); + }); + SECTION("subscribe") + { + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) + | rpp::operators::subscribe(observer); + SECTION("observer obtains values from observable") + { + CHECK(subscribe_count == 1); + CHECK(observer.get_total_on_next_count() == 1); + CHECK(observer.get_on_error_count() == 0); + CHECK(observer.get_on_completed_count() == 1); + } + } + } + + SECTION("observable with 1 error") + { + size_t subscribe_count = 0; + auto observable = rpp::source::create([&subscribe_count](const auto& sub) { + if (subscribe_count++ == 0) + { + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + } + else + { + sub.on_next(std::string{"1"}); + sub.on_completed(); + } + }); + + SECTION("subscribe on it with single emission notifier") + { + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) + | rpp::operators::subscribe(observer); + SECTION("original observable is subscribed twice and observer receives one emission") + { + CHECK(subscribe_count == 2); + CHECK(observer.get_total_on_next_count() == 1); + CHECK(observer.get_on_error_count() == 0); + CHECK(observer.get_on_completed_count() == 1); + } + } + + SECTION("subscribe on it with multiple emissions notifier") + { + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1, 2, 3); }) + | rpp::operators::subscribe(observer); + SECTION("original observable is subscribed twice and observer receives only one emission") + { + CHECK(subscribe_count == 2); + CHECK(observer.get_total_on_next_count() == 1); + CHECK(observer.get_on_error_count() == 0); + CHECK(observer.get_on_completed_count() == 1); + } + } + + SECTION("subscribe on it with throwing notifier") + { + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { + throw std::runtime_error{"error"}; + return rpp::source::just(1); + }) + | rpp::operators::subscribe(observer); + SECTION("original observable is subscribed only once and observer receives error emission") + { + CHECK(subscribe_count == 1); + CHECK(observer.get_total_on_next_count() == 0); + CHECK(observer.get_on_error_count() == 1); + CHECK(observer.get_on_completed_count() == 0); + } + } + + SECTION("subscribe on it with error notifier") + { + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { + return rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"})); + }) + | rpp::operators::subscribe(observer); + SECTION("original observable is subscribed only once and observer receives error emission") + { + CHECK(subscribe_count == 1); + CHECK(observer.get_total_on_next_count() == 0); + CHECK(observer.get_on_error_count() == 1); + CHECK(observer.get_on_completed_count() == 0); + } + } + + SECTION("subscribe on it with empty notifier") + { + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { + return rpp::source::empty(); + }) + | rpp::operators::subscribe(observer); + SECTION("original observable is subscribed only once and observer receives completed emission") + { + CHECK(subscribe_count == 1); + CHECK(observer.get_total_on_next_count() == 0); + CHECK(observer.get_on_error_count() == 0); + CHECK(observer.get_on_completed_count() == 1); + } + } + } +} + +TEST_CASE("retry_when doesn't produce extra copies") +{ + SECTION("retry_when(empty_notifier)") + { + copy_count_tracker::test_operator(rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); }), + { + .send_by_copy = {.copy_count = 1, // 1 copy to final subscriber + .move_count = 0}, + .send_by_move = {.copy_count = 0, + .move_count = 1} // 1 move to final subscriber + }); + } +} + +TEST_CASE("retry_when satisfies disposable contracts") +{ + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + { + auto observable = observable_with_disposable(observable_disposable); + auto op = rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); }); + + test_operator_with_disposable(op); + test_operator_finish_before_dispose(op); + } + + CHECK((observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2)); +} \ No newline at end of file From 9b232082865f0fe1ec77bbf68dcbd0dfa6653f05 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 18 Aug 2024 19:15:56 +0000 Subject: [PATCH 02/12] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/retry_when.hpp | 2 +- src/tests/rpp/test_retry_when.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index b4266cdcf..546601fd6 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -155,4 +155,4 @@ namespace rpp::operators { return details::retry_when_t>{std::forward(notifier)}; } -} // namespace rpp::operators \ No newline at end of file +} // namespace rpp::operators diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index b0aa20a7b..0498ea23a 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -168,4 +168,4 @@ TEST_CASE("retry_when satisfies disposable contracts") } CHECK((observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2)); -} \ No newline at end of file +} From 4f5f845a34ea51860443fecef047a2e38d489028 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Sun, 18 Aug 2024 21:18:41 +0200 Subject: [PATCH 03/12] Add missing mutables --- src/rpp/rpp/operators/retry_when.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 546601fd6..e58fcdbc7 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -59,9 +59,9 @@ namespace rpp::operators::details { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; - RPP_NO_UNIQUE_ADDRESS TObservable observable; - RPP_NO_UNIQUE_ADDRESS Notifier notifier; + RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; + RPP_NO_UNIQUE_ADDRESS mutable TObservable observable; + RPP_NO_UNIQUE_ADDRESS mutable Notifier notifier; template void on_next(T&& v) const From 883c8a6ba39a56f2d7e661ae31d386c0a104fc32 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Mon, 19 Aug 2024 21:43:34 +0200 Subject: [PATCH 04/12] Address comments --- src/rpp/rpp/operators/retry_when.hpp | 106 ++++++++++++++++++--------- src/tests/rpp/test_retry_when.cpp | 37 ++++++++++ 2 files changed, 108 insertions(+), 35 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index e58fcdbc7..a53367d6a 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -22,84 +22,110 @@ namespace rpp::operators::details { template + rpp::constraint::decayed_type TNotifier> + struct retry_when_state : public rpp::composite_disposable + { + template + retry_when_state(Observer&& observer, Observable&& observable, Notifier&& notifier) + : observer(std::forward(observer)) + , observable(std::forward(observable)) + , notifier(std::forward(notifier)) + { + } + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS TObservable observable; + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + + bool retrying{}; + }; + + template + struct retry_when_impl_strategy; + + template struct retry_when_impl_inner_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; - RPP_NO_UNIQUE_ADDRESS TObservable observable; - RPP_NO_UNIQUE_ADDRESS Notifier notifier; + std::shared_ptr> state; template void on_next(T&&) const { - observable.subscribe(std::move(observer)); + state->retrying = true; + state->clear(); + state->observable.subscribe(rpp::observer, retry_when_impl_strategy, std::decay_t, std::decay_t>>(std::move(state))); } void on_error(const std::exception_ptr& err) const { - observer.on_error(err); + if (!state->retrying) + state->observer.on_error(err); } void on_completed() const { - observer.on_completed(); + if (!state->retrying) + state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + void set_upstream(const disposable_wrapper& d) { state->observer.set_upstream(d); } - bool is_disposed() const { return observer.is_disposed(); } + bool is_disposed() const { return state->observer.is_disposed(); } }; template + rpp::constraint::decayed_type TNotifier> struct retry_when_impl_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; - RPP_NO_UNIQUE_ADDRESS mutable TObservable observable; - RPP_NO_UNIQUE_ADDRESS mutable Notifier notifier; + std::shared_ptr> state; template void on_next(T&& v) const { - observer.on_next(std::forward(v)); + state->observer.on_next(std::forward(v)); } void on_error(const std::exception_ptr& err) const { - std::optional> notifier_obs; + std::optional> notifier_obs; try { - notifier_obs.emplace(notifier(err)); + notifier_obs.emplace(state->notifier(err)); } - catch (...) + catch (const std::exception& e) { - observer.on_error(std::current_exception()); + state->observer.on_error(std::current_exception()); } if (notifier_obs.has_value()) { - std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy{std::move(observer), std::move(observable), std::move(notifier)}); + std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy{state}); } } void on_completed() const { - observer.on_completed(); + state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return observer.is_disposed(); } + bool is_disposed() const { return state->observer.is_disposed(); } }; - template - struct retry_when_impl_t : lift_operator, TObservable, Notifier> + template + struct retry_when_impl_t { - using lift_operator, TObservable, Notifier>::lift_operator; + RPP_NO_UNIQUE_ADDRESS TObservable observable; + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; template struct operator_traits @@ -107,30 +133,40 @@ namespace rpp::operators::details using result_type = T; template TObserver> - using observer_strategy = retry_when_impl_strategy; + using observer_strategy = retry_when_impl_strategy; }; template - using updated_disposable_strategy = Prev; + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + + template + auto lift(Observer&& observer) const + { + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), observable, notifier); + auto ptr = d.lock(); + ptr->observer.set_upstream(d.as_weak()); + + return rpp::observer, std::decay_t, std::decay_t>>(std::move(ptr)); + } }; - template + template struct retry_when_t { - RPP_NO_UNIQUE_ADDRESS Notifier notifier; + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; template auto operator()(TObservable&& observable) const & { return std::forward(observable) - | retry_when_impl_t, std::decay_t>{observable, notifier}; + | retry_when_impl_t, std::decay_t>{observable, notifier}; } template auto operator()(TObservable&& observable) && { return std::forward(observable) - | retry_when_impl_t, std::decay_t>{observable, std::forward(notifier)}; + | retry_when_impl_t, std::decay_t>{observable, std::forward(notifier)}; } }; } // namespace rpp::operators::details @@ -149,10 +185,10 @@ namespace rpp::operators * @ingroup error_handling_operators * @see https://reactivex.io/documentation/operators/retry.html */ - template - requires rpp::constraint::observable> - auto retry_when(Notifier&& notifier) + template + requires rpp::constraint::observable> + auto retry_when(TNotifier&& notifier) { - return details::retry_when_t>{std::forward(notifier)}; + return details::retry_when_t>{std::forward(notifier)}; } } // namespace rpp::operators diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index 0498ea23a..40a158916 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -140,6 +141,36 @@ TEST_CASE("retry_when resubscribes on notifier emission") } } } + + SECTION("observable with 4 errors") + { + size_t subscribe_count = 0; + auto observable = rpp::source::create([&subscribe_count](const auto& sub) { + if (subscribe_count++ < 4) + { + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + } + else + { + sub.on_next(std::string{"1"}); + sub.on_completed(); + } + }); + + SECTION("subscribe on it with single emission notifier") + { + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) + | rpp::operators::subscribe(observer); + SECTION("original observable is subscribed 5 times and observer receives one emission") + { + CHECK(subscribe_count == 4 + 1); + CHECK(observer.get_total_on_next_count() == 1); + CHECK(observer.get_on_error_count() == 0); + CHECK(observer.get_on_completed_count() == 1); + } + } + } } TEST_CASE("retry_when doesn't produce extra copies") @@ -165,6 +196,12 @@ TEST_CASE("retry_when satisfies disposable contracts") test_operator_with_disposable(op); test_operator_finish_before_dispose(op); + + test_operator_over_observable_with_disposable( + [](auto observable) { + return rpp::source::concat(observable, rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"}))) + | rpp::ops::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }); + }); } CHECK((observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2)); From 003df79edda253ac3d51e241cd26ec3187ac9783 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Tue, 20 Aug 2024 21:00:35 +0200 Subject: [PATCH 05/12] wip --- src/rpp/rpp/operators/retry_when.hpp | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index a53367d6a..4cab45a20 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -23,7 +23,7 @@ namespace rpp::operators::details template - struct retry_when_state : public rpp::composite_disposable + struct retry_when_state final : public rpp::composite_disposable { template retry_when_state(Observer&& observer, Observable&& observable, Notifier&& notifier) @@ -33,11 +33,11 @@ namespace rpp::operators::details { } + bool retrying{}; + RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - - bool retrying{}; }; template void on_next(T&&) const { - state->retrying = true; - state->clear(); - state->observable.subscribe(rpp::observer, retry_when_impl_strategy, std::decay_t, std::decay_t>>(std::move(state))); + if (!state->retrying) + { + state->retrying = true; + state->clear(); + state->observable.subscribe(rpp::observer, retry_when_impl_strategy, std::decay_t, std::decay_t>>(std::move(state))); + } } void on_error(const std::exception_ptr& err) const @@ -76,7 +79,7 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) { state->observer.set_upstream(d); } - bool is_disposed() const { return state->observer.is_disposed(); } + bool is_disposed() const { return state->retrying; } }; templatenotifier(err)); } - catch (const std::exception& e) + catch (...) { state->observer.on_error(std::current_exception()); } if (notifier_obs.has_value()) { + state->retrying = false; std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy{state}); } } From 23514f833a343a2a0b9219b615110f95dcf631f6 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Tue, 20 Aug 2024 22:25:14 +0200 Subject: [PATCH 06/12] wip --- src/rpp/rpp/operators/retry_when.hpp | 72 +++++++++++++++------------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 4cab45a20..e10cddc2c 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -33,17 +33,16 @@ namespace rpp::operators::details { } - bool retrying{}; + bool retrying{}; + std::atomic_bool is_inside_drain{}; RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; }; - template - struct retry_when_impl_strategy; + template + void drain(const std::shared_ptr, std::decay_t, std::decay_t>>& state); template void on_next(T&&) const { - if (!state->retrying) - { - state->retrying = true; - state->clear(); - state->observable.subscribe(rpp::observer, retry_when_impl_strategy, std::decay_t, std::decay_t>>(std::move(state))); - } + drain(state); } void on_error(const std::exception_ptr& err) const { - if (!state->retrying) + if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) state->observer.on_error(err); } void on_completed() const { - if (!state->retrying) + if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) state->observer.on_completed(); } @@ -110,7 +104,6 @@ namespace rpp::operators::details } if (notifier_obs.has_value()) { - state->retrying = false; std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy{state}); } } @@ -122,35 +115,48 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return state->observer.is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } }; + template + void drain(const std::shared_ptr, std::decay_t, std::decay_t>>& state) + { + while (!state->is_disposed()) + { + state->clear(); + state->is_inside_drain.store(true, std::memory_order::seq_cst); + try + { + using value_type = rpp::utils::extract_observable_type_t; + state->observable.subscribe(rpp::observer>{state}); + + if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; + } + catch (...) + { + state->observer.on_error(std::current_exception()); + return; + } + } + } + template struct retry_when_impl_t { RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - template - struct operator_traits - { - using result_type = T; - - template TObserver> - using observer_strategy = retry_when_impl_strategy; - }; - - template - using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; + using value_type = rpp::utils::extract_observable_type_t; - template - auto lift(Observer&& observer) const + template + void subscribe(TObserver&& observer) const { - const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), observable, notifier); + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), observable, notifier); auto ptr = d.lock(); ptr->observer.set_upstream(d.as_weak()); - return rpp::observer, std::decay_t, std::decay_t>>(std::move(ptr)); + drain(ptr); } }; @@ -162,15 +168,13 @@ namespace rpp::operators::details template auto operator()(TObservable&& observable) const & { - return std::forward(observable) - | retry_when_impl_t, std::decay_t>{observable, notifier}; + return rpp::observable, retry_when_impl_t, std::decay_t>>{observable, notifier}; } template auto operator()(TObservable&& observable) && { - return std::forward(observable) - | retry_when_impl_t, std::decay_t>{observable, std::forward(notifier)}; + return rpp::observable, retry_when_impl_t, std::decay_t>>{observable, std::forward(notifier)}; } }; } // namespace rpp::operators::details From 352d10d366c612c043ef5fb39122c95e353973ba Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Tue, 20 Aug 2024 22:37:29 +0200 Subject: [PATCH 07/12] wip --- src/rpp/rpp/operators/retry_when.hpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index e10cddc2c..9afb6ef21 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -33,7 +33,6 @@ namespace rpp::operators::details { } - bool retrying{}; std::atomic_bool is_inside_drain{}; RPP_NO_UNIQUE_ADDRESS TObserver observer; @@ -52,28 +51,32 @@ namespace rpp::operators::details using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; std::shared_ptr> state; + mutable bool locally_disposed{}; template void on_next(T&&) const { + locally_disposed = true; drain(state); } void on_error(const std::exception_ptr& err) const { + locally_disposed = true; if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) state->observer.on_error(err); } void on_completed() const { + locally_disposed = true; if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) state->observer.on_completed(); } void set_upstream(const disposable_wrapper& d) { state->observer.set_upstream(d); } - bool is_disposed() const { return state->retrying; } + bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; template Date: Tue, 20 Aug 2024 23:33:43 +0200 Subject: [PATCH 08/12] working drain approach --- src/rpp/rpp/operators/retry_when.hpp | 89 ++++++++---------- src/tests/rpp/test_retry_when.cpp | 136 ++++++++++++++------------- 2 files changed, 109 insertions(+), 116 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 9afb6ef21..a3fd6f931 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -20,32 +20,31 @@ namespace rpp::operators::details { - template + template struct retry_when_state final : public rpp::composite_disposable { - template - retry_when_state(Observer&& observer, Observable&& observable, Notifier&& notifier) - : observer(std::forward(observer)) - , observable(std::forward(observable)) - , notifier(std::forward(notifier)) + retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) + : observer(std::move(observer)) + , observable(observable) + , notifier(notifier) { } - std::atomic_bool is_inside_drain{}; - RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + + std::atomic_bool is_inside_drain{}; }; - template - void drain(const std::shared_ptr, std::decay_t, std::decay_t>>& state); + template + void drain(const std::shared_ptr>& state); - template + template struct retry_when_impl_inner_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; @@ -63,15 +62,13 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { locally_disposed = true; - if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) - state->observer.on_error(err); + state->observer.on_error(err); } void on_completed() const { locally_disposed = true; - if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) - state->observer.on_completed(); + state->observer.on_completed(); } void set_upstream(const disposable_wrapper& d) { state->observer.set_upstream(d); } @@ -79,9 +76,9 @@ namespace rpp::operators::details bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; - template + template struct retry_when_impl_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; @@ -121,8 +118,8 @@ namespace rpp::operators::details bool is_disposed() const { return state->is_disposed(); } }; - template - void drain(const std::shared_ptr, std::decay_t, std::decay_t>>& state) + template + void drain(const std::shared_ptr>& state) { while (!state->is_disposed()) { @@ -130,10 +127,10 @@ namespace rpp::operators::details state->is_inside_drain.store(true, std::memory_order::seq_cst); try { - using value_type = rpp::utils::extract_observable_type_t; + using value_type = rpp::utils::extract_observer_type_t; state->observable.subscribe(rpp::observer>{state}); - if (!state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; } catch (...) @@ -144,40 +141,28 @@ namespace rpp::operators::details } } - template - struct retry_when_impl_t - { - RPP_NO_UNIQUE_ADDRESS TObservable observable; - RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - - using value_type = rpp::utils::extract_observable_type_t; - - template - void subscribe(TObserver&& observer) const - { - const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), observable, notifier); - auto ptr = d.lock(); - ptr->observer.set_upstream(d.as_weak()); - - drain(ptr); - } - }; - template struct retry_when_t { RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - template - auto operator()(TObservable&& observable) const & + template + struct operator_traits { - return rpp::observable, retry_when_impl_t, std::decay_t>>{observable, notifier}; - } + using result_type = T; + }; + + template + using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>; - template - auto operator()(TObservable&& observable) && + template + void subscribe(TObserver&& observer, TObservable&& observable) const { - return rpp::observable, retry_when_impl_t, std::decay_t>>{observable, std::forward(notifier)}; + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); + drain(ptr); } }; } // namespace rpp::operators::details diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index 40a158916..2c4af8124 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -21,11 +22,14 @@ #include "copy_count_tracker.hpp" #include "disposable_observable.hpp" +#include "rpp_trompeloil.hpp" TEST_CASE("retry_when resubscribes on notifier emission") { - auto observer = mock_observer_strategy(); + mock_observer mock{}; + trompeloeil::sequence seq; + SECTION("observable without error emission") { size_t subscribe_count = 0; @@ -34,18 +38,16 @@ TEST_CASE("retry_when resubscribes on notifier emission") sub.on_next(std::string{"1"}); sub.on_completed(); }); - SECTION("subscribe") + SECTION("observer obtains values from observable") { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) - | rpp::operators::subscribe(observer); - SECTION("observer obtains values from observable") - { - CHECK(subscribe_count == 1); - CHECK(observer.get_total_on_next_count() == 1); - CHECK(observer.get_on_error_count() == 0); - CHECK(observer.get_on_completed_count() == 1); - } + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); } } @@ -64,81 +66,68 @@ TEST_CASE("retry_when resubscribes on notifier emission") } }); - SECTION("subscribe on it with single emission notifier") + SECTION("original observable is subscribed twice and observer receives one emission") { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) - | rpp::operators::subscribe(observer); - SECTION("original observable is subscribed twice and observer receives one emission") - { - CHECK(subscribe_count == 2); - CHECK(observer.get_total_on_next_count() == 1); - CHECK(observer.get_on_error_count() == 0); - CHECK(observer.get_on_completed_count() == 1); - } + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 2); } - SECTION("subscribe on it with multiple emissions notifier") + SECTION("original observable is subscribed twice and observer receives only one emission") { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1, 2, 3); }) - | rpp::operators::subscribe(observer); - SECTION("original observable is subscribed twice and observer receives only one emission") - { - CHECK(subscribe_count == 2); - CHECK(observer.get_total_on_next_count() == 1); - CHECK(observer.get_on_error_count() == 0); - CHECK(observer.get_on_completed_count() == 1); - } + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 2); } - SECTION("subscribe on it with throwing notifier") + SECTION("original observable is subscribed only once and observer receives error emission") { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + observable | rpp::operators::retry_when([](const std::exception_ptr&) { throw std::runtime_error{"error"}; return rpp::source::just(1); }) - | rpp::operators::subscribe(observer); - SECTION("original observable is subscribed only once and observer receives error emission") - { - CHECK(subscribe_count == 1); - CHECK(observer.get_total_on_next_count() == 0); - CHECK(observer.get_on_error_count() == 1); - CHECK(observer.get_on_completed_count() == 0); - } + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); } - SECTION("subscribe on it with error notifier") + SECTION("original observable is subscribed only once and observer receives error emission") { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::error(std::make_exception_ptr(std::runtime_error{"error"})); }) - | rpp::operators::subscribe(observer); - SECTION("original observable is subscribed only once and observer receives error emission") - { - CHECK(subscribe_count == 1); - CHECK(observer.get_total_on_next_count() == 0); - CHECK(observer.get_on_error_count() == 1); - CHECK(observer.get_on_completed_count() == 0); - } + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); } - SECTION("subscribe on it with empty notifier") + SECTION("original observable is subscribed only once and observer receives completed emission") { + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::empty(); }) - | rpp::operators::subscribe(observer); - SECTION("original observable is subscribed only once and observer receives completed emission") - { - CHECK(subscribe_count == 1); - CHECK(observer.get_total_on_next_count() == 0); - CHECK(observer.get_on_error_count() == 0); - CHECK(observer.get_on_completed_count() == 1); - } + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); } } @@ -157,22 +146,41 @@ TEST_CASE("retry_when resubscribes on notifier emission") } }); - SECTION("subscribe on it with single emission notifier") + SECTION("original observable is subscribed 5 times and observer receives one emission") { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(1); }) - | rpp::operators::subscribe(observer); - SECTION("original observable is subscribed 5 times and observer receives one emission") - { - CHECK(subscribe_count == 4 + 1); - CHECK(observer.get_total_on_next_count() == 1); - CHECK(observer.get_on_error_count() == 0); - CHECK(observer.get_on_completed_count() == 1); - } + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 4 + 1); } } } +TEST_CASE("repeat_when does not stack overflow") +{ + mock_observer mock{}; + trompeloeil::sequence seq; + + constexpr size_t count = 500000; + + REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + rpp::source::create([](const auto& obs) { + obs.on_next(1); + obs.on_error({}); + }) + | rpp::operators::retry_when([i = count](const std::exception_ptr& ep) mutable { + if (--i != 0) + return rpp::source::just(1).as_dynamic(); + return rpp::source::error(ep).as_dynamic(); }) + | rpp::operators::subscribe(mock); +} + TEST_CASE("retry_when doesn't produce extra copies") { SECTION("retry_when(empty_notifier)") From 0b4d9e8481d95c69895e4ecca50ee4a5d7075e0a Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Tue, 20 Aug 2024 23:35:24 +0200 Subject: [PATCH 09/12] fix ub --- src/rpp/rpp/operators/retry_when.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index a3fd6f931..5f3329b47 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -32,11 +32,11 @@ namespace rpp::operators::details { } + std::atomic_bool is_inside_drain{}; + RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - - std::atomic_bool is_inside_drain{}; }; template @@ -144,7 +144,7 @@ namespace rpp::operators::details template struct retry_when_t { - RPP_NO_UNIQUE_ADDRESS TNotifier notifier; + TNotifier notifier; template struct operator_traits From 9e3bd72702d6f8d2e13405c4ffb5c316cde33a27 Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 21 Aug 2024 22:58:28 +0200 Subject: [PATCH 10/12] Add examples --- src/rpp/rpp/operators/retry_when.hpp | 7 +++++++ src/tests/rpp/test_retry_when.cpp | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 5f3329b47..d77cda420 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -56,6 +56,9 @@ namespace rpp::operators::details void on_next(T&&) const { locally_disposed = true; + + if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) + return; drain(state); } @@ -178,6 +181,10 @@ namespace rpp::operators * * @warning #include * + * @par Examples: + * @snippet retry_when.cpp retry_when delay + * @snippet retry_when.cpp retry_when + * * @ingroup error_handling_operators * @see https://reactivex.io/documentation/operators/retry.html */ diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index 2c4af8124..0f469655c 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -176,7 +177,7 @@ TEST_CASE("repeat_when does not stack overflow") }) | rpp::operators::retry_when([i = count](const std::exception_ptr& ep) mutable { if (--i != 0) - return rpp::source::just(1).as_dynamic(); + return rpp::source::just(rpp::schedulers::immediate{}, 1).as_dynamic(); // Use immediate scheduler for recursion return rpp::source::error(ep).as_dynamic(); }) | rpp::operators::subscribe(mock); } From 4939c913c4bc5f89fa586fb6538c10d25e50470c Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Wed, 21 Aug 2024 23:08:33 +0200 Subject: [PATCH 11/12] Address ai comment --- src/examples/rpp/doxygen/retry_when.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/examples/rpp/doxygen/retry_when.cpp b/src/examples/rpp/doxygen/retry_when.cpp index 5219e369f..4fcf9a1e3 100644 --- a/src/examples/rpp/doxygen/retry_when.cpp +++ b/src/examples/rpp/doxygen/retry_when.cpp @@ -12,7 +12,7 @@ int main() //! [retry_when delay] size_t retry_count = 0; rpp::source::create([&retry_count](const auto& sub) { - if (retry_count != 4) + if (++retry_count != 4) { sub.on_error({}); } @@ -32,7 +32,7 @@ int main() //! [retry_when] retry_count = 0; rpp::source::create([&retry_count](const auto& sub) { - if (retry_count != 4) + if (++retry_count != 4) { sub.on_error({}); } From 31301b636cd264a99f0d93708a00ffc8ae89276f Mon Sep 17 00:00:00 2001 From: CorentinBT Date: Thu, 22 Aug 2024 23:47:38 +0200 Subject: [PATCH 12/12] Address comments --- src/rpp/rpp/operators/retry_when.hpp | 9 +++++++-- src/tests/rpp/test_retry_when.cpp | 28 ++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index d77cda420..f875e562b 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -74,7 +74,7 @@ namespace rpp::operators::details state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->observer.set_upstream(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; @@ -147,7 +147,7 @@ namespace rpp::operators::details template struct retry_when_t { - TNotifier notifier; + RPP_NO_UNIQUE_ADDRESS TNotifier notifier; template struct operator_traits @@ -179,6 +179,11 @@ namespace rpp::operators * * @param notifier callable taking a std::exception_ptr and returning observable notifying when to resubscribe * + * @warning retry_when along with other re-subscribing operators needs to be carefully used with + * hot observables, as re-subscribing to a hot observable can have unwanted behaviors. For example, + * a hot observable behind a replay subject can indefinitely yield an error on each re-subscription + * and using retry_when on it would lead to an infinite execution. + * * @warning #include * * @par Examples: diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index 0f469655c..4d5632ee3 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -13,8 +13,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -79,6 +81,19 @@ TEST_CASE("retry_when resubscribes on notifier emission") CHECK(subscribe_count == 2); } + SECTION("original observable is subscribed twice and observer receives one emission, notifier emits on new_thread") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(rpp::schedulers::new_thread{}, 1); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 2); + } + SECTION("original observable is subscribed twice and observer receives only one emission") { REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); @@ -158,6 +173,19 @@ TEST_CASE("retry_when resubscribes on notifier emission") CHECK(subscribe_count == 4 + 1); } + + SECTION("original observable is subscribed twice and observer receives one emission, notifier emits on new_thread") + { + REQUIRE_CALL(*mock, on_next_rvalue("1")).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(rpp::schedulers::new_thread{}, 1); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 4 + 1); + } } }