diff --git a/src/rpp/rpp/disposables/refcount_disposable.hpp b/src/rpp/rpp/disposables/refcount_disposable.hpp index 90396fc14..c41521cde 100644 --- a/src/rpp/rpp/disposables/refcount_disposable.hpp +++ b/src/rpp/rpp/disposables/refcount_disposable.hpp @@ -54,12 +54,7 @@ namespace rpp friend class details::refocunt_disposable_inner; refcount_disposable() = default; - enum class Mode : bool - { - WeakRefStrongSource, - StrongRefRefSource - }; - composite_disposable_wrapper add_ref(Mode mode = Mode::WeakRefStrongSource); + composite_disposable_wrapper add_ref(); private: std::atomic m_refcount{0}; @@ -96,7 +91,7 @@ namespace rpp::details namespace rpp { - inline composite_disposable_wrapper refcount_disposable::add_ref(refcount_disposable::Mode mode) + inline composite_disposable_wrapper refcount_disposable::add_ref() { auto current_value = m_refcount.load(std::memory_order::seq_cst); while (true) @@ -107,8 +102,8 @@ namespace rpp // just need atomicity, not guarding anything if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst)) { - auto inner = composite_disposable_wrapper::make(mode == Mode::WeakRefStrongSource ? wrapper_from_this() : wrapper_from_this().as_weak()); - add(mode == Mode::WeakRefStrongSource ? inner.as_weak() : inner); + auto inner = composite_disposable_wrapper::make(wrapper_from_this()); + add(inner.as_weak()); return inner; } } diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index a843647bb..2be977ac2 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -19,11 +19,11 @@ namespace rpp::operators::details { template - class combine_latest_state final : public combining_state + class combine_latest_disposable final : public combining_disposable { public: - explicit combine_latest_state(Observer&& observer, const TSelector& selector) - : combining_state(std::move(observer), sizeof...(Args)) + explicit combine_latest_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -40,23 +40,23 @@ namespace rpp::operators::details template struct combine_latest_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::state; + using combining_observer_strategy>::disposable; template void on_next(T&& v) const { // mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one - const auto observer = state->get_observer_under_lock(); - state->get_values().template get().emplace(std::forward(v)); + const auto observer = disposable->get_observer_under_lock(); + disposable->get_values().template get().emplace(std::forward(v)); - state->get_values().apply(&apply_impl, state, observer); + disposable->get_values().apply(&apply_impl, disposable, observer); } private: - template - static void apply_impl(const TState& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) + template + static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock& observer, const std::optional&... vals) { if ((vals.has_value() && ...)) observer->on_next(disposable->get_selector()(vals.value()...)); @@ -64,7 +64,7 @@ namespace rpp::operators::details }; template - struct combine_latest_t : public combining_operator_t + struct combine_latest_t : public combining_operator_t { }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index fbe29fced..68534f80d 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -34,33 +34,29 @@ namespace rpp::operators::details }; template - class concat_state_t final : public std::enable_shared_from_this> + class concat_disposable final : public rpp::refcount_disposable { public: - concat_state_t(TObserver&& observer) + concat_disposable(TObserver&& observer) : m_observer{std::move(observer)} { - const auto d = disposable_wrapper_impl::make(); - m_disposable = d.lock(); - get_observer()->set_upstream(d); } rpp::utils::pointer_under_lock get_observer() { return m_observer; } rpp::utils::pointer_under_lock> get_queue() { return m_queue; } - const std::shared_ptr& get_disposable() const { return m_disposable; } std::atomic& stage() { return m_stage; } void drain(rpp::composite_disposable_wrapper refcounted) { - while (!m_disposable->is_disposed()) + while (!is_disposed()) { const auto observable = get_observable(); if (!observable) { stage().store(ConcatStage::None, std::memory_order::relaxed); refcounted.dispose(); - if (m_disposable->is_disposed()) + if (is_disposed()) get_observer()->on_completed(); return; } @@ -78,13 +74,12 @@ namespace rpp::operators::details drain(refcounted); } - private: bool handle_observable_impl(const rpp::constraint::decayed_same_as auto& observable, rpp::composite_disposable_wrapper refcounted) { stage().store(ConcatStage::Draining, std::memory_order::relaxed); refcounted.clear(); - observable.subscribe(concat_inner_observer_strategy{this->shared_from_this(), std::move(refcounted)}); + observable.subscribe(concat_inner_observer_strategy{disposable_wrapper_impl{wrapper_from_this()}.lock(), std::move(refcounted)}); ConcatStage current = ConcatStage::Draining; return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst); @@ -102,7 +97,6 @@ namespace rpp::operators::details } private: - std::shared_ptr m_disposable{}; rpp::utils::value_with_mutex m_observer; rpp::utils::value_with_mutex> m_queue; std::atomic m_stage{}; @@ -111,23 +105,23 @@ namespace rpp::operators::details template struct concat_observer_strategy_base { - concat_observer_strategy_base(std::shared_ptr> state, rpp::composite_disposable_wrapper refcounted) - : state{std::move(state)} + concat_observer_strategy_base(std::shared_ptr> disposable, rpp::composite_disposable_wrapper refcounted) + : disposable{std::move(disposable)} , refcounted{std::move(refcounted)} { } - concat_observer_strategy_base(std::shared_ptr> state) - : concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)} + concat_observer_strategy_base(std::shared_ptr> disposable) + : concat_observer_strategy_base{disposable, disposable->add_ref()} { } - std::shared_ptr> state; - rpp::composite_disposable_wrapper refcounted; + std::shared_ptr> disposable; + rpp::composite_disposable_wrapper refcounted; void on_error(const std::exception_ptr& err) const { - state->get_observer()->on_error(err); + disposable->get_observer()->on_error(err); } void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); } @@ -146,18 +140,18 @@ namespace rpp::operators::details template void on_next(T&& v) const { - base::state->get_observer()->on_next(std::forward(v)); + base::disposable->get_observer()->on_next(std::forward(v)); } void on_completed() const { ConcatStage current{ConcatStage::Draining}; - if (base::state->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst)) + if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst)) return; assert(current == ConcatStage::Processing); - base::state->drain(base::refcounted); + base::disposable->drain(base::refcounted); } }; @@ -168,7 +162,7 @@ namespace rpp::operators::details static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; concat_observer_strategy(TObserver&& observer) - : base{std::make_shared>(std::move(observer))} + : base{init_state(std::move(observer))} { } @@ -176,17 +170,27 @@ namespace rpp::operators::details void on_next(T&& v) const { ConcatStage current = ConcatStage::None; - if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) - base::state->handle_observable(std::forward(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)); + if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst)) + base::disposable->handle_observable(std::forward(v), base::disposable->add_ref()); else - base::state->get_queue()->push(std::forward(v)); + base::disposable->get_queue()->push(std::forward(v)); } void on_completed() const { base::refcounted.dispose(); - if (base::state->get_disposable()->is_disposed()) - base::state->get_observer()->on_completed(); + if (base::disposable->is_disposed()) + base::disposable->get_observer()->on_completed(); + } + + + private: + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); + return ptr; } }; diff --git a/src/rpp/rpp/operators/debounce.hpp b/src/rpp/rpp/operators/debounce.hpp index 3130fe9e8..8e0d3e916 100644 --- a/src/rpp/rpp/operators/debounce.hpp +++ b/src/rpp/rpp/operators/debounce.hpp @@ -12,32 +12,33 @@ #include +#include #include #include namespace rpp::operators::details { - template - class debounce_state; + template + class debounce_disposable; - template - struct debounce_state_wrapper + template + struct debounce_disposable_wrapper { - std::shared_ptr> state{}; + std::shared_ptr> disposable{}; - bool is_disposed() const { return state->is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } - void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); } + void on_error(const std::exception_ptr& err) const { disposable->get_observer_under_lock()->on_error(err); } }; - template - class debounce_state final : public rpp::details::enable_wrapper_from_this> - , public rpp::details::base_disposable + template + class debounce_disposable final : public rpp::composite_disposable_impl + , public rpp::details::enable_wrapper_from_this> { using T = rpp::utils::extract_observer_type_t; public: - debounce_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) + debounce_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period) : m_observer(std::move(in_observer)) , m_worker{std::move(in_worker)} , m_period{period} @@ -70,17 +71,17 @@ namespace rpp::operators::details { m_worker.schedule( m_time_when_value_should_be_emitted.value(), - [](const debounce_state_wrapper& handler) -> schedulers::optional_delay_to { - auto value_or_duration = handler.state->extract_value_or_time(); + [](const debounce_disposable_wrapper& handler) -> schedulers::optional_delay_to { + auto value_or_duration = handler.disposable->extract_value_or_time(); if (auto* timepoint = std::get_if(&value_or_duration)) return schedulers::optional_delay_to{*timepoint}; if (auto* value = std::get_if(&value_or_duration)) - handler.state->get_observer_under_lock()->on_next(std::move(*value)); + handler.disposable->get_observer_under_lock()->on_next(std::move(*value)); return std::nullopt; }, - debounce_state_wrapper{this->wrapper_from_this().lock()}); + debounce_disposable_wrapper{this->wrapper_from_this().lock()}); } std::variant extract_value_or_time() @@ -107,38 +108,38 @@ namespace rpp::operators::details std::optional m_value_to_be_emitted{}; }; - template + template struct debounce_observer_strategy { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - std::shared_ptr> state{}; + std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_observer_under_lock()->set_upstream(d); + disposable->add(d); } bool is_disposed() const { - return state->is_disposed(); + return disposable->is_disposed(); } template void on_next(T&& v) const { - state->emplace_safe(std::forward(v)); + disposable->emplace_safe(std::forward(v)); } void on_error(const std::exception_ptr& err) const noexcept { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const noexcept { - const auto observer = state->get_observer_under_lock(); - if (const auto value = state->extract_value()) + const auto observer = disposable->get_observer_under_lock(); + if (const auto value = disposable->extract_value()) observer->on_next(std::move(value).value()); observer->on_completed(); } @@ -154,20 +155,22 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = typename Prev::template add<1>; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift(Observer&& observer) const + template + auto lift_with_disposable_strategy(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; + using worker_t = rpp::schedulers::utils::get_worker_t; + using container = typename DisposableStrategy::disposables_container; - auto d = rpp::disposable_wrapper_impl, worker_t>>::make(std::forward(observer), scheduler.create_worker(), duration); - auto ptr = d.lock(); - ptr->get_observer_under_lock()->set_upstream(d.as_weak()); - return rpp::observer, worker_t>>{std::move(ptr)}; + + const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = disposable.lock(); + ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); + return rpp::observer, worker_t, container>>{std::move(ptr)}; } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 8b924ee4b..010eb968a 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -34,12 +35,12 @@ namespace rpp::operators::details rpp::schedulers::time_point time_point{}; }; - template - struct delay_state final + template + struct delay_disposable final : public rpp::composite_disposable_impl { using T = rpp::utils::extract_observer_type_t; - delay_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) + delay_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration delay) : observer(std::move(in_observer)) , worker{std::move(in_worker)} , delay{delay} @@ -55,31 +56,30 @@ namespace rpp::operators::details bool is_active{}; }; - template - struct delay_state_wrapper + template + struct delay_disposable_wrapper { - std::shared_ptr> state{}; + std::shared_ptr> disposable{}; - bool is_disposed() const { return state->observer.is_disposed(); } + bool is_disposed() const { return disposable->is_disposed(); } - void on_error(const std::exception_ptr& err) const { state->observer.on_error(err); } + void on_error(const std::exception_ptr& err) const { disposable->observer.on_error(err); } }; - template + template struct delay_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - - std::shared_ptr> state{}; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->observer.set_upstream(d); + disposable->add(d); } bool is_disposed() const { - return state->observer.is_disposed(); + return disposable->is_disposed(); } template @@ -104,59 +104,59 @@ namespace rpp::operators::details { if (const auto tp = emplace_safe(std::forward(value))) { - state->worker.schedule( + disposable->worker.schedule( tp.value(), - [](const delay_state_wrapper& wrapper) { return drain_queue(wrapper.state); }, - delay_state_wrapper{state}); + [](const delay_disposable_wrapper& wrapper) { return drain_queue(wrapper.disposable); }, + delay_disposable_wrapper{disposable}); } } template std::optional emplace_safe(TT&& item) const { - std::lock_guard lock{state->mutex}; + std::lock_guard lock{disposable->mutex}; if constexpr (ClearOnError && rpp::constraint::decayed_same_as) { - state->queue = std::queue>>{}; - state->observer.on_error(std::forward(item)); + disposable->queue = std::queue>>{}; + disposable->observer.on_error(std::forward(item)); return std::nullopt; } else { - const auto tp = state->worker.now() + state->delay; - state->queue.emplace(std::forward(item), tp); - if (!state->is_active) + const auto tp = disposable->worker.now() + disposable->delay; + disposable->queue.emplace(std::forward(item), tp); + if (!disposable->is_active) { - state->is_active = true; + disposable->is_active = true; return tp; } return std::nullopt; } } - static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& state) + static schedulers::optional_delay_to drain_queue(const std::shared_ptr>& disposable) { while (true) { - std::unique_lock lock{state->mutex}; - if (state->queue.empty()) + std::unique_lock lock{disposable->mutex}; + if (disposable->queue.empty()) { - state->is_active = false; + disposable->is_active = false; return std::nullopt; } - auto& top = state->queue.front(); - if (top.time_point > state->worker.now()) + auto& top = disposable->queue.front(); + if (top.time_point > disposable->worker.now()) return schedulers::optional_delay_to{top.time_point}; auto item = std::move(top.value); - state->queue.pop(); + disposable->queue.pop(); lock.unlock(); - std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { state->observer.on_next(std::move(v)); }, - [&](const std::exception_ptr& err) { state->observer.on_error(err); }, + std::visit(rpp::utils::overloaded{[&](rpp::utils::extract_observer_type_t&& v) { disposable->observer.on_next(std::move(v)); }, + [&](const std::exception_ptr& err) { disposable->observer.on_error(err); }, [&](rpp::utils::none) { - state->observer.on_completed(); + disposable->observer.on_completed(); }}, std::move(item)); } @@ -173,18 +173,21 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = Prev; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; rpp::schedulers::duration duration; RPP_NO_UNIQUE_ADDRESS Scheduler scheduler; - template - auto lift(Observer&& observer) const + template + auto lift_with_disposable_strategy(Observer&& observer) const { - using worker_t = rpp::schedulers::utils::get_worker_t; + using worker_t = rpp::schedulers::utils::get_worker_t; + using container = typename DisposableStrategy::disposables_container; - auto state = std::make_shared, worker_t>>(std::forward(observer), scheduler.create_worker(), duration); - return rpp::observer, worker_t, ClearOnError>>{std::move(state)}; + const auto disposable = disposable_wrapper_impl, worker_t, container>>::make(std::forward(observer), scheduler.create_worker(), duration); + auto ptr = disposable.lock(); + ptr->observer.set_upstream(disposable.as_weak()); + return rpp::observer, worker_t, container, ClearOnError>>{std::move(ptr)}; } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/details/combining_strategy.hpp b/src/rpp/rpp/operators/details/combining_strategy.hpp index 89fbe5f60..af3ca98ee 100644 --- a/src/rpp/rpp/operators/details/combining_strategy.hpp +++ b/src/rpp/rpp/operators/details/combining_strategy.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -22,11 +23,10 @@ namespace rpp::operators::details { template - class combining_state : public rpp::details::enable_wrapper_from_this> - , public rpp::details::base_disposable + class combining_disposable : public composite_disposable { public: - explicit combining_state(Observer&& observer, size_t on_completed_needed) + explicit combining_disposable(Observer&& observer, size_t on_completed_needed) : m_observer_with_mutex{std::move(observer)} , m_on_completed_needed{on_completed_needed} { @@ -46,36 +46,36 @@ namespace rpp::operators::details std::atomic_size_t m_on_completed_needed; }; - template + template struct combining_observer_strategy { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - std::shared_ptr state{}; + std::shared_ptr disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_observer_under_lock()->set_upstream(d); + disposable->add(d); } bool is_disposed() const { - return state->is_disposed(); + return disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (state->decrement_on_completed()) - state->get_observer_under_lock()->on_completed(); + if (disposable->decrement_on_completed()) + disposable->get_observer_under_lock()->on_completed(); } }; - template typename TState, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> + template typename TDisposable, template typename TStrategy, typename TSelector, rpp::constraint::observable... TObservables> struct combining_operator_t { RPP_NO_UNIQUE_ADDRESS rpp::utils::tuple observables; @@ -92,7 +92,7 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = ::rpp::details::observables::default_disposables_strategy; // TODO: sum of Prev + TObservables + using updated_optimal_disposables_strategy = ::rpp::details::observables::fixed_disposables_strategy<0>; template auto lift(Observer&& observer) const @@ -104,21 +104,21 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using State = TState...>; + using Disposable = TDisposable...>; - const auto d = rpp::disposable_wrapper_impl::make(std::forward(observer), selector); - auto state = d.lock(); - state->get_observer_under_lock()->set_upstream(d.as_weak()); + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto locked = disposable.lock(); + locked->get_observer_under_lock()->set_upstream(disposable.as_weak()); - subscribe>(state, std::index_sequence_for{}, observables...); + subscribe>(locked, std::index_sequence_for{}, observables...); - return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(state)}; + return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(locked)}; } template - static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, TStrategy...>>{state})); + (..., observables.subscribe(rpp::observer, TStrategy...>>{disposable})); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/merge.hpp b/src/rpp/rpp/operators/merge.hpp index ed5a4ae61..ee480a924 100644 --- a/src/rpp/rpp/operators/merge.hpp +++ b/src/rpp/rpp/operators/merge.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -23,13 +24,12 @@ namespace rpp::operators::details { template - class merge_state final + class merge_disposable final : public composite_disposable { public: - merge_state(TObserver&& observer) + merge_disposable(TObserver&& observer) : m_observer(std::move(observer)) { - get_observer_under_lock()->set_upstream(m_disposable); } // just need atomicity, not guarding anything @@ -40,11 +40,8 @@ namespace rpp::operators::details rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer; } - const rpp::composite_disposable_wrapper& get_disposable() const { return m_disposable; } - private: rpp::utils::value_with_mutex m_observer{}; - rpp::composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); std::atomic_size_t m_on_completed_needed{1}; }; @@ -52,50 +49,49 @@ namespace rpp::operators::details struct merge_observer_base_strategy { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - - merge_observer_base_strategy(std::shared_ptr>&& state) - : m_state{std::move(state)} + merge_observer_base_strategy(std::shared_ptr>&& disposable) + : m_disposable{std::move(disposable)} { } - merge_observer_base_strategy(const std::shared_ptr>& state) - : m_state{state} + merge_observer_base_strategy(const std::shared_ptr>& disposable) + : m_disposable{disposable} { } void set_upstream(const rpp::disposable_wrapper& d) const { - m_state->get_disposable().add(d); + m_disposable->add(d); m_disposables.push_back(d); } bool is_disposed() const { - return m_state->get_disposable().is_disposed(); + return m_disposable->is_disposed(); } void on_error(const std::exception_ptr& err) const { - m_state->get_observer_under_lock()->on_error(err); + m_disposable->get_observer_under_lock()->on_error(err); } void on_completed() const { - if (m_state->decrement_on_completed()) + if (m_disposable->decrement_on_completed()) { - m_state->get_observer_under_lock()->on_completed(); + m_disposable->get_observer_under_lock()->on_completed(); } else { for (const auto& v : m_disposables) { - m_state->get_disposable().remove(v); + m_disposable->remove(v); } } } protected: - std::shared_ptr> m_state; + std::shared_ptr> m_disposable; mutable std::vector m_disposables{}; }; @@ -107,7 +103,7 @@ namespace rpp::operators::details template void on_next(T&& v) const { - merge_observer_base_strategy::m_state->get_observer_under_lock()->on_next(std::forward(v)); + merge_observer_base_strategy::m_disposable->get_observer_under_lock()->on_next(std::forward(v)); } }; @@ -116,15 +112,24 @@ namespace rpp::operators::details { public: explicit merge_observer_strategy(TObserver&& observer) - : merge_observer_base_strategy{std::make_shared>(std::move(observer))} + : merge_observer_base_strategy{init_state(std::move(observer))} { } template void on_next(T&& v) const { - merge_observer_base_strategy::m_state->increment_on_completed(); - std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_state}}); + merge_observer_base_strategy::m_disposable->increment_on_completed(); + std::forward(v).subscribe(rpp::observer, merge_observer_inner_strategy>{merge_observer_inner_strategy{merge_observer_base_strategy::m_disposable}}); + } + + private: + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->get_observer_under_lock()->set_upstream(d.as_weak()); + return ptr; } }; diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 87ce69773..aa5930969 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -17,6 +17,18 @@ namespace rpp::operators::details { + template + struct on_error_resume_next_disposable final : public rpp::composite_disposable + { + on_error_resume_next_disposable(TObserver&& observer) + : rpp::composite_disposable{} + , observer(std::move(observer)) + { + } + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + }; + template struct on_error_resume_next_inner_observer_strategy { @@ -52,43 +64,52 @@ namespace rpp::operators::details static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector) - : state{std::make_shared(std::move(observer))} + : state{init_state(std::move(observer))} , selector{selector} { } - std::shared_ptr state; - RPP_NO_UNIQUE_ADDRESS Selector selector; + std::shared_ptr> state; + RPP_NO_UNIQUE_ADDRESS Selector selector; template void on_next(T&& v) const { - state->on_next(std::forward(v)); + state->observer.on_next(std::forward(v)); } void on_error(const std::exception_ptr& err) const { try { - selector(err).subscribe(on_error_resume_next_inner_observer_strategy{state}); + selector(err).subscribe(on_error_resume_next_inner_observer_strategy{std::shared_ptr(state, &state->observer)}); } catch (...) { - state->on_error(std::current_exception()); + state->observer.on_error(std::current_exception()); } + state->dispose(); } void on_completed() const { - state->on_completed(); + state->observer.on_completed(); } void set_upstream(const disposable_wrapper& d) const { - state->set_upstream(d); + state->add(d); } bool is_disposed() const { return state->is_disposed(); } + + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->observer.set_upstream(d.as_weak()); + return ptr; + } }; template diff --git a/src/rpp/rpp/operators/retry.hpp b/src/rpp/rpp/operators/retry.hpp index 73797acdc..cb6700cde 100644 --- a/src/rpp/rpp/operators/retry.hpp +++ b/src/rpp/rpp/operators/retry.hpp @@ -18,7 +18,7 @@ namespace rpp::operators::details { template - struct retry_state_t final + struct retry_state_t final : public rpp::composite_disposable { retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional count) : count{count} @@ -26,7 +26,6 @@ namespace rpp::operators::details , observable(observable) { - observer.set_upstream(disposable); } std::optional count; @@ -34,8 +33,6 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS Observable observable; - - rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); }; template @@ -64,7 +61,7 @@ namespace rpp::operators::details return; } - state->disposable.clear(); + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -80,16 +77,16 @@ namespace rpp::operators::details void set_upstream(const disposable_wrapper& d) const { - state->disposable.add(d); + state->add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->disposable.is_disposed()) + while (!state->is_disposed()) { if (state->count) --state->count.value(); @@ -126,7 +123,10 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observble) const { - const auto ptr = std::make_shared, std::decay_t>>(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); + const auto d = disposable_wrapper_impl, std::decay_t>>::make(std::forward(observer), std::forward(observble), count ? count.value() + 1 : count); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index 79155d8aa..b1bea7e8e 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -23,14 +23,13 @@ namespace rpp::operators::details template - struct retry_when_state final + struct retry_when_state final : public rpp::composite_disposable { - retry_when_state(TObserver&& in_observer, const TObservable& observable, const TNotifier& notifier) - : observer(std::move(in_observer)) + retry_when_state(TObserver&& observer, const TObservable& observable, const TNotifier& notifier) + : observer(std::move(observer)) , observable(observable) , notifier(notifier) { - observer.set_upstream(disposable); } std::atomic_bool is_inside_drain{}; @@ -38,8 +37,6 @@ namespace rpp::operators::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS TObservable observable; RPP_NO_UNIQUE_ADDRESS TNotifier notifier; - - rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); }; template @@ -59,7 +56,7 @@ namespace rpp::operators::details void on_next(T&&) const { locally_disposed = true; - state->disposable.clear(); + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -79,9 +76,9 @@ namespace rpp::operators::details state->observer.on_completed(); } - void set_upstream(const disposable_wrapper& d) const { state->disposable.add(d); } + void set_upstream(const disposable_wrapper& d) const { state->add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return locally_disposed || state->is_disposed(); } }; templateobserver.on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return state->disposable.is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } }; template void drain(const std::shared_ptr>& state) { - while (!state->disposable.is_disposed()) + while (!state->is_disposed()) { state->is_inside_drain.store(true, std::memory_order::seq_cst); try @@ -160,7 +157,10 @@ namespace rpp::operators::details template void subscribe(TObserver&& observer, TObservable&& observable) const { - const auto ptr = std::make_shared, std::decay_t, std::decay_t>>(std::forward(observer), std::forward(observable), notifier); + const auto d = disposable_wrapper_impl, std::decay_t, std::decay_t>>::make(std::forward(observer), std::forward(observable), notifier); + auto ptr = d.lock(); + + ptr->observer.set_upstream(d.as_weak()); drain(ptr); } }; diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index 6a9933953..a9b6d03bb 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -13,21 +13,22 @@ #include #include +#include #include #include namespace rpp::operators::details { template - class take_until_state final + class take_until_disposable final : public rpp::composite_disposable { public: - take_until_state(TObserver&& observer) + take_until_disposable(TObserver&& observer) : m_observer_with_mutex(std::move(observer)) { } - take_until_state(const TObserver& observer) + take_until_disposable(const TObserver& observer) : m_observer_with_mutex(observer) { } @@ -48,7 +49,7 @@ namespace rpp::operators::details { static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - std::shared_ptr> state; + std::shared_ptr> state; void on_error(const std::exception_ptr& err) const { @@ -62,9 +63,9 @@ namespace rpp::operators::details state->get_observer()->on_completed(); } - void set_upstream(const disposable_wrapper& d) { state->get_observer()->set_upstream(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return state->get_observer()->is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } }; template @@ -103,12 +104,14 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; template auto lift(Observer&& observer) const { - auto ptr = std::make_shared>>(std::forward(observer)); + const auto d = disposable_wrapper_impl>>::make(std::forward(observer)); + auto ptr = d.lock(); + ptr->get_observer()->set_upstream(d.as_weak()); observable.subscribe(take_until_throttle_observer_strategy>{ptr}); return rpp::observer>>(std::move(ptr)); diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index f952d43f8..f55bfa3c9 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -23,57 +23,53 @@ namespace rpp::operators::details { template - class with_latest_from_state final + class with_latest_from_disposable final : public composite_disposable { public: - explicit with_latest_from_state(Observer&& observer, const TSelector& selector) + explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector) : m_observer_with_mutex{std::move(observer)} , m_selector{selector} { - get_observer_under_lock()->set_upstream(m_disposable); } rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer_with_mutex; } rpp::utils::tuple>...>& get_values() { return m_values; } - const TSelector& get_selector() const { return m_selector; } - const composite_disposable_wrapper& get_disposable() const { return m_disposable; } + const TSelector& get_selector() const { return m_selector; } private: rpp::utils::value_with_mutex m_observer_with_mutex{}; rpp::utils::tuple>...> m_values{}; - composite_disposable_wrapper m_disposable = composite_disposable_wrapper::make(); RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template struct with_latest_from_inner_observer_strategy { - static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; - - std::shared_ptr> state{}; + static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::Auto; + std::shared_ptr> disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_disposable().add(d); + disposable->add(d); } bool is_disposed() const { - return state->get_disposable().is_disposed(); + return disposable->is_disposed(); } template void on_next(T&& v) const { - auto locked_value = state->get_values().template get().lock(); + auto locked_value = disposable->get_values().template get().lock(); locked_value->emplace(std::forward(v)); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } static constexpr rpp::utils::empty_function_t<> on_completed{}; @@ -83,26 +79,26 @@ namespace rpp::operators::details requires std::invocable struct with_latest_from_observer_strategy { - using Disposable = with_latest_from_state; + using Disposable = with_latest_from_disposable; using Result = std::invoke_result_t; static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None; - std::shared_ptr state{}; + std::shared_ptr disposable{}; void set_upstream(const rpp::disposable_wrapper& d) const { - state->get_disposable().add(d); + disposable->add(d); } bool is_disposed() const { - return state->get_disposable().is_disposed(); + return disposable->is_disposed(); } template void on_next(T&& v) const { - auto result = state->get_values().apply([&d = this->state, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { + auto result = disposable->get_values().apply([&d = this->disposable, &v](rpp::utils::value_with_mutex>&... vals) -> std::optional { auto lock = std::scoped_lock{vals.get_mutex()...}; if ((vals.get_value_unsafe().has_value() && ...)) @@ -111,17 +107,17 @@ namespace rpp::operators::details }); if (result.has_value()) - state->get_observer_under_lock()->on_next(std::move(result).value()); + disposable->get_observer_under_lock()->on_next(std::move(result).value()); } void on_error(const std::exception_ptr& err) const { - state->get_observer_under_lock()->on_error(err); + disposable->get_observer_under_lock()->on_error(err); } void on_completed() const { - state->get_observer_under_lock()->on_completed(); + disposable->get_observer_under_lock()->on_completed(); } }; @@ -142,7 +138,7 @@ namespace rpp::operators::details }; template - using updated_optimal_disposables_strategy = rpp::details::observables::default_disposables_strategy; + using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>; template auto lift(Observer&& observer) const @@ -154,18 +150,20 @@ namespace rpp::operators::details template static auto subscribe_impl(Observer&& observer, const TSelector& selector, const TObservables&... observables) { - using State = with_latest_from_state...>; + using Disposable = with_latest_from_disposable...>; - auto ptr = std::make_shared(std::forward(observer), selector); + const auto disposable = disposable_wrapper_impl::make(std::forward(observer), selector); + auto ptr = disposable.lock(); + ptr->get_observer_under_lock()->set_upstream(disposable.as_weak()); subscribe(ptr, std::index_sequence_for{}, observables...); return rpp::observer, TSelector, Type, rpp::utils::extract_observable_type_t...>>{std::move(ptr)}; } template - static void subscribe(const std::shared_ptr...>>& state, std::index_sequence, const TObservables&... observables) + static void subscribe(const std::shared_ptr...>>& disposable, std::index_sequence, const TObservables&... observables) { - (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{state})); + (..., observables.subscribe(rpp::observer, with_latest_from_inner_observer_strategy...>>{disposable})); } }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/operators/zip.hpp b/src/rpp/rpp/operators/zip.hpp index ec29fd04e..71d0369d0 100644 --- a/src/rpp/rpp/operators/zip.hpp +++ b/src/rpp/rpp/operators/zip.hpp @@ -21,11 +21,11 @@ namespace rpp::operators::details { template - class zip_state final : public combining_state + class zip_disposable final : public combining_disposable { public: - explicit zip_state(Observer&& observer, const TSelector& selector) - : combining_state(std::move(observer), sizeof...(Args)) + explicit zip_disposable(Observer&& observer, const TSelector& selector) + : combining_disposable(std::move(observer), sizeof...(Args)) , m_selector(selector) { } @@ -42,17 +42,17 @@ namespace rpp::operators::details template struct zip_observer_strategy final - : public combining_observer_strategy> + : public combining_observer_strategy> { - using combining_observer_strategy>::state; + using combining_observer_strategy>::disposable; template void on_next(T&& v) const { - const auto observer = state->get_observer_under_lock(); - state->get_pendings().template get().push_back(std::forward(v)); + const auto observer = disposable->get_observer_under_lock(); + disposable->get_pendings().template get().push_back(std::forward(v)); - state->get_pendings().apply(&apply_impl, state, observer); + disposable->get_pendings().apply(&apply_impl, disposable, observer); } private: @@ -68,7 +68,7 @@ namespace rpp::operators::details }; template - struct zip_t : public combining_operator_t + struct zip_t : public combining_operator_t { }; } // namespace rpp::operators::details diff --git a/src/rpp/rpp/sources/concat.hpp b/src/rpp/rpp/sources/concat.hpp index d8e74722d..b22f86c85 100644 --- a/src/rpp/rpp/sources/concat.hpp +++ b/src/rpp/rpp/sources/concat.hpp @@ -22,13 +22,12 @@ namespace rpp::details { template - struct concat_state_t + struct concat_state_t : public rpp::composite_disposable { concat_state_t(TObserver&& in_observer, const PackedContainer& in_container) : observer(std::move(in_observer)) , container(in_container) { - observer.set_upstream(disposable); try { itr = std::cbegin(container); @@ -41,7 +40,6 @@ namespace rpp::details RPP_NO_UNIQUE_ADDRESS TObserver observer; RPP_NO_UNIQUE_ADDRESS PackedContainer container; - rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); std::optional itr{}; std::atomic is_inside_drain{}; }; @@ -69,14 +67,14 @@ namespace rpp::details state->observer.on_error(err); } - void set_upstream(const disposable_wrapper& d) { state->disposable.add(d); } + void set_upstream(const disposable_wrapper& d) { state->add(d); } - bool is_disposed() const { return locally_disposed || state->disposable.is_disposed(); } + bool is_disposed() const { return locally_disposed || state->is_disposed(); } void on_completed() const { locally_disposed = true; - state->disposable.clear(); + state->clear(); if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst)) return; @@ -88,7 +86,7 @@ namespace rpp::details template void drain(const std::shared_ptr>& state) { - while (!state->disposable.is_disposed()) + while (!state->is_disposed()) { if (state->itr.value() == std::cend(state->container)) { @@ -133,7 +131,10 @@ namespace rpp::details template Strategy> void subscribe(observer&& obs) const { - drain(std::make_shared, PackedContainer>>(std::move(obs), container)); + const auto d = disposable_wrapper_impl, PackedContainer>>::make(std::move(obs), container); + const auto state = d.lock(); + state->observer.set_upstream(d.as_weak()); + drain(state); } };