diff --git a/docs/Implementation Status.md b/docs/Implementation Status.md index bbbac7ebb..89d16a1da 100644 --- a/docs/Implementation Status.md +++ b/docs/Implementation Status.md @@ -150,7 +150,7 @@ ## Subjects - [x] publish_subject +- [x] behavior_subject - [ ] serialized_subject - [ ] replay_subject -- [ ] publish_subject - [ ] async_subject diff --git a/src/rpp/rpp/subjects.hpp b/src/rpp/rpp/subjects.hpp index 58fc1aafd..0a1aa2893 100644 --- a/src/rpp/rpp/subjects.hpp +++ b/src/rpp/rpp/subjects.hpp @@ -17,4 +17,5 @@ * \ingroup rpp */ -#include \ No newline at end of file +#include +#include \ No newline at end of file diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp new file mode 100644 index 000000000..17d7a242d --- /dev/null +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -0,0 +1,139 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once + +#include +#include +#include +#include +#include + +namespace rpp::subjects::details +{ +template +class behavior_strategy +{ +public: + template TT, rpp::constraint::decayed_same_as TSub> + behavior_strategy(TT&& v, TSub&& sub) + : m_state{std::make_shared(std::forward(v))} + , m_sub{std::forward(sub)} + { + m_sub.add([state = std::weak_ptr{m_state}] + { + if (const auto locked = state.lock()) + locked->on_unsubscribe(); + }); + } + + void on_subscribe(const dynamic_subscriber& sub) const + { + if (m_sub.is_subscribed()) + sub.on_next(m_state->get_value()); + + m_state->on_subscribe(sub); + } + + auto get_subscriber() const + { + return rpp::make_specific_subscriber(m_sub, + [state = m_state](const T& v) + { + state->set_value(v); + state->on_next(v); + }, + [state = m_state](const std::exception_ptr& err) + { + state->on_error(err); + }, + [state = m_state]() + { + state->on_completed(); + }); + } + + T get_value() const + { + return m_state->get_value(); + } + +private: + class behavior_state : public subject_state + { + public: + behavior_state(const T& v) + : subject_state{} + , value{v} {} + + behavior_state(T&& v) + : subject_state{} + , value{std::move(v)} {} + + T get_value() + { + std::lock_guard lock{mutex}; + return value; + } + + + void set_value(const T& v) + { + std::lock_guard lock{mutex}; + value = v; + } + + private: + + std::mutex mutex; + T value; + }; + + std::shared_ptr m_state; + composite_subscription m_sub{}; +}; +} // namespace rpp::subjects::details + +namespace rpp::subjects +{ +/** + * \brief Subject which multicasts values to observers subscribed on it and sends last emitted value (or initial value) on subscribe. It contains two parts: subscriber and observable at the same time. + * + * \details Each subscriber obtains only last/initial value + values which emitted after corresponding subscribe. on_error/on_completer/unsubscribe cached and provided to new subscribers if any + * + * \warning this subject is not synchronized/serialized! It means, that expected to call callbacks of subscriber in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_subject. + * + * \tparam T value provided by this subject + * + * \ingroup subjects + * \see https://reactivex.io/documentation/subject.html + */ +template +class behavior_subject final : public details::base_subject> +{ +public: + behavior_subject(const T& initial_value, const composite_subscription& sub) + : details::base_subject>{initial_value, sub} {} + + behavior_subject(T&& initial_value, const composite_subscription& sub) + : details::base_subject>{std::move(initial_value), sub} {} + + behavior_subject(const T& initial_value, composite_subscription&& sub = composite_subscription{}) + : details::base_subject>{initial_value, std::move(sub)} {} + + behavior_subject(T&& initial_value, composite_subscription&& sub = composite_subscription{}) + : details::base_subject>{std::move(initial_value), std::move(sub)} {} + + + T get_value() const + { + return details::base_subject>::get_strategy().get_value(); + } +}; +} // namespace rpp::subjects diff --git a/src/rpp/rpp/subjects/details/base_subject.hpp b/src/rpp/rpp/subjects/details/base_subject.hpp index 99f022110..003aea899 100644 --- a/src/rpp/rpp/subjects/details/base_subject.hpp +++ b/src/rpp/rpp/subjects/details/base_subject.hpp @@ -22,9 +22,6 @@ template Strategy> class base_subject : public subject_tag { public: - base_subject(const composite_subscription& sub = composite_subscription{}) - : m_strategy{sub} {} - auto get_subscriber() const { return m_strategy.get_subscriber(); @@ -38,7 +35,13 @@ class base_subject : public subject_tag }); } +protected: + base_subject(auto&& ...args) + : m_strategy{std::forward(args)...} {} + + const Strategy& get_strategy() const { return m_strategy; } + private: Strategy m_strategy{}; }; -} // namespace rpp::subjects::details \ No newline at end of file +} // namespace rpp::subjects::details diff --git a/src/rpp/rpp/subjects/fwd.hpp b/src/rpp/rpp/subjects/fwd.hpp index 3452e1c55..d7a0504e8 100644 --- a/src/rpp/rpp/subjects/fwd.hpp +++ b/src/rpp/rpp/subjects/fwd.hpp @@ -18,7 +18,7 @@ namespace rpp::subjects::details struct subject_tag; template -concept subject_strategy = std::constructible_from && requires(Strategy t) +concept subject_strategy = requires(Strategy t) { {t.get_subscriber()} -> rpp::constraint::subscriber; t.on_subscribe(std::declval>()); @@ -32,4 +32,7 @@ namespace rpp::subjects { template class publish_subject; + +template +class behavior_subject; } // namespace rpp::subjects diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 0417de5af..f55199ffc 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -21,8 +21,9 @@ template class publish_strategy { public: - publish_strategy(const composite_subscription& sub) - : m_sub{sub} + template TSub> + publish_strategy(TSub&& sub) + : m_sub{std::forward(sub)} { m_sub.add([state = std::weak_ptr{m_state}] { @@ -74,5 +75,12 @@ namespace rpp::subjects * \see https://reactivex.io/documentation/subject.html */ template -class publish_subject final : public details::base_subject>{}; +class publish_subject final : public details::base_subject>{ +public: + publish_subject(const composite_subscription& sub) + : details::base_subject>{sub} {} + + publish_subject(composite_subscription&& sub = composite_subscription{}) + : details::base_subject>{std::move(sub)} {} +}; } // namespace rpp::subjects diff --git a/src/tests/rpp/test_subjects.cpp b/src/tests/rpp/test_subjects.cpp index e3c921340..d047b9095 100644 --- a/src/tests/rpp/test_subjects.cpp +++ b/src/tests/rpp/test_subjects.cpp @@ -10,6 +10,7 @@ #include "mock_observer.hpp" #include +#include #include SCENARIO("publish subject multicasts values") @@ -231,4 +232,51 @@ SCENARIO("publish subject caches error/completed/unsubscribe") } } } -} \ No newline at end of file +} + +SCENARIO("behavior subject caches last emission") +{ + auto mock = mock_observer{}; + GIVEN("behavior subject") + { + auto subj = rpp::subjects::behavior_subject{100}; + WHEN("subscribe on it") + { + subj.get_observable().subscribe(mock); + THEN("subject emits initial value on subscribe") + { + CHECK(mock.get_received_values() == std::vector{100}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + WHEN("send new value to subject") + { + subj.get_subscriber().on_next(5); + AND_WHEN("subscribe on it") + { + subj.get_observable().subscribe(mock); + THEN("subject emits last value on subscribe") + { + CHECK(mock.get_received_values() == std::vector{5}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 0); + } + } + } + WHEN("complete subject") + { + subj.get_subscriber().on_completed(); + AND_WHEN("subscribe on it") + { + subj.get_observable().subscribe(mock); + THEN("subject emits completion on subscribe") + { + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + } +}