Skip to content

Commit

Permalink
0.3.3: Merge pull request #6 from CODIANZ/development
Browse files Browse the repository at this point in the history
0.3.3
  • Loading branch information
terukazu-inoue authored Apr 28, 2023
2 parents c6bbdd0 + 36a35ec commit 388c65c
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 95 deletions.
13 changes: 5 additions & 8 deletions include/another-rxcpp/internal/tools/stream_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ template <typename T> class stream_controller {
serial_type serial_;
unsubscriber_map unsubscribers_;
on_finalizes_t on_finalizes_;
inner(const subscriber_type& sbsc) : subscriber_(sbsc) {}
inner(const subscriber_type& sbsc) noexcept : subscriber_(sbsc), serial_(0) {}
};

mutable std::shared_ptr<inner> inner_;

stream_controller(std::shared_ptr<inner> inner) noexcept
: inner_(inner) {}

stream_controller() = delete;

public:
stream_controller(const subscriber_type& subscriber) noexcept {
inner_ = std::make_shared<inner>(subscriber);
Expand All @@ -41,14 +43,9 @@ template <typename T> class stream_controller {
});
}

void set_on_finalize(const on_finalize_t& f) const noexcept {
std::lock_guard<std::recursive_mutex> lock(inner_->mtx_);
inner_->on_finalizes_.push(f);
}

void set_on_finalize(on_finalize_t&& f) const noexcept {
template <typename F> void set_on_finalize(F&& f) const noexcept {
std::lock_guard<std::recursive_mutex> lock(inner_->mtx_);
inner_->on_finalizes_.reset(new on_finalize_t(std::move(f)));
inner_->on_finalizes_ = std::make_shared<on_finalize_t>(std::forward<F>(f));
}

template <typename In> observer<In> new_observer(
Expand Down
10 changes: 0 additions & 10 deletions include/another-rxcpp/internal/tools/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ template <typename SP>
return std::weak_ptr<typename SP::element_type>(sp);
}

template <typename T>
auto to_shared(T&& value) noexcept {
return std::make_shared<T>(std::forward<T>(value));
}

template <typename T>
auto to_shared(const T& value) noexcept {
return std::make_shared<T>(value);
}

}} /* namespace another_rxcpp::internal */

#endif /* !defined(__another_rxcpp_h_util__) */
14 changes: 6 additions & 8 deletions include/another-rxcpp/observable.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,13 @@ template <typename T> class observable {
return subscribe(observer<value_type>(n, e, c));
}

subscription subscribe(
next_t&& n = {},
error_t&& e = {},
completed_t&& c = {}
) const noexcept {
template <typename N, typename E, typename C>
subscription subscribe(N&& n, E&& e, C&& c) const noexcept
{
return subscribe(observer<value_type>(
std::move(n),
std::move(e),
std::move(c)
std::forward<N>(n),
std::forward<E>(e),
std::forward<C>(c)
));
}

Expand Down
21 changes: 10 additions & 11 deletions include/another-rxcpp/observables/connectable.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ friend class connectable<>;
std::recursive_mutex mtx_;
serial_type serial_;
subscription subscription_;
member(source_observable&& source): source_(std::move(source)) {}
member(const source_observable& source): source_(source) {}
member(source_observable&& source): source_(std::move(source)), serial_(0) {}
member(const source_observable& source): source_(source), serial_(0) {}
};
std::shared_ptr<member> m_;

Expand Down Expand Up @@ -90,31 +90,30 @@ friend class connectable<>;

auto collect = [m](bool clear){
std::lock_guard<std::recursive_mutex> lock(m->mtx_);
std::vector<observer_type> ret(m->observers_.size());
auto ret_it = ret.begin();
for(auto it = m->observers_.begin(); it != m->observers_.end(); it++, ret_it++){
*ret_it = it->second;
}
std::vector<observer_type> ret;
std::for_each(m->observers_.begin(), m->observers_.end(), [&ret](auto& it){
ret.push_back(it.second);
});
if(clear) m->observers_.clear();
return ret;
};

auto sbsc = m->source_.subscribe(
[collect](value_type x) {
[collect](const value_type& x) {
auto obs = collect(false);
std::for_each(obs.begin(), obs.end(), [&](auto ob){
std::for_each(obs.begin(), obs.end(), [&x](auto& ob){
ob.on_next(x);
});
},
[collect](std::exception_ptr err){
auto obs = collect(true);
std::for_each(obs.begin(), obs.end(), [&](auto ob){
std::for_each(obs.begin(), obs.end(), [&err](auto& ob){
ob.on_error(err);
});
},
[collect](){
auto obs = collect(true);
std::for_each(obs.begin(), obs.end(), [&](auto ob){
std::for_each(obs.begin(), obs.end(), [](auto& ob){
ob.on_completed();
});
}
Expand Down
75 changes: 38 additions & 37 deletions include/another-rxcpp/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include <functional>
#include <memory>
#include <optional>
#include <mutex>
#include "internal/tools/fn.h"
#include "internal/tools/util.h"

Expand Down Expand Up @@ -36,6 +36,7 @@ template <typename T> struct observer {

private:
struct inner {
std::recursive_mutex mtx_;
next_sp next_;
error_sp error_;
completed_sp completed_;
Expand All @@ -46,45 +47,35 @@ template <typename T> struct observer {

friend class internal::stream_controller<T>;

void set_on_unsubscribe(const unsubscribe_t& f) const noexcept {
inner_->unsubscribe_ = internal::to_shared(f);
template <typename Unsb> void set_on_unsubscribe(Unsb&& f) const noexcept {
inner_->unsubscribe_ = std::make_shared<unsubscribe_t>(std::forward<Unsb>(f));
}

void set_on_unsubscribe(unsubscribe_t&& f) const noexcept {
inner_->unsubscribe_ = internal::to_shared(std::move(f));
auto fetch_and_reset_all() const noexcept {
std::lock_guard<std::recursive_mutex> lock(inner_->mtx_);
auto e = inner_->error_;
auto c = inner_->completed_;
auto u = inner_->unsubscribe_;
inner_->next_.reset();
inner_->error_.reset();
inner_->completed_.reset();
inner_->unsubscribe_.reset();
return std::make_tuple(e, c, u);
}


public:
observer(const next_t& n = {}, const error_t& e = {}, const completed_t& c = {}) noexcept {
template<typename N, typename E, typename C> observer(N&& n, E&& e, C&& c) noexcept {
inner_ = std::make_shared<inner>();
inner_->next_ = internal::to_shared(n);
inner_->error_ = internal::to_shared(e);
inner_->completed_ = internal::to_shared(c);
inner_->next_ = std::make_shared<next_t>(std::forward<N>(n));
inner_->error_ = std::make_shared<error_t>(std::forward<E>(e));
inner_->completed_ = std::make_shared<completed_t>(std::forward<C>(c));
}

observer(next_t&& n, error_t&& e, completed_t&& c) noexcept {
observer(const next_t& n = {}, const error_t& e = {}, const completed_t& c = {}) noexcept {
inner_ = std::make_shared<inner>();
inner_->next_ = internal::to_shared(std::move(n));
inner_->error_ = internal::to_shared(std::move(e));
inner_->completed_ = internal::to_shared(std::move(c));
}

void unsubscribe() const noexcept {
auto unsb = inner_->unsubscribe_;

inner_->next_.reset();
inner_->error_.reset();
inner_->completed_.reset();
inner_->unsubscribe_.reset();

if(unsb && *unsb) {
(*unsb)();
}
}

bool is_subscribed() const noexcept {
return inner_->next_ && inner_->error_ && inner_->completed_;
inner_->next_ = std::make_shared<next_t>(n);
inner_->error_ = std::make_shared<error_t>(e);
inner_->completed_ = std::make_shared<completed_t>(c);
}

void on_next(const value_type& value) const noexcept {
Expand All @@ -98,20 +89,30 @@ template <typename T> struct observer {
}

void on_error(std::exception_ptr err) const noexcept {
auto e = inner_->error_;
auto u = inner_->unsubscribe_;
unsubscribe();
auto ecu = fetch_and_reset_all();
auto e = std::get<0>(ecu);
auto u = std::get<2>(ecu);
if(e && *e) (*e)(err);
if(u && *u) (*u)();
}

void on_completed() const noexcept {
auto c = inner_->completed_;
auto u = inner_->unsubscribe_;
unsubscribe();
auto ecu = fetch_and_reset_all();
auto c = std::get<1>(ecu);
auto u = std::get<2>(ecu);
if(c && *c) (*c)();
if(u && *u) (*u)();
}

void unsubscribe() const noexcept {
auto ecu = fetch_and_reset_all();
auto u = std::get<2>(ecu);
if(u && *u) (*u)();
}

bool is_subscribed() const noexcept {
return inner_->next_ && inner_->error_ && inner_->completed_;
}
};

} /* namespace another_rxcpp */
Expand Down
2 changes: 1 addition & 1 deletion include/another-rxcpp/operators/amb.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace amb_internal {
template <typename T, typename OB>
auto amb(scheduler::creator_fn sccr, std::vector<observable<T>>& arr, OB ob) noexcept {
arr.push_back(ob);
return [sccr, arr = internal::to_shared(std::move(arr))](auto src) {
return [sccr, arr = std::make_shared<std::vector<observable<T>>>(std::move(arr))](auto src) {
arr->push_back(src);
return observable<>::create<T>([sccr, arr](subscriber<T> s) {
auto sctl = internal::stream_controller<T>(s);
Expand Down
2 changes: 1 addition & 1 deletion include/another-rxcpp/operators/merge.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace merge_internal {
template <typename T, typename OB>
auto merge(scheduler::creator_fn sccr, std::vector<observable<T>>& arr, OB ob) noexcept {
arr.push_back(ob);
return [sccr, arr = internal::to_shared(std::move(arr))](auto src) {
return [sccr, arr = std::make_shared<std::vector<observable<T>>>(std::move(arr))](auto src) {
arr->push_back(src);
return observable<>::create<T>([sccr, arr](subscriber<T> s) {
auto sctl = internal::stream_controller<T>(s);
Expand Down
3 changes: 0 additions & 3 deletions include/another-rxcpp/subscriber.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#if !defined(__another_rxcpp_h_subscriber__)
#define __another_rxcpp_h_subscriber__

#include <functional>
#include <memory>
#include <optional>
#include "observer.h"

namespace another_rxcpp {
Expand Down
30 changes: 14 additions & 16 deletions include/another-rxcpp/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,25 @@ class subscription {
struct member {
const internal::fn<void()> unsubscribe_;
const internal::fn<bool()> is_subscribed_;
template <typename Unsb, typename Issb>
member(Unsb&& unsb, Issb&& issb) noexcept :
unsubscribe_(std::forward<Unsb>(unsb)),
is_subscribed_(std::forward<Issb>(issb)) {}
};

std::shared_ptr<member> m_;

public:
subscription() = default;
subscription(
const internal::fn<void()>& unsubscribe,
const internal::fn<bool()>& is_subscribed
) noexcept : m_(std::make_shared<member>(member{
.unsubscribe_ = unsubscribe,
.is_subscribed_ = is_subscribed
})) {}
subscription(
internal::fn<void()>&& unsubscribe,
internal::fn<bool()>&& is_subscribed
) noexcept : m_(std::make_shared<member>(member{
.unsubscribe_ = std::move(unsubscribe),
.is_subscribed_ = std::move(is_subscribed)
})) {}
subscription(const subscription& src): m_(src.m_){}
subscription() noexcept = default;

template <typename Unsb, typename Issb>
subscription(Unsb&& unsb, Issb&& issb) noexcept :
m_(std::make_shared<member>(
std::forward<Unsb>(unsb),
std::forward<Issb>(issb)
)){}

subscription(const subscription& src) noexcept : m_(src.m_){}

const subscription& operator = (const subscription& src) {
m_ = src.m_;
Expand Down

0 comments on commit 388c65c

Please sign in to comment.