-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
39974ba
commit 8c6431e
Showing
7 changed files
with
213 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
// ReactivePlusPlus library | ||
// | ||
// Copyright Aleksey Loginov 2022 - present. | ||
// Distributed under the Boost Software License, Version 1.0. | ||
// (See accompanying file LICENSE_1_0.txt or copy at | ||
// https://www.boost.org/LICENSE_1_0.txt) | ||
// | ||
// Project home: https://github.com/victimsnino/ReactivePlusPlus | ||
|
||
#pragma once | ||
|
||
#include <rpp/subjects/fwd.hpp> | ||
#include <rpp/utils/constraints.hpp> | ||
#include <rpp/subscribers/dynamic_subscriber.hpp> | ||
#include <rpp/subjects/details/subject_state.hpp> | ||
#include <rpp/subjects/details/base_subject.hpp> | ||
|
||
namespace rpp::subjects::details | ||
{ | ||
template<rpp::constraint::decayed_type T> | ||
class behavior_strategy | ||
{ | ||
public: | ||
template<rpp::constraint::decayed_same_as<T> TT, rpp::constraint::decayed_same_as<composite_subscription> TSub> | ||
behavior_strategy(TT&& v, TSub&& sub) | ||
: m_state{std::make_shared<behavior_state>(std::forward<TT>(v))} | ||
, m_sub{std::forward<TSub>(sub)} | ||
{ | ||
m_sub.add([state = std::weak_ptr{m_state}] | ||
{ | ||
if (const auto locked = state.lock()) | ||
locked->on_unsubscribe(); | ||
}); | ||
} | ||
|
||
void on_subscribe(const dynamic_subscriber<T>& sub) const | ||
{ | ||
if (m_sub.is_subscribed()) | ||
sub.on_next(m_state->get_value()); | ||
|
||
m_state->on_subscribe(sub); | ||
} | ||
|
||
auto get_subscriber() const | ||
{ | ||
return rpp::make_specific_subscriber<T>(m_sub, | ||
[state = m_state](const T& v) | ||
{ | ||
state->set_value(v); | ||
state->on_next(v); | ||
}, | ||
[state = m_state](const std::exception_ptr& err) | ||
{ | ||
state->on_error(err); | ||
}, | ||
[state = m_state]() | ||
{ | ||
state->on_completed(); | ||
}); | ||
} | ||
|
||
T get_value() const | ||
{ | ||
return m_state->get_value(); | ||
} | ||
|
||
private: | ||
class behavior_state : public subject_state<T> | ||
{ | ||
public: | ||
behavior_state(const T& v) | ||
: subject_state<T>{} | ||
, value{v} {} | ||
|
||
behavior_state(T&& v) | ||
: subject_state<T>{} | ||
, value{std::move(v)} {} | ||
|
||
T get_value() | ||
{ | ||
std::lock_guard lock{mutex}; | ||
return value; | ||
} | ||
|
||
|
||
void set_value(const T& v) | ||
{ | ||
std::lock_guard lock{mutex}; | ||
value = v; | ||
} | ||
|
||
private: | ||
|
||
std::mutex mutex; | ||
T value; | ||
}; | ||
|
||
std::shared_ptr<behavior_state> m_state; | ||
composite_subscription m_sub{}; | ||
}; | ||
} // namespace rpp::subjects::details | ||
|
||
namespace rpp::subjects | ||
{ | ||
/** | ||
* \brief Subject which multicasts values to observers subscribed on it and sends last emitted value (or initial value) on subscribe. It contains two parts: subscriber and observable at the same time. | ||
* | ||
* \details Each subscriber obtains only last/initial value + values which emitted after corresponding subscribe. on_error/on_completer/unsubscribe cached and provided to new subscribers if any | ||
* | ||
* \warning this subject is not synchronized/serialized! It means, that expected to call callbacks of subscriber in the serialized way to follow observable contract: "Observables must issue notifications to observers serially (not in parallel).". If you are not sure or need extra serialization, please, use serialized_subject. | ||
* | ||
* \tparam T value provided by this subject | ||
* | ||
* \ingroup subjects | ||
* \see https://reactivex.io/documentation/subject.html | ||
*/ | ||
template<rpp::constraint::decayed_type T> | ||
class behavior_subject final : public details::base_subject<T, details::behavior_strategy<T>> | ||
{ | ||
public: | ||
behavior_subject(const T& initial_value, const composite_subscription& sub) | ||
: details::base_subject<T, details::behavior_strategy<T>>{initial_value, sub} {} | ||
|
||
behavior_subject(T&& initial_value, const composite_subscription& sub) | ||
: details::base_subject<T, details::behavior_strategy<T>>{std::move(initial_value), sub} {} | ||
|
||
behavior_subject(const T& initial_value, composite_subscription&& sub = composite_subscription{}) | ||
: details::base_subject<T, details::behavior_strategy<T>>{initial_value, std::move(sub)} {} | ||
|
||
behavior_subject(T&& initial_value, composite_subscription&& sub = composite_subscription{}) | ||
: details::base_subject<T, details::behavior_strategy<T>>{std::move(initial_value), std::move(sub)} {} | ||
|
||
|
||
T get_value() const | ||
{ | ||
return details::base_subject<T, details::behavior_strategy<T>>::get_strategy().get_value(); | ||
} | ||
}; | ||
} // namespace rpp::subjects |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
8c6431e
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BENCHMARK RESULTS (AUTOGENERATED)
ci-macos
Observable construction
Table
Observable lift
Table
Observable subscribe
Table
Observable subscribe #2
Table
Observer construction
Table
OnNext
Table
Subscriber construction
Table
Subscription
Table
buffer
Table
chains creation test
Table
combine_latest
Table
concat
Table
distinct_until_changed
Table
first
Table
foundamental sources
Table
from
Table
immediate scheduler
Table
just
Table
last
Table
map
Table
merge
Table
observe_on
Table
on_error_resume_next
Table
publish_subject callbacks
Table
publish_subject routines
Table
repeat
Table
scan
Table
single-threaded locks
Table
skip
Table
switch_on_next
Table
take
Table
take_last
Table
take_until
Table
timeout
Table
trampoline scheduler
Table
window
Table
with_latest_from
Table
ci-ubuntu-clang
Observable construction
Table
Observable lift
Table
Observable subscribe
Table
Observable subscribe #2
Table
Observer construction
Table
OnNext
Table
Subscriber construction
Table
Subscription
Table
buffer
Table
chains creation test
Table
combine_latest
Table
concat
Table
distinct_until_changed
Table
first
Table
foundamental sources
Table
from
Table
immediate scheduler
Table
just
Table
last
Table
map
Table
merge
Table
observe_on
Table
on_error_resume_next
Table
publish_subject callbacks
Table
publish_subject routines
Table
repeat
Table
scan
Table
single-threaded locks
Table
skip
Table
switch_on_next
Table
take
Table
take_last
Table
take_until
Table
timeout
Table
trampoline scheduler
Table
window
Table
with_latest_from
Table
ci-ubuntu-gcc
Observable construction
Table
Observable lift
Table
Observable subscribe
Table
Observable subscribe #2
Table
Observer construction
Table
OnNext
Table
Subscriber construction
Table
Subscription
Table
buffer
Table
chains creation test
Table
combine_latest
Table
concat
Table
distinct_until_changed
Table
first
Table
foundamental sources
Table
from
Table
immediate scheduler
Table
just
Table
last
Table
map
Table
merge
Table
observe_on
Table
on_error_resume_next
Table
publish_subject callbacks
Table
publish_subject routines
Table
repeat
Table
scan
Table
single-threaded locks
Table
skip
Table
switch_on_next
Table
take
Table
take_last
Table
take_until
Table
timeout
Table
trampoline scheduler
Table
window
Table
with_latest_from
Table
ci-windows
Observable construction
Table
Observable lift
Table
Observable subscribe
Table
Observable subscribe #2
Table
Observer construction
Table
OnNext
Table
Subscriber construction
Table
Subscription
Table
buffer
Table
chains creation test
Table
combine_latest
Table
concat
Table
distinct_until_changed
Table
first
Table
foundamental sources
Table
from
Table
immediate scheduler
Table
just
Table
last
Table
map
Table
merge
Table
observe_on
Table
on_error_resume_next
Table
publish_subject callbacks
Table
publish_subject routines
Table
repeat
Table
scan
Table
single-threaded locks
Table
skip
Table
switch_on_next
Table
take
Table
take_last
Table
take_until
Table
timeout
Table
trampoline scheduler
Table
window
Table
with_latest_from
Table