diff --git a/include/another-rxcpp/internal/tools/stream_controller.h b/include/another-rxcpp/internal/tools/stream_controller.h index 7a80110..3c9082f 100644 --- a/include/another-rxcpp/internal/tools/stream_controller.h +++ b/include/another-rxcpp/internal/tools/stream_controller.h @@ -25,7 +25,7 @@ template class stream_controller { serial_type serial_; unsubscriber_map unsubscribers_; on_finalizes_t on_finalizes_; - inner(const subscriber_type& sbsc) : subscriber_(sbsc) {} + inner(const subscriber_type& sbsc) noexcept : subscriber_(sbsc), serial_(0) {} }; mutable std::shared_ptr inner_; @@ -33,6 +33,8 @@ template class stream_controller { stream_controller(std::shared_ptr inner) noexcept : inner_(inner) {} + stream_controller() = delete; + public: stream_controller(const subscriber_type& subscriber) noexcept { inner_ = std::make_shared(subscriber); @@ -41,14 +43,9 @@ template class stream_controller { }); } - void set_on_finalize(const on_finalize_t& f) const noexcept { - std::lock_guard lock(inner_->mtx_); - inner_->on_finalizes_.push(f); - } - - void set_on_finalize(on_finalize_t&& f) const noexcept { + template void set_on_finalize(F&& f) const noexcept { std::lock_guard lock(inner_->mtx_); - inner_->on_finalizes_.reset(new on_finalize_t(std::move(f))); + inner_->on_finalizes_ = std::make_shared(std::forward(f)); } template observer new_observer( diff --git a/include/another-rxcpp/internal/tools/util.h b/include/another-rxcpp/internal/tools/util.h index 382b3f5..88c0d19 100644 --- a/include/another-rxcpp/internal/tools/util.h +++ b/include/another-rxcpp/internal/tools/util.h @@ -39,16 +39,6 @@ template return std::weak_ptr(sp); } -template - auto to_shared(T&& value) noexcept { - return std::make_shared(std::forward(value)); - } - -template - auto to_shared(const T& value) noexcept { - return std::make_shared(value); - } - }} /* namespace another_rxcpp::internal */ #endif /* !defined(__another_rxcpp_h_util__) */ diff --git a/include/another-rxcpp/observable.h b/include/another-rxcpp/observable.h index bbff7b1..3d480de 100644 --- a/include/another-rxcpp/observable.h +++ b/include/another-rxcpp/observable.h @@ -80,15 +80,13 @@ template class observable { return subscribe(observer(n, e, c)); } - subscription subscribe( - next_t&& n = {}, - error_t&& e = {}, - completed_t&& c = {} - ) const noexcept { + template + subscription subscribe(N&& n, E&& e, C&& c) const noexcept + { return subscribe(observer( - std::move(n), - std::move(e), - std::move(c) + std::forward(n), + std::forward(e), + std::forward(c) )); } diff --git a/include/another-rxcpp/observables/connectable.h b/include/another-rxcpp/observables/connectable.h index dd6bd14..fcce094 100644 --- a/include/another-rxcpp/observables/connectable.h +++ b/include/another-rxcpp/observables/connectable.h @@ -34,8 +34,8 @@ friend class connectable<>; std::recursive_mutex mtx_; serial_type serial_; subscription subscription_; - member(source_observable&& source): source_(std::move(source)) {} - member(const source_observable& source): source_(source) {} + member(source_observable&& source): source_(std::move(source)), serial_(0) {} + member(const source_observable& source): source_(source), serial_(0) {} }; std::shared_ptr m_; @@ -90,31 +90,30 @@ friend class connectable<>; auto collect = [m](bool clear){ std::lock_guard lock(m->mtx_); - std::vector ret(m->observers_.size()); - auto ret_it = ret.begin(); - for(auto it = m->observers_.begin(); it != m->observers_.end(); it++, ret_it++){ - *ret_it = it->second; - } + std::vector ret; + std::for_each(m->observers_.begin(), m->observers_.end(), [&ret](auto& it){ + ret.push_back(it.second); + }); if(clear) m->observers_.clear(); return ret; }; auto sbsc = m->source_.subscribe( - [collect](value_type x) { + [collect](const value_type& x) { auto obs = collect(false); - std::for_each(obs.begin(), obs.end(), [&](auto ob){ + std::for_each(obs.begin(), obs.end(), [&x](auto& ob){ ob.on_next(x); }); }, [collect](std::exception_ptr err){ auto obs = collect(true); - std::for_each(obs.begin(), obs.end(), [&](auto ob){ + std::for_each(obs.begin(), obs.end(), [&err](auto& ob){ ob.on_error(err); }); }, [collect](){ auto obs = collect(true); - std::for_each(obs.begin(), obs.end(), [&](auto ob){ + std::for_each(obs.begin(), obs.end(), [](auto& ob){ ob.on_completed(); }); } diff --git a/include/another-rxcpp/observer.h b/include/another-rxcpp/observer.h index 2be2439..b9ed547 100644 --- a/include/another-rxcpp/observer.h +++ b/include/another-rxcpp/observer.h @@ -3,7 +3,7 @@ #include #include -#include +#include #include "internal/tools/fn.h" #include "internal/tools/util.h" @@ -36,6 +36,7 @@ template struct observer { private: struct inner { + std::recursive_mutex mtx_; next_sp next_; error_sp error_; completed_sp completed_; @@ -46,45 +47,35 @@ template struct observer { friend class internal::stream_controller; - void set_on_unsubscribe(const unsubscribe_t& f) const noexcept { - inner_->unsubscribe_ = internal::to_shared(f); + template void set_on_unsubscribe(Unsb&& f) const noexcept { + inner_->unsubscribe_ = std::make_shared(std::forward(f)); } - void set_on_unsubscribe(unsubscribe_t&& f) const noexcept { - inner_->unsubscribe_ = internal::to_shared(std::move(f)); + auto fetch_and_reset_all() const noexcept { + std::lock_guard lock(inner_->mtx_); + auto e = inner_->error_; + auto c = inner_->completed_; + auto u = inner_->unsubscribe_; + inner_->next_.reset(); + inner_->error_.reset(); + inner_->completed_.reset(); + inner_->unsubscribe_.reset(); + return std::make_tuple(e, c, u); } - public: - observer(const next_t& n = {}, const error_t& e = {}, const completed_t& c = {}) noexcept { + template observer(N&& n, E&& e, C&& c) noexcept { inner_ = std::make_shared(); - inner_->next_ = internal::to_shared(n); - inner_->error_ = internal::to_shared(e); - inner_->completed_ = internal::to_shared(c); + inner_->next_ = std::make_shared(std::forward(n)); + inner_->error_ = std::make_shared(std::forward(e)); + inner_->completed_ = std::make_shared(std::forward(c)); } - observer(next_t&& n, error_t&& e, completed_t&& c) noexcept { + observer(const next_t& n = {}, const error_t& e = {}, const completed_t& c = {}) noexcept { inner_ = std::make_shared(); - inner_->next_ = internal::to_shared(std::move(n)); - inner_->error_ = internal::to_shared(std::move(e)); - inner_->completed_ = internal::to_shared(std::move(c)); - } - - void unsubscribe() const noexcept { - auto unsb = inner_->unsubscribe_; - - inner_->next_.reset(); - inner_->error_.reset(); - inner_->completed_.reset(); - inner_->unsubscribe_.reset(); - - if(unsb && *unsb) { - (*unsb)(); - } - } - - bool is_subscribed() const noexcept { - return inner_->next_ && inner_->error_ && inner_->completed_; + inner_->next_ = std::make_shared(n); + inner_->error_ = std::make_shared(e); + inner_->completed_ = std::make_shared(c); } void on_next(const value_type& value) const noexcept { @@ -98,20 +89,30 @@ template struct observer { } void on_error(std::exception_ptr err) const noexcept { - auto e = inner_->error_; - auto u = inner_->unsubscribe_; - unsubscribe(); + auto ecu = fetch_and_reset_all(); + auto e = std::get<0>(ecu); + auto u = std::get<2>(ecu); if(e && *e) (*e)(err); if(u && *u) (*u)(); } void on_completed() const noexcept { - auto c = inner_->completed_; - auto u = inner_->unsubscribe_; - unsubscribe(); + auto ecu = fetch_and_reset_all(); + auto c = std::get<1>(ecu); + auto u = std::get<2>(ecu); if(c && *c) (*c)(); if(u && *u) (*u)(); } + + void unsubscribe() const noexcept { + auto ecu = fetch_and_reset_all(); + auto u = std::get<2>(ecu); + if(u && *u) (*u)(); + } + + bool is_subscribed() const noexcept { + return inner_->next_ && inner_->error_ && inner_->completed_; + } }; } /* namespace another_rxcpp */ diff --git a/include/another-rxcpp/operators/amb.h b/include/another-rxcpp/operators/amb.h index 0f84aec..b43ffdc 100644 --- a/include/another-rxcpp/operators/amb.h +++ b/include/another-rxcpp/operators/amb.h @@ -15,7 +15,7 @@ namespace amb_internal { template auto amb(scheduler::creator_fn sccr, std::vector>& arr, OB ob) noexcept { arr.push_back(ob); - return [sccr, arr = internal::to_shared(std::move(arr))](auto src) { + return [sccr, arr = std::make_shared>>(std::move(arr))](auto src) { arr->push_back(src); return observable<>::create([sccr, arr](subscriber s) { auto sctl = internal::stream_controller(s); diff --git a/include/another-rxcpp/operators/merge.h b/include/another-rxcpp/operators/merge.h index a72e3b6..eea1041 100644 --- a/include/another-rxcpp/operators/merge.h +++ b/include/another-rxcpp/operators/merge.h @@ -15,7 +15,7 @@ namespace merge_internal { template auto merge(scheduler::creator_fn sccr, std::vector>& arr, OB ob) noexcept { arr.push_back(ob); - return [sccr, arr = internal::to_shared(std::move(arr))](auto src) { + return [sccr, arr = std::make_shared>>(std::move(arr))](auto src) { arr->push_back(src); return observable<>::create([sccr, arr](subscriber s) { auto sctl = internal::stream_controller(s); diff --git a/include/another-rxcpp/subscriber.h b/include/another-rxcpp/subscriber.h index de2fb1e..f9f730a 100644 --- a/include/another-rxcpp/subscriber.h +++ b/include/another-rxcpp/subscriber.h @@ -1,9 +1,6 @@ #if !defined(__another_rxcpp_h_subscriber__) #define __another_rxcpp_h_subscriber__ -#include -#include -#include #include "observer.h" namespace another_rxcpp { diff --git a/include/another-rxcpp/subscription.h b/include/another-rxcpp/subscription.h index 81a2736..420b1cb 100644 --- a/include/another-rxcpp/subscription.h +++ b/include/another-rxcpp/subscription.h @@ -10,27 +10,25 @@ class subscription { struct member { const internal::fn unsubscribe_; const internal::fn is_subscribed_; + template + member(Unsb&& unsb, Issb&& issb) noexcept : + unsubscribe_(std::forward(unsb)), + is_subscribed_(std::forward(issb)) {} }; std::shared_ptr m_; public: - subscription() = default; - subscription( - const internal::fn& unsubscribe, - const internal::fn& is_subscribed - ) noexcept : m_(std::make_shared(member{ - .unsubscribe_ = unsubscribe, - .is_subscribed_ = is_subscribed - })) {} - subscription( - internal::fn&& unsubscribe, - internal::fn&& is_subscribed - ) noexcept : m_(std::make_shared(member{ - .unsubscribe_ = std::move(unsubscribe), - .is_subscribed_ = std::move(is_subscribed) - })) {} - subscription(const subscription& src): m_(src.m_){} + subscription() noexcept = default; + + template + subscription(Unsb&& unsb, Issb&& issb) noexcept : + m_(std::make_shared( + std::forward(unsb), + std::forward(issb) + )){} + + subscription(const subscription& src) noexcept : m_(src.m_){} const subscription& operator = (const subscription& src) { m_ = src.m_;