From aa4b4bbf6a8261d7a28c5b829456a56b662d73c3 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 17 Feb 2024 13:19:22 +0300 Subject: [PATCH 01/10] format --- .clang-format | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.clang-format b/.clang-format index 5f1d37131..0adc3b90f 100644 --- a/.clang-format +++ b/.clang-format @@ -63,6 +63,8 @@ DerivePointerAlignment: false EmptyLineAfterAccessModifier: Never EmptyLineBeforeAccessModifier: Always ExperimentalAutoDetectBinPacking: true +# AllowShortCompoundRequirementOnASingleLine: true +RequiresExpressionIndentation: OuterScope FixNamespaceComments: true IncludeBlocks: Regroup IncludeCategories: From 33e3e19e579f9200f9386f78e6a677f39c12ce5f Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 17 Feb 2024 13:19:25 +0300 Subject: [PATCH 02/10] update subjects --- docs/readme.md | 2 +- .../observables/connectable_observable.hpp | 4 +- src/rpp/rpp/observables/fwd.hpp | 3 +- .../operators/details/forwarding_subject.hpp | 18 +- src/rpp/rpp/sources/create.hpp | 1 + src/rpp/rpp/subjects.hpp | 2 +- src/rpp/rpp/subjects/details/base_subject.hpp | 62 ------ .../subjects/details/subject_on_subscribe.hpp | 38 ++++ .../rpp/subjects/details/subject_state.hpp | 24 ++- src/rpp/rpp/subjects/fwd.hpp | 54 +++--- src/rpp/rpp/subjects/publish_subject.hpp | 42 +++-- src/rpp/rpp/subjects/replay_subject.hpp | 177 +++++++----------- src/rpp/rpp/subjects/serialized_subject.hpp | 101 ---------- src/rpp/rpp/utils/utils.hpp | 7 + src/tests/rpp/test_subjects.cpp | 25 ++- 15 files changed, 200 insertions(+), 360 deletions(-) delete mode 100644 src/rpp/rpp/subjects/details/base_subject.hpp create mode 100644 src/rpp/rpp/subjects/details/subject_on_subscribe.hpp delete mode 100644 src/rpp/rpp/subjects/serialized_subject.hpp diff --git a/docs/readme.md b/docs/readme.md index 2a5efa0a1..b9a6d53d4 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -143,7 +143,7 @@ RPP follows this contract and especially this part. It means, that: 2. Any user-provided callbacks (for operators or observers) can be not thread-safe due to thread-safety of observable is guaranteed.
For example: internal logic of `take` operator doesn't use mutexes or atomics due to underlying observable **MUST** emit items serially 3. When you implement your own operator via `create` be careful to **follow this contract**! -4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_subject instead if can't guarantee serial emissions! +4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_* instead if can't guarantee serial emissions! It means, that for example: diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index 16b60c059..affe9ad10 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -68,12 +68,14 @@ namespace rpp * * @ingroup observables */ - template + template class connectable_observable final : public decltype(std::declval().get_observable()) { using base = decltype(std::declval().get_observable()); public: + static_assert(rpp::constraint::subject); + connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{}) : base{subject.get_observable()} , m_original_observable{original_observable} diff --git a/src/rpp/rpp/observables/fwd.hpp b/src/rpp/rpp/observables/fwd.hpp index f882a001b..7411fdca4 100644 --- a/src/rpp/rpp/observables/fwd.hpp +++ b/src/rpp/rpp/observables/fwd.hpp @@ -10,7 +10,6 @@ #pragma once #include -#include #include #include @@ -70,7 +69,7 @@ namespace rpp::constraint namespace rpp { - template + template class connectable_observable; } // namespace rpp diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index a9e05bee8..0a9fb39a1 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -13,7 +13,7 @@ #include #include -#include +#include #include #include @@ -27,7 +27,7 @@ namespace rpp::operators::details { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr> state{}; + std::shared_ptr> state{}; void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } @@ -53,14 +53,16 @@ namespace rpp::operators::details return rpp::observer{m_state.lock()}; } - template TObs> - void on_subscribe(TObs&& observer) const + auto get_observable() const { - if (const auto locked = m_refcount.lock()) - observer.set_upstream(locked->add_ref()); - m_state.lock()->on_subscribe(std::forward(observer)); + return rpp::source::create([state = m_state, refcount = m_refcount] TObs>(TObs&& observer) { + if (const auto locked = refcount.lock()) + observer.set_upstream(locked->add_ref()); + state.lock()->on_subscribe(std::forward(observer)); + }); } + rpp::composite_disposable_wrapper get_disposable() const { return m_state.as_weak(); @@ -68,7 +70,7 @@ namespace rpp::operators::details private: disposable_wrapper_impl m_refcount; - disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); + disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); }; template diff --git a/src/rpp/rpp/sources/create.hpp b/src/rpp/rpp/sources/create.hpp index b65f15253..cab996447 100644 --- a/src/rpp/rpp/sources/create.hpp +++ b/src/rpp/rpp/sources/create.hpp @@ -19,6 +19,7 @@ namespace rpp::details struct create_strategy { using value_type = Type; + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe; }; diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index 3e5791c32..bf5ed2934 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -18,4 +18,4 @@ */ #include -#include \ No newline at end of file +#include \ No newline at end of file diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp deleted file mode 100644 index be2fe52ab..000000000 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ /dev/null @@ -1,62 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2022 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus - -#pragma once - -#include -#include - -#include - -namespace rpp::subjects::details -{ - template Strategy> - class base_subject - { - struct on_subscribe - { - using value_type = T; - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; - - Strategy strategy; - - template TObs> - void subscribe(TObs&& sub) const - { - strategy.on_subscribe(std::forward(sub)); - } - }; - - public: - template - requires (rpp::constraint::is_constructible_from && !rpp::constraint::variadic_decayed_same_as) - explicit base_subject(Args&&... args) - : m_strategy{std::forward(args)...} - { - } - - auto get_observer() const - { - return m_strategy.get_observer(); - } - - auto get_observable() const - { - return rpp::observable{m_strategy}; - } - - auto get_disposable() const - { - return m_strategy.get_disposable(); - } - - private: - Strategy m_strategy{}; - }; -} // namespace rpp::subjects::details diff --git a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp new file mode 100644 index 000000000..78ce3fe62 --- /dev/null +++ b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp @@ -0,0 +1,38 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include + +namespace rpp::subjects::details +{ + namespace constraint + { + template + concept subject_on_subscribe = requires(const Strategy& t, rpp::details::observers::fake_observer&& obs) { + t.on_subscribe(std::move(obs)); + }; + } + + template Strategy> + struct subject_on_subscribe + { + using value_type = T; + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; + + Strategy strategy; + + template TObs> + void subscribe(TObs&& sub) const + { + strategy.on_subscribe(std::forward(sub)); + } + }; +} // namespace rpp::subjects::details diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 742662a1c..0d902745f 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -35,8 +35,8 @@ namespace rpp::subjects::details { }; - template - class subject_state : public std::enable_shared_from_this> + template + class subject_state : public std::enable_shared_from_this> , public composite_disposable { using shared_observers = std::shared_ptr>>; @@ -72,21 +72,28 @@ namespace rpp::subjects::details void on_next(const Type& v) { + std::lock_guard lock{m_serialized_mutex}; if (const auto observers = extract_observers_under_lock_if_there()) rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_next(v); }); } void on_error(const std::exception_ptr& err) { - if (const auto observers = exchange_observers_under_lock_if_there(err)) - rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); }); + { + std::lock_guard lock{m_serialized_mutex}; + if (const auto observers = exchange_observers_under_lock_if_there(err)) + rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); }); + } dispose(); } void on_completed() { - if (const auto observers = exchange_observers_under_lock_if_there(completed{})) - rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{}); + { + std::lock_guard lock{m_serialized_mutex}; + if (const auto observers = exchange_observers_under_lock_if_there(completed{})) + rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{}); + } dispose(); } @@ -163,7 +170,8 @@ namespace rpp::subjects::details } private: - state_t m_state{}; - std::mutex m_mutex{}; + state_t m_state{}; + std::mutex m_mutex{}; + RPP_NO_UNIQUE_ADDRESS std::conditional_t m_serialized_mutex{}; }; } // namespace rpp::subjects::details \ No newline at end of file diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 816bb0c13..764cf5f9c 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -11,52 +11,42 @@ #include #include +#include #include #include -namespace rpp::subjects::details +namespace rpp::subjects { - namespace constraint - { - template - concept subject_strategy = requires(Strategy t, rpp::details::observers::fake_observer&& obs) { - { - t.get_observer() - } -> rpp::constraint::observer; - t.on_subscribe(std::move(obs)); - { - t.get_disposable() - } -> rpp::constraint::decayed_any_of; - }; - } // namespace constraint - template Strategy> - class base_subject; - template - class publish_strategy; - + class publish_subject; + template - class serialized_strategy; -} // namespace rpp::subjects::details + class serialized_publish_subject; + -namespace rpp::subjects -{ template - class publish_subject; + class replay_subject; template - class serialized_subject; + class serialized_replay_subject; + + } // namespace rpp::subjects -namespace rpp::subjects::utils +namespace rpp::constraint { template - using extract_subject_type_t = typename rpp::utils::extract_base_type_params_t::template type_at_index_t<0>; -} // namespace rpp::subjects::utils + concept subject = requires(const T& subj) + { + { subj.get_observer() } -> rpp::constraint::observer; + { subj.get_observable() } -> rpp::constraint::observable; + { subj.get_disposable() } -> rpp::constraint::decayed_any_of; + }; +} -namespace rpp::constraint +namespace rpp::subjects::utils { - template - concept subject = rpp::utils::is_base_of_v; -} // namespace rpp::constraint + template + using extract_subject_type_t = rpp::utils::extract_observer_type_t().get_observer())>; +} // namespace rpp::subjects::utils diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 2063dcfc7..4a2a1cdec 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -13,21 +13,19 @@ #include #include -#include +#include #include -#include - namespace rpp::subjects::details { - template - class publish_strategy + template + class publish_subject_base { struct observer_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - std::shared_ptr> state{}; + std::shared_ptr> state{}; void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } @@ -41,17 +39,18 @@ namespace rpp::subjects::details }; public: - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; + + publish_subject_base() = default; auto get_observer() const { return rpp::observer{m_state.lock()}; } - template TObs> - void on_subscribe(TObs&& observer) const + auto get_observable() const { - m_state.lock()->on_subscribe(std::forward(observer)); + return rpp::source::create([state=m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); }); } rpp::disposable_wrapper get_disposable() const @@ -60,10 +59,9 @@ namespace rpp::subjects::details } private: - disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); + disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); }; -} // namespace rpp::subjects::details - +} namespace rpp::subjects { /** @@ -71,7 +69,7 @@ namespace rpp::subjects * * @details Each observer obtains only values which emitted after corresponding subscribe. on_error/on_completer/unsubscribe cached and provided to new observers if any * - * @warning this subject is not synchronized/serialized! It means, that expected to call callbacks of observer in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_subject. + * @warning this subject is not synchronized/serialized! It means, that expected to call callbacks of observer in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_publish_subject. * * @tparam Type value provided by this subject * @@ -79,9 +77,15 @@ namespace rpp::subjects * @see https://reactivex.io/documentation/subject.html */ template - class publish_subject final : public details::base_subject> - { - public: - using details::base_subject>::base_subject; - }; + class publish_subject final : public details::publish_subject_base{}; + + /** + * @brief Serialized version of rpp::subjects::publish_subject + * @details When you are using ordinary rpp::subjects::publish_subject, then you must take care not to call its on_next method (or its other on methods) in async way. + * + * @ingroup subjects + * @see https://reactivex.io/documentation/subject.html + */ + template + class serialized_publish_subject final : public details::publish_subject_base{}; } // namespace rpp::subjects \ No newline at end of file diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index 6d26faf8d..2e14a0e66 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -13,171 +13,123 @@ #include #include -#include +#include #include -#include -#include +#include #include namespace rpp::subjects::details { template - class replay_strategy + class replay_subject_base { - struct replay_state final : public subject_state + struct replay_state final : public subject_state { - replay_state(std::optional count, std::optional duration) - : count(count) - , duration(duration) + replay_state(size_t limit = std::numeric_limits::max(), rpp::schedulers::duration duration_limit = std::numeric_limits::max()) + : m_limit(limit) + , m_duration_limit(duration_limit) { } - auto collect_duration() + void add_value(const Type& v) { - if (duration.has_value()) - { - auto now = rpp::schedulers::clock_type::now(); - while (!values.empty() && (now - values.front().second > duration.value())) - { - values.pop_front(); - } - return now; - } - return rpp::schedulers::clock_type::time_point{}; + std::unique_lock lock{m_values_mutex}; + while (m_values.size() >= m_limit) + m_values.pop_front(); + + m_values.emplace_back(v, deduce_timepoint()); } - void collect_bound() + struct value_with_time + { + value_with_time(const Type& v, rpp::schedulers::clock_type::time_point timepoint) + : value{v} + , timepoint{timepoint} + {} + + Type value; + rpp::schedulers::clock_type::time_point timepoint; + }; + + + std::deque get_actual_values() { - if (count.has_value()) - { - if (values.size() == count.value()) - { - values.pop_front(); - } - } + std::unique_lock lock{m_values_mutex}; + deduce_timepoint(); + return m_values; } - template - void collect(T&& v) + private: + rpp::schedulers::clock_type::time_point deduce_timepoint() { - std::unique_lock lock{list_mutex}; - collect_bound(); - const auto time_point = collect_duration(); + if (std::numeric_limits::max() == m_duration_limit) + return rpp::schedulers::clock_type::time_point{}; - values.emplace_back(std::forward(v), time_point); + auto now = rpp::schedulers::clock_type::now(); + while (!m_values.empty() && (now - m_values.front().timepoint > m_duration_limit)) + m_values.pop_front(); + return now; } - std::optional count; - std::optional duration; + private: + std::mutex m_values_mutex{}; + std::deque m_values{}; - std::list> values{}; - - std::mutex list_mutex{}; - std::mutex serialized_mutex{}; + const size_t m_limit; + const rpp::schedulers::duration m_duration_limit; }; struct observer_strategy { std::shared_ptr state; - template - void collect_and_on_next(T&& v) const - requires Serialized - { - state->collect(std::forward(v)); - - std::unique_lock lock{state->serialized_mutex}; - state->on_next(state->values.back().first); - } - - template - void collect_and_on_next(T&& v) const - { - state->collect(std::forward(v)); - state->on_next(state->values.back().first); - } - void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } - bool is_disposed() const noexcept - { - return state->is_disposed(); - } - - void on_next(Type&& v) const - { - collect_and_on_next(std::move(v)); - } + bool is_disposed() const noexcept { return state->is_disposed(); } void on_next(const Type& v) const { - collect_and_on_next(v); + state->add_value(v); + state->on_next(v); } - void on_error(const std::exception_ptr& err) const - requires Serialized - { - std::unique_lock lock{state->serialized_mutex}; - state->on_error(err); - } - - void on_error(const std::exception_ptr& err) const - { - state->on_error(err); - } - - void on_completed() const - requires Serialized - { - std::unique_lock lock{state->serialized_mutex}; - state->on_completed(); - } + void on_error(const std::exception_ptr& err) const { state->on_error(err); } - void on_completed() const - { - state->on_completed(); - } + void on_completed() const { state->on_completed(); } }; public: - replay_strategy() - : m_state(disposable_wrapper_impl::make(std::nullopt, std::nullopt)) + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + replay_subject_base() + : m_state{disposable_wrapper_impl::make()} { } - replay_strategy(size_t count) - : m_state{disposable_wrapper_impl::make(std::max(1, count), std::nullopt)} + replay_subject_base(size_t count) + : m_state{disposable_wrapper_impl::make(std::max(1, count))} { } - replay_strategy(size_t count, rpp::schedulers::duration duration) + replay_subject_base(size_t count, rpp::schedulers::duration duration) : m_state{disposable_wrapper_impl::make(std::max(1, count), duration)} { } - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - auto get_observer() const { return rpp::observer{m_state.lock()}; } - template TObs> - void on_subscribe(TObs&& observer) const + auto get_observable() const { - if (const auto locked = m_state.lock()) - { - { - std::unique_lock lock{locked->list_mutex}; - locked->collect_duration(); - for (const auto& value : locked->values) - { - observer.on_next(value.first); - } - } + return rpp::source::create([state = m_state] TObs>(TObs&& observer) { + const auto locked = state.lock(); + for (auto&& value : locked->get_actual_values()) + observer.on_next(std::move(value.value)); locked->on_subscribe(std::forward(observer)); - } + }); } rpp::disposable_wrapper get_disposable() const @@ -204,22 +156,23 @@ namespace rpp::subjects * @see https://reactivex.io/documentation/subject.html */ template - class replay_subject final : public details::base_subject> + class replay_subject final : public details::replay_subject_base { public: - using details::base_subject>::base_subject; + using details::replay_subject_base::replay_subject_base; }; /** * @brief Same as rpp::subjects::replay_subject but on_next/on_error/on_completed calls are serialized via mutex. + * @details When you are using ordinary rpp::subjects::replay_subject, then you must take care not to call its on_next method (or its other on methods) in async way. * * @ingroup subjects * @see https://reactivex.io/documentation/subject.html */ template - class serialized_replay_subject final : public details::base_subject> + class serialized_replay_subject final : public details::replay_subject_base { public: - using details::base_subject>::base_subject; + using details::replay_subject_base::replay_subject_base; }; } // namespace rpp::subjects \ No newline at end of file diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp deleted file mode 100644 index 32fce8914..000000000 --- a/src/rpp/rpp/subjects/serialized_subject.hpp +++ /dev/null @@ -1,101 +0,0 @@ -// ReactivePlusPlus library -// -// Copyright Aleksey Loginov 2022 - present. -// Distributed under the Boost Software License, Version 1.0. -// (See accompanying file LICENSE_1_0.txt or copy at -// https://www.boost.org/LICENSE_1_0.txt) -// -// Project home: https://github.com/victimsnino/ReactivePlusPlus - -#pragma once - -#include - -#include -#include -#include -#include - -#include - -namespace rpp::subjects::details -{ - template - class serialized_strategy - { - struct serialized_state final : public subject_state - { - std::mutex mutex{}; - }; - - struct observer_strategy - { - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - - std::shared_ptr state{}; - - void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } - - bool is_disposed() const noexcept - { - return state->is_disposed(); - } - - void on_next(const Type& v) const - { - std::lock_guard lock{state->mutex}; - state->on_next(v); - } - - void on_error(const std::exception_ptr& err) const - { - std::lock_guard lock{state->mutex}; - state->on_error(err); - } - - void on_completed() const - { - std::lock_guard lock{state->mutex}; - state->on_completed(); - } - }; - - public: - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; - - auto get_observer() const - { - return rpp::observer{m_state.lock()}; - } - - template TObs> - void on_subscribe(TObs&& observer) const - { - m_state.lock()->on_subscribe(std::forward(observer)); - } - - rpp::composite_disposable_wrapper get_disposable() const - { - return m_state; - } - - private: - disposable_wrapper_impl m_state = disposable_wrapper_impl::make(); - }; -} // namespace rpp::subjects::details - -namespace rpp::subjects -{ - /** - * @brief Same as rpp::subjects::publish_subject, but on_next/on_error/on_completed calls are serialized via mutex - * - * @ingroup subjects - * @see https://reactivex.io/documentation/subject.html - */ - template - class serialized_subject final : public details::base_subject> - { - public: - using details::base_subject>::base_subject; - }; -} // namespace rpp::subjects \ No newline at end of file diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 466cf4779..65c33865a 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -246,6 +246,13 @@ namespace rpp::utils T m_value; }; + struct none_mutex + { + static constexpr void lock() {} + static constexpr void unlock() {} + static constexpr void try_lock() {} + }; + #define RPP_CALL_DURING_CONSTRUCTION(...) RPP_NO_UNIQUE_ADDRESS rpp::utils::none _ = [&]() { \ __VA_ARGS__; \ return rpp::utils::none{}; \ diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index ad989827a..4b414726e 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -15,7 +15,6 @@ #include #include #include -#include #include "copy_count_tracker.hpp" #include "mock_observer.hpp" @@ -232,7 +231,7 @@ TEST_CASE("publish subject caches error/completed") } } -TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_subject, rpp::subjects::serialized_replay_subject) +TEMPLATE_TEST_CASE("serialized subjects handles race condition", "", rpp::subjects::serialized_publish_subject, rpp::subjects::serialized_replay_subject) { auto subj = TestType{}; @@ -398,16 +397,16 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp { auto sub = TestType{}; - sub.get_observable().subscribe([](const copy_count_tracker& tracker) { - CHECK(tracker.get_copy_count() == 0); - CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer + sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT + CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer + CHECK(tracker.get_move_count() == 0); }); sub.get_observer().on_next(copy_count_tracker{}); - sub.get_observable().subscribe([](const copy_count_tracker& tracker) { - CHECK(tracker.get_copy_count() == 0); - CHECK(tracker.get_move_count() == 1); // 1 move to internal replay buffer + sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT + CHECK(tracker.get_copy_count() == 2+1); // + 1 copy values from buffer for this observer + CHECK(tracker.get_move_count() == 0+1); // + 1 move to this observer }); } @@ -416,16 +415,16 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp copy_count_tracker tracker{}; auto sub = TestType{}; - sub.get_observable().subscribe([](const copy_count_tracker& tracker) { - CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer + sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT + CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer CHECK(tracker.get_move_count() == 0); }); sub.get_observer().on_next(tracker); - sub.get_observable().subscribe([](const copy_count_tracker& tracker) { - CHECK(tracker.get_copy_count() == 1); // 1 copy to internal replay buffer - CHECK(tracker.get_move_count() == 0); + sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT + CHECK(tracker.get_copy_count() == 2+1); // + 1 copy values from buffer for this observer + CHECK(tracker.get_move_count() == 0+1); // + 1 move to this observer }); } } \ No newline at end of file From 93173170e1f7fb5f279ae393971a745c300c7d05 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 17 Feb 2024 10:19:39 +0000 Subject: [PATCH 03/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../observables/connectable_observable.hpp | 2 +- .../operators/details/forwarding_subject.hpp | 2 +- src/rpp/rpp/sources/create.hpp | 2 +- .../subjects/details/subject_on_subscribe.hpp | 2 +- src/rpp/rpp/subjects/fwd.hpp | 23 +++++++++++-------- src/rpp/rpp/subjects/publish_subject.hpp | 12 ++++++---- src/rpp/rpp/subjects/replay_subject.hpp | 5 ++-- src/tests/rpp/test_subjects.cpp | 14 +++++------ 8 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index affe9ad10..479790949 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -75,7 +75,7 @@ namespace rpp public: static_assert(rpp::constraint::subject); - + connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{}) : base{subject.get_observable()} , m_original_observable{original_observable} diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 0a9fb39a1..e43aea975 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -69,7 +69,7 @@ namespace rpp::operators::details } private: - disposable_wrapper_impl m_refcount; + disposable_wrapper_impl m_refcount; disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); }; diff --git a/src/rpp/rpp/sources/create.hpp b/src/rpp/rpp/sources/create.hpp index cab996447..da1bdd27c 100644 --- a/src/rpp/rpp/sources/create.hpp +++ b/src/rpp/rpp/sources/create.hpp @@ -18,7 +18,7 @@ namespace rpp::details template OnSubscribe> struct create_strategy { - using value_type = Type; + using value_type = Type; using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe; diff --git a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp index 78ce3fe62..6513b1ea7 100644 --- a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp +++ b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp @@ -19,7 +19,7 @@ namespace rpp::subjects::details concept subject_on_subscribe = requires(const Strategy& t, rpp::details::observers::fake_observer&& obs) { t.on_subscribe(std::move(obs)); }; - } + } // namespace constraint template Strategy> struct subject_on_subscribe diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 764cf5f9c..22daa29ed 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -10,8 +10,8 @@ #pragma once #include -#include #include +#include #include #include @@ -20,7 +20,7 @@ namespace rpp::subjects { template class publish_subject; - + template class serialized_publish_subject; @@ -31,19 +31,24 @@ namespace rpp::subjects template class serialized_replay_subject; - + } // namespace rpp::subjects namespace rpp::constraint { template - concept subject = requires(const T& subj) - { - { subj.get_observer() } -> rpp::constraint::observer; - { subj.get_observable() } -> rpp::constraint::observable; - { subj.get_disposable() } -> rpp::constraint::decayed_any_of; + concept subject = requires(const T& subj) { + { + subj.get_observer() + } -> rpp::constraint::observer; + { + subj.get_observable() + } -> rpp::constraint::observable; + { + subj.get_disposable() + } -> rpp::constraint::decayed_any_of; }; -} +} // namespace rpp::constraint namespace rpp::subjects::utils { diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 4a2a1cdec..52eba58b4 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -50,7 +50,7 @@ namespace rpp::subjects::details auto get_observable() const { - return rpp::source::create([state=m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); }); + return rpp::source::create([state = m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); }); } rpp::disposable_wrapper get_disposable() const @@ -61,7 +61,7 @@ namespace rpp::subjects::details private: disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); }; -} +} // namespace rpp::subjects::details namespace rpp::subjects { /** @@ -77,7 +77,9 @@ namespace rpp::subjects * @see https://reactivex.io/documentation/subject.html */ template - class publish_subject final : public details::publish_subject_base{}; + class publish_subject final : public details::publish_subject_base + { + }; /** * @brief Serialized version of rpp::subjects::publish_subject @@ -87,5 +89,7 @@ namespace rpp::subjects * @see https://reactivex.io/documentation/subject.html */ template - class serialized_publish_subject final : public details::publish_subject_base{}; + class serialized_publish_subject final : public details::publish_subject_base + { + }; } // namespace rpp::subjects \ No newline at end of file diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index 2e14a0e66..ed54547e6 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -43,10 +43,11 @@ namespace rpp::subjects::details struct value_with_time { - value_with_time(const Type& v, rpp::schedulers::clock_type::time_point timepoint) + value_with_time(const Type& v, rpp::schedulers::clock_type::time_point timepoint) : value{v} , timepoint{timepoint} - {} + { + } Type value; rpp::schedulers::clock_type::time_point timepoint; diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index 4b414726e..a50469f66 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -398,15 +398,15 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp auto sub = TestType{}; sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT - CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer - CHECK(tracker.get_move_count() == 0); + CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer + CHECK(tracker.get_move_count() == 0); }); sub.get_observer().on_next(copy_count_tracker{}); sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT - CHECK(tracker.get_copy_count() == 2+1); // + 1 copy values from buffer for this observer - CHECK(tracker.get_move_count() == 0+1); // + 1 move to this observer + CHECK(tracker.get_copy_count() == 2 + 1); // + 1 copy values from buffer for this observer + CHECK(tracker.get_move_count() == 0 + 1); // + 1 move to this observer }); } @@ -416,15 +416,15 @@ TEMPLATE_TEST_CASE("replay subject doesn't introduce additional copies", "", rpp auto sub = TestType{}; sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT - CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer + CHECK(tracker.get_copy_count() == 2); // 1 copy to internal replay buffer + 1 copy to this observer CHECK(tracker.get_move_count() == 0); }); sub.get_observer().on_next(tracker); sub.get_observable().subscribe([](copy_count_tracker tracker) { // NOLINT - CHECK(tracker.get_copy_count() == 2+1); // + 1 copy values from buffer for this observer - CHECK(tracker.get_move_count() == 0+1); // + 1 move to this observer + CHECK(tracker.get_copy_count() == 2 + 1); // + 1 copy values from buffer for this observer + CHECK(tracker.get_move_count() == 0 + 1); // + 1 move to this observer }); } } \ No newline at end of file From d6852566eb937d1698eadb6971749fe7176e7ede Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 17 Feb 2024 13:22:18 +0300 Subject: [PATCH 04/10] compile fix --- src/rpp/rpp/operators/details/forwarding_subject.hpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index e43aea975..728873c43 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -21,7 +21,7 @@ namespace rpp::operators::details { template - class forwarding_strategy + class forwarding_subject { struct observer_strategy { @@ -41,9 +41,9 @@ namespace rpp::operators::details }; public: - using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t>::template add<1>; + using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t>::template add<1>; - explicit forwarding_strategy(disposable_wrapper_impl refcount) + explicit forwarding_subject(disposable_wrapper_impl refcount) : m_refcount{std::move(refcount)} { } @@ -72,7 +72,4 @@ namespace rpp::operators::details disposable_wrapper_impl m_refcount; disposable_wrapper_impl> m_state = disposable_wrapper_impl>::make(); }; - - template - using forwarding_subject = subjects::details::base_subject>; } // namespace rpp::operators::details \ No newline at end of file From 54616213d251de3989a642bc43bbf2364ebf15a4 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 17 Feb 2024 14:33:08 +0300 Subject: [PATCH 05/10] fixes --- .clang-tidy | 10 +++++----- src/rpp/rpp/operators/concat.hpp | 2 +- src/rpp/rpp/operators/details/forwarding_subject.hpp | 1 + src/rpp/rpp/operators/details/strategy.hpp | 2 +- src/rpp/rpp/subjects/details/subject_state.hpp | 5 ++--- src/rpp/rpp/subjects/publish_subject.hpp | 2 +- src/rpp/rpp/subjects/replay_subject.hpp | 1 + 7 files changed, 12 insertions(+), 11 deletions(-) diff --git a/.clang-tidy b/.clang-tidy index c8f8f1c39..f969c9178 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -4,11 +4,11 @@ WarningsAsErrors: '*' HeaderFilterRegex: './src/.*' AnalyzeTemporaryDtors: false FormatStyle: 'file' -HeaderFileExtensions: - - h - - hpp -ImplementationFileExtensions: - - cpp +# HeaderFileExtensions: +# - h +# - hpp +# ImplementationFileExtensions: +# - cpp CheckOptions: - { key: readability-identifier-naming.NamespaceCase, value: lower_case } - { key: readability-identifier-naming.ClassCase, value: lower_case } diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index c8e4bfd68..89bb5ee32 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -90,7 +90,7 @@ namespace rpp::operators::details { auto queue = get_queue(); if (queue->empty()) - return {}; + return std::nullopt; auto observable = queue->front(); queue->pop(); return observable; diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 728873c43..c3db6647d 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include diff --git a/src/rpp/rpp/operators/details/strategy.hpp b/src/rpp/rpp/operators/details/strategy.hpp index 4b6ba35d9..c6bdb1097 100644 --- a/src/rpp/rpp/operators/details/strategy.hpp +++ b/src/rpp/rpp/operators/details/strategy.hpp @@ -46,7 +46,7 @@ namespace rpp::operators::details static auto apply(Observer&& observer, const Args&... vals) { static_assert(rpp::constraint::observer_of_type, typename Operator::template operator_traits::result_type>); - return rpp::observer::template observer_strategy>>{std::forward(observer), vals...}; + return rpp::observer::template observer_strategy>>{std::forward(observer), vals...}; // NOLINT } private: diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 0d902745f..1519ccd32 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -36,8 +36,7 @@ namespace rpp::subjects::details }; template - class subject_state : public std::enable_shared_from_this> - , public composite_disposable + class subject_state : public composite_disposable, public rpp::details::enable_wrapper_from_this> { using shared_observers = std::shared_ptr>>; using state_t = std::variant; @@ -106,7 +105,7 @@ namespace rpp::subjects::details void set_upstream(rpp::dynamic_observer& obs) { obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable( - [weak = this->weak_from_this()]() noexcept // NOLINT(bugprone-exception-escape) + [weak = this->wrapper_from_this()]() noexcept // NOLINT(bugprone-exception-escape) { if (const auto shared = weak.lock()) { diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 52eba58b4..05048c7fc 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -10,11 +10,11 @@ #pragma once #include +#include #include #include #include -#include namespace rpp::subjects::details { diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index ed54547e6..22cd1161a 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include From 5445ac129f799aba9a1355fe970e0594d5fc5fd9 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 17 Feb 2024 11:33:18 +0000 Subject: [PATCH 06/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/operators/details/forwarding_subject.hpp | 2 +- src/rpp/rpp/subjects/details/subject_state.hpp | 3 ++- src/rpp/rpp/subjects/publish_subject.hpp | 2 +- src/rpp/rpp/subjects/replay_subject.hpp | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index c3db6647d..19256cf19 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -11,8 +11,8 @@ #include -#include #include +#include #include #include #include diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 1519ccd32..ba6854c4f 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -36,7 +36,8 @@ namespace rpp::subjects::details }; template - class subject_state : public composite_disposable, public rpp::details::enable_wrapper_from_this> + class subject_state : public composite_disposable + , public rpp::details::enable_wrapper_from_this> { using shared_observers = std::shared_ptr>>; using state_t = std::variant; diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 05048c7fc..52eba58b4 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -10,11 +10,11 @@ #pragma once #include -#include #include #include #include +#include namespace rpp::subjects::details { diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index 22cd1161a..aed8cf019 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -12,10 +12,10 @@ #include #include +#include #include #include #include -#include #include #include From 3fe35885ec8e560fc34ba4f1b990744d26154b32 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 17 Feb 2024 14:43:24 +0300 Subject: [PATCH 07/10] compile fix --- src/rpp/rpp/subjects/details/subject_state.hpp | 4 +++- src/rpp/rpp/subjects/publish_subject.hpp | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index ba6854c4f..752f8acc4 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -37,7 +37,7 @@ namespace rpp::subjects::details template class subject_state : public composite_disposable - , public rpp::details::enable_wrapper_from_this> + , public rpp::details::enable_wrapper_from_this> { using shared_observers = std::shared_ptr>>; using state_t = std::variant; @@ -45,6 +45,8 @@ namespace rpp::subjects::details public: using expected_disposable_strategy = rpp::details::observables::atomic_fixed_disposable_strategy_selector<1>; + subject_state() = default; + template TObs> void on_subscribe(TObs&& observer) { diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 52eba58b4..e5168cfe5 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -79,6 +79,8 @@ namespace rpp::subjects template class publish_subject final : public details::publish_subject_base { + public: + using details::publish_subject_base::publish_subject_base; }; /** @@ -91,5 +93,7 @@ namespace rpp::subjects template class serialized_publish_subject final : public details::publish_subject_base { + public: + using details::publish_subject_base::publish_subject_base; }; } // namespace rpp::subjects \ No newline at end of file From 0b7eca948e51e8b5e5e9014722e507fce86e61b2 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 17 Feb 2024 15:06:46 +0300 Subject: [PATCH 08/10] fix --- .../operators/details/forwarding_subject.hpp | 14 ++++---- .../subjects/details/subject_on_subscribe.hpp | 33 ++++++++----------- src/rpp/rpp/subjects/publish_subject.hpp | 4 +-- src/rpp/rpp/subjects/replay_subject.hpp | 8 +++-- 4 files changed, 28 insertions(+), 31 deletions(-) diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 19256cf19..fdc059082 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include @@ -56,14 +56,16 @@ namespace rpp::operators::details auto get_observable() const { - return rpp::source::create([state = m_state, refcount = m_refcount] TObs>(TObs&& observer) { - if (const auto locked = refcount.lock()) - observer.set_upstream(locked->add_ref()); - state.lock()->on_subscribe(std::forward(observer)); + return subjects::details::create_subject_on_subscribe_observable([state = m_state.as_weak(), refcount = m_refcount] TObs>(TObs&& observer) { + if (const auto locked_state = state.lock()) + { + if (const auto locked = refcount.lock()) + observer.set_upstream(locked->add_ref()); + locked_state->on_subscribe(std::forward(observer)); + } }); } - rpp::composite_disposable_wrapper get_disposable() const { return m_state.as_weak(); diff --git a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp index 6513b1ea7..1fbda64ac 100644 --- a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp +++ b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp @@ -9,30 +9,23 @@ #pragma once -#include +#include +#include namespace rpp::subjects::details { - namespace constraint + template OnSubscribe, typename DisposableStrategy> + struct subject_on_subscribe_strategy { - template - concept subject_on_subscribe = requires(const Strategy& t, rpp::details::observers::fake_observer&& obs) { - t.on_subscribe(std::move(obs)); - }; - } // namespace constraint + using value_type = Type; + using expected_disposable_strategy = DisposableStrategy; - template Strategy> - struct subject_on_subscribe - { - using value_type = T; - using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t; - - Strategy strategy; - - template TObs> - void subscribe(TObs&& sub) const - { - strategy.on_subscribe(std::forward(sub)); - } + RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe; }; + + template OnSubscribe> + auto create_subject_on_subscribe_observable(OnSubscribe&& on_subscribe) + { + return rpp::observable, DisposableStrategy>>(std::forward(on_subscribe)); + } } // namespace rpp::subjects::details diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index e5168cfe5..2237768f8 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -13,7 +13,7 @@ #include #include -#include +#include #include namespace rpp::subjects::details @@ -50,7 +50,7 @@ namespace rpp::subjects::details auto get_observable() const { - return rpp::source::create([state = m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); }); + return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { state.lock()->on_subscribe(std::forward(observer)); }); } rpp::disposable_wrapper get_disposable() const diff --git a/src/rpp/rpp/subjects/replay_subject.hpp b/src/rpp/rpp/subjects/replay_subject.hpp index aed8cf019..c86477520 100644 --- a/src/rpp/rpp/subjects/replay_subject.hpp +++ b/src/rpp/rpp/subjects/replay_subject.hpp @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include @@ -84,6 +84,8 @@ namespace rpp::subjects::details struct observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + std::shared_ptr state; void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } @@ -102,7 +104,7 @@ namespace rpp::subjects::details }; public: - using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t>; replay_subject_base() : m_state{disposable_wrapper_impl::make()} @@ -126,7 +128,7 @@ namespace rpp::subjects::details auto get_observable() const { - return rpp::source::create([state = m_state] TObs>(TObs&& observer) { + return create_subject_on_subscribe_observable([state = m_state] TObs>(TObs&& observer) { const auto locked = state.lock(); for (auto&& value : locked->get_actual_values()) observer.on_next(std::move(value.value)); From ceddc94e07a6ff195bfdab328936cdea252b7cce Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 17 Feb 2024 12:06:57 +0000 Subject: [PATCH 09/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/rpp/rpp/subjects/details/subject_on_subscribe.hpp | 3 ++- src/rpp/rpp/subjects/details/subject_state.hpp | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp index 1fbda64ac..06841aa02 100644 --- a/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp +++ b/src/rpp/rpp/subjects/details/subject_on_subscribe.hpp @@ -9,9 +9,10 @@ #pragma once -#include #include +#include + namespace rpp::subjects::details { template OnSubscribe, typename DisposableStrategy> diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 752f8acc4..636a6c2ec 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -37,7 +37,7 @@ namespace rpp::subjects::details template class subject_state : public composite_disposable - , public rpp::details::enable_wrapper_from_this> + , public rpp::details::enable_wrapper_from_this> { using shared_observers = std::shared_ptr>>; using state_t = std::variant; From 051f10ace4fe2780cae58b1a01570f4cc1b4f098 Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Sat, 17 Feb 2024 15:14:12 +0300 Subject: [PATCH 10/10] fix --- src/rpp/rpp/subjects/details/subject_state.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 636a6c2ec..de624fa1e 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -108,7 +108,7 @@ namespace rpp::subjects::details void set_upstream(rpp::dynamic_observer& obs) { obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable( - [weak = this->wrapper_from_this()]() noexcept // NOLINT(bugprone-exception-escape) + [weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape) { if (const auto shared = weak.lock()) {