Skip to content

Commit

Permalink
Partially revert commit
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino committed Nov 3, 2024
1 parent a2c2a5f commit e3706c8
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 241 deletions.
13 changes: 4 additions & 9 deletions src/rpp/rpp/disposables/refcount_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,7 @@ namespace rpp
friend class details::refocunt_disposable_inner;
refcount_disposable() = default;

enum class Mode : bool
{
WeakRefStrongSource,
StrongRefRefSource
};
composite_disposable_wrapper add_ref(Mode mode = Mode::WeakRefStrongSource);
composite_disposable_wrapper add_ref();

private:
std::atomic<size_t> m_refcount{0};
Expand Down Expand Up @@ -96,7 +91,7 @@ namespace rpp::details

namespace rpp
{
inline composite_disposable_wrapper refcount_disposable::add_ref(refcount_disposable::Mode mode)
inline composite_disposable_wrapper refcount_disposable::add_ref()
{
auto current_value = m_refcount.load(std::memory_order::seq_cst);
while (true)
Expand All @@ -107,8 +102,8 @@ namespace rpp
// just need atomicity, not guarding anything
if (m_refcount.compare_exchange_strong(current_value, current_value + 1, std::memory_order::seq_cst))
{
auto inner = composite_disposable_wrapper::make<details::refocunt_disposable_inner>(mode == Mode::WeakRefStrongSource ? wrapper_from_this() : wrapper_from_this().as_weak());
add(mode == Mode::WeakRefStrongSource ? inner.as_weak() : inner);
auto inner = composite_disposable_wrapper::make<details::refocunt_disposable_inner>(wrapper_from_this());
add(inner.as_weak());
return inner;
}
}
Expand Down
22 changes: 11 additions & 11 deletions src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
namespace rpp::operators::details
{
template<rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
class combine_latest_state final : public combining_state<Observer>
class combine_latest_disposable final : public combining_disposable<Observer>
{
public:
explicit combine_latest_state(Observer&& observer, const TSelector& selector)
: combining_state<Observer>(std::move(observer), sizeof...(Args))
explicit combine_latest_disposable(Observer&& observer, const TSelector& selector)
: combining_disposable<Observer>(std::move(observer), sizeof...(Args))
, m_selector(selector)
{
}
Expand All @@ -40,31 +40,31 @@ namespace rpp::operators::details

template<size_t I, rpp::constraint::observer Observer, typename TSelector, rpp::constraint::decayed_type... Args>
struct combine_latest_observer_strategy final
: public combining_observer_strategy<combine_latest_state<Observer, TSelector, Args...>>
: public combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>
{
using combining_observer_strategy<combine_latest_state<Observer, TSelector, Args...>>::state;
using combining_observer_strategy<combine_latest_disposable<Observer, TSelector, Args...>>::disposable;

template<typename T>
void on_next(T&& v) const
{
// mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one
const auto observer = state->get_observer_under_lock();
state->get_values().template get<I>().emplace(std::forward<T>(v));
const auto observer = disposable->get_observer_under_lock();
disposable->get_values().template get<I>().emplace(std::forward<T>(v));

state->get_values().apply(&apply_impl<decltype(state)>, state, observer);
disposable->get_values().apply(&apply_impl<decltype(disposable)>, disposable, observer);
}

private:
template<typename TState>
static void apply_impl(const TState& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
template<typename TDisposable>
static void apply_impl(const TDisposable& disposable, const rpp::utils::pointer_under_lock<Observer>& observer, const std::optional<Args>&... vals)
{
if ((vals.has_value() && ...))
observer->on_next(disposable->get_selector()(vals.value()...));
}
};

template<typename TSelector, rpp::constraint::observable... TObservables>
struct combine_latest_t : public combining_operator_t<combine_latest_state, combine_latest_observer_strategy, TSelector, TObservables...>
struct combine_latest_t : public combining_operator_t<combine_latest_disposable, combine_latest_observer_strategy, TSelector, TObservables...>
{
};
} // namespace rpp::operators::details
Expand Down
58 changes: 31 additions & 27 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,29 @@ namespace rpp::operators::details
};

template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
class concat_state_t final : public std::enable_shared_from_this<concat_state_t<TObservable, TObserver>>
class concat_disposable final : public rpp::refcount_disposable
{
public:
concat_state_t(TObserver&& observer)
concat_disposable(TObserver&& observer)
: m_observer{std::move(observer)}
{
const auto d = disposable_wrapper_impl<refcount_disposable>::make();
m_disposable = d.lock();
get_observer()->set_upstream(d);
}

rpp::utils::pointer_under_lock<TObserver> get_observer() { return m_observer; }
rpp::utils::pointer_under_lock<std::queue<TObservable>> get_queue() { return m_queue; }
const std::shared_ptr<refcount_disposable>& get_disposable() const { return m_disposable; }

std::atomic<ConcatStage>& stage() { return m_stage; }

void drain(rpp::composite_disposable_wrapper refcounted)
{
while (!m_disposable->is_disposed())
while (!is_disposed())
{
const auto observable = get_observable();
if (!observable)
{
stage().store(ConcatStage::None, std::memory_order::relaxed);
refcounted.dispose();
if (m_disposable->is_disposed())
if (is_disposed())
get_observer()->on_completed();
return;
}
Expand All @@ -78,13 +74,12 @@ namespace rpp::operators::details
drain(refcounted);
}


private:
bool handle_observable_impl(const rpp::constraint::decayed_same_as<TObservable> auto& observable, rpp::composite_disposable_wrapper refcounted)
{
stage().store(ConcatStage::Draining, std::memory_order::relaxed);
refcounted.clear();
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{this->shared_from_this(), std::move(refcounted)});
observable.subscribe(concat_inner_observer_strategy<TObservable, TObserver>{disposable_wrapper_impl<concat_disposable>{wrapper_from_this()}.lock(), std::move(refcounted)});

ConcatStage current = ConcatStage::Draining;
return stage().compare_exchange_strong(current, ConcatStage::Processing, std::memory_order::seq_cst);
Expand All @@ -102,7 +97,6 @@ namespace rpp::operators::details
}

private:
std::shared_ptr<refcount_disposable> m_disposable{};
rpp::utils::value_with_mutex<TObserver> m_observer;
rpp::utils::value_with_mutex<std::queue<TObservable>> m_queue;
std::atomic<ConcatStage> m_stage{};
Expand All @@ -111,23 +105,23 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_observer_strategy_base
{
concat_observer_strategy_base(std::shared_ptr<concat_state_t<TObservable, TObserver>> state, rpp::composite_disposable_wrapper refcounted)
: state{std::move(state)}
concat_observer_strategy_base(std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable, rpp::composite_disposable_wrapper refcounted)
: disposable{std::move(disposable)}
, refcounted{std::move(refcounted)}
{
}

concat_observer_strategy_base(std::shared_ptr<concat_state_t<TObservable, TObserver>> state)
: concat_observer_strategy_base{state, state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource)}
concat_observer_strategy_base(std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable)
: concat_observer_strategy_base{disposable, disposable->add_ref()}
{
}

std::shared_ptr<concat_state_t<TObservable, TObserver>> state;
rpp::composite_disposable_wrapper refcounted;
std::shared_ptr<concat_disposable<TObservable, TObserver>> disposable;
rpp::composite_disposable_wrapper refcounted;

void on_error(const std::exception_ptr& err) const
{
state->get_observer()->on_error(err);
disposable->get_observer()->on_error(err);
}

void set_upstream(const disposable_wrapper& d) const { refcounted.add(d); }
Expand All @@ -146,18 +140,18 @@ namespace rpp::operators::details
template<typename T>
void on_next(T&& v) const
{
base::state->get_observer()->on_next(std::forward<T>(v));
base::disposable->get_observer()->on_next(std::forward<T>(v));
}

void on_completed() const
{
ConcatStage current{ConcatStage::Draining};
if (base::state->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::CompletedWhileDraining, std::memory_order::seq_cst))
return;

assert(current == ConcatStage::Processing);

base::state->drain(base::refcounted);
base::disposable->drain(base::refcounted);
}
};

Expand All @@ -168,25 +162,35 @@ namespace rpp::operators::details
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

concat_observer_strategy(TObserver&& observer)
: base{std::make_shared<concat_state_t<TObservable, TObserver>>(std::move(observer))}
: base{init_state(std::move(observer))}
{
}

template<typename T>
void on_next(T&& v) const
{
ConcatStage current = ConcatStage::None;
if (base::state->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
base::state->handle_observable(std::forward<T>(v), base::state->get_disposable()->add_ref(refcount_disposable::Mode::StrongRefRefSource));
if (base::disposable->stage().compare_exchange_strong(current, ConcatStage::Draining, std::memory_order::seq_cst))
base::disposable->handle_observable(std::forward<T>(v), base::disposable->add_ref());
else
base::state->get_queue()->push(std::forward<T>(v));
base::disposable->get_queue()->push(std::forward<T>(v));
}

void on_completed() const
{
base::refcounted.dispose();
if (base::state->get_disposable()->is_disposed())
base::state->get_observer()->on_completed();
if (base::disposable->is_disposed())
base::disposable->get_observer()->on_completed();
}


private:
static std::shared_ptr<concat_disposable<TObservable, TObserver>> init_state(TObserver&& observer)
{
const auto d = disposable_wrapper_impl<concat_disposable<TObservable, TObserver>>::make(std::move(observer));
auto ptr = d.lock();
ptr->get_observer()->set_upstream(d.as_weak());
return ptr;
}
};

Expand Down
65 changes: 34 additions & 31 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,33 @@

#include <rpp/operators/fwd.hpp>

#include <rpp/disposables/composite_disposable.hpp>
#include <rpp/operators/details/strategy.hpp>
#include <rpp/utils/utils.hpp>

namespace rpp::operators::details
{
template<rpp::constraint::observer Observer, typename Worker>
class debounce_state;
template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
class debounce_disposable;

template<rpp::constraint::observer Observer, typename Worker>
struct debounce_state_wrapper
template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
struct debounce_disposable_wrapper
{
std::shared_ptr<debounce_state<Observer, Worker>> state{};
std::shared_ptr<debounce_disposable<Observer, Worker, Container>> disposable{};

bool is_disposed() const { return state->is_disposed(); }
bool is_disposed() const { return disposable->is_disposed(); }

void on_error(const std::exception_ptr& err) const { state->get_observer_under_lock()->on_error(err); }
void on_error(const std::exception_ptr& err) const { disposable->get_observer_under_lock()->on_error(err); }
};

template<rpp::constraint::observer Observer, typename Worker>
class debounce_state final : public rpp::details::enable_wrapper_from_this<debounce_state<Observer, Worker>>
, public rpp::details::base_disposable
template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
class debounce_disposable final : public rpp::composite_disposable_impl<Container>
, public rpp::details::enable_wrapper_from_this<debounce_disposable<Observer, Worker, Container>>
{
using T = rpp::utils::extract_observer_type_t<Observer>;

public:
debounce_state(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period)
debounce_disposable(Observer&& in_observer, Worker&& in_worker, rpp::schedulers::duration period)
: m_observer(std::move(in_observer))
, m_worker{std::move(in_worker)}
, m_period{period}
Expand Down Expand Up @@ -70,17 +71,17 @@ namespace rpp::operators::details
{
m_worker.schedule(
m_time_when_value_should_be_emitted.value(),
[](const debounce_state_wrapper<Observer, Worker>& handler) -> schedulers::optional_delay_to {
auto value_or_duration = handler.state->extract_value_or_time();
[](const debounce_disposable_wrapper<Observer, Worker, Container>& handler) -> schedulers::optional_delay_to {
auto value_or_duration = handler.disposable->extract_value_or_time();
if (auto* timepoint = std::get_if<schedulers::time_point>(&value_or_duration))
return schedulers::optional_delay_to{*timepoint};

if (auto* value = std::get_if<T>(&value_or_duration))
handler.state->get_observer_under_lock()->on_next(std::move(*value));
handler.disposable->get_observer_under_lock()->on_next(std::move(*value));

return std::nullopt;
},
debounce_state_wrapper<Observer, Worker>{this->wrapper_from_this().lock()});
debounce_disposable_wrapper<Observer, Worker, Container>{this->wrapper_from_this().lock()});
}

std::variant<std::monostate, T, schedulers::time_point> extract_value_or_time()
Expand All @@ -107,38 +108,38 @@ namespace rpp::operators::details
std::optional<T> m_value_to_be_emitted{};
};

template<rpp::constraint::observer Observer, typename Worker>
template<rpp::constraint::observer Observer, typename Worker, rpp::details::disposables::constraint::disposables_container Container>
struct debounce_observer_strategy
{
static constexpr auto preferred_disposables_mode = rpp::details::observers::disposables_mode::None;

std::shared_ptr<debounce_state<Observer, Worker>> state{};
std::shared_ptr<debounce_disposable<Observer, Worker, Container>> disposable{};

void set_upstream(const rpp::disposable_wrapper& d) const
{
state->get_observer_under_lock()->set_upstream(d);
disposable->add(d);
}

bool is_disposed() const
{
return state->is_disposed();
return disposable->is_disposed();
}

template<typename T>
void on_next(T&& v) const
{
state->emplace_safe(std::forward<T>(v));
disposable->emplace_safe(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const noexcept
{
state->get_observer_under_lock()->on_error(err);
disposable->get_observer_under_lock()->on_error(err);
}

void on_completed() const noexcept
{
const auto observer = state->get_observer_under_lock();
if (const auto value = state->extract_value())
const auto observer = disposable->get_observer_under_lock();
if (const auto value = disposable->extract_value())
observer->on_next(std::move(value).value());
observer->on_completed();
}
Expand All @@ -154,20 +155,22 @@ namespace rpp::operators::details
};

template<rpp::details::observables::constraint::disposables_strategy Prev>
using updated_optimal_disposables_strategy = typename Prev::template add<1>;
using updated_optimal_disposables_strategy = rpp::details::observables::fixed_disposables_strategy<1>;

rpp::schedulers::duration duration;
RPP_NO_UNIQUE_ADDRESS Scheduler scheduler;

template<rpp::constraint::decayed_type Type, rpp::constraint::observer Observer>
auto lift(Observer&& observer) const
template<rpp::constraint::decayed_type Type, rpp::details::observables::constraint::disposables_strategy DisposableStrategy, rpp::constraint::observer Observer>
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
using container = typename DisposableStrategy::disposables_container;

auto d = rpp::disposable_wrapper_impl<debounce_state<std::decay_t<Observer>, worker_t>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto ptr = d.lock();
ptr->get_observer_under_lock()->set_upstream(d.as_weak());
return rpp::observer<Type, debounce_observer_strategy<std::decay_t<Observer>, worker_t>>{std::move(ptr)};

const auto disposable = disposable_wrapper_impl<debounce_disposable<std::decay_t<Observer>, worker_t, container>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto ptr = disposable.lock();
ptr->get_observer_under_lock()->set_upstream(disposable.as_weak());
return rpp::observer<Type, debounce_observer_strategy<std::decay_t<Observer>, worker_t, container>>{std::move(ptr)};
}
};
} // namespace rpp::operators::details
Expand Down
Loading

0 comments on commit e3706c8

Please sign in to comment.