Skip to content

Commit

Permalink
remove disposables from scheduler worker (#632)
Browse files Browse the repository at this point in the history
* remove disposables

* Update test_scheduler.hpp

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
victimsnino and pre-commit-ci[bot] authored Sep 4, 2024
1 parent e01a513 commit 75d1a47
Show file tree
Hide file tree
Showing 16 changed files with 48 additions and 126 deletions.
2 changes: 0 additions & 2 deletions src/extensions/rppqt/rppqt/schedulers/main_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ namespace rppqt::schedulers
});
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); }

private:
Expand Down
7 changes: 1 addition & 6 deletions src/rpp/rpp/operators/debounce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ namespace rpp::operators::details
, m_worker{std::move(in_worker)}
, m_period{period}
{
if constexpr (!Worker::is_none_disposable)
{
if (auto d = m_worker.get_disposable(); !d.is_disposed())
rpp::composite_disposable_impl<Container>::add(std::move(d));
}
}

template<typename TT>
Expand Down Expand Up @@ -172,7 +167,7 @@ namespace rpp::operators::details
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
using container = typename DisposableStrategy::template add<worker_t::is_none_disposable ? 0 : 1>::disposable_container;
using container = typename DisposableStrategy::disposable_container;

const auto disposable = disposable_wrapper_impl<debounce_disposable<std::decay_t<Observer>, worker_t, container>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto ptr = disposable.lock();
Expand Down
7 changes: 1 addition & 6 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ namespace rpp::operators::details
, worker{std::move(in_worker)}
, delay{delay}
{
if constexpr (!Worker::is_none_disposable)
{
if (auto d = worker.get_disposable(); !d.is_disposed())
rpp::composite_disposable_impl<Container>::add(std::move(d));
}
}

RPP_NO_UNIQUE_ADDRESS Observer observer;
Expand Down Expand Up @@ -186,7 +181,7 @@ namespace rpp::operators::details
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<Scheduler>;
using container = typename DisposableStrategy::template add<worker_t::is_none_disposable ? 0 : 1>::disposable_container;
using container = typename DisposableStrategy::disposable_container;

const auto disposable = disposable_wrapper_impl<delay_disposable<std::decay_t<Observer>, worker_t, container>>::make(std::forward<Observer>(observer), scheduler.create_worker(), duration);
auto ptr = disposable.lock();
Expand Down
7 changes: 1 addition & 6 deletions src/rpp/rpp/operators/subscribe_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,14 @@ namespace rpp::operators::details
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = typename Prev::template add<rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable ? 0 : 1>;
using updated_disposable_strategy = Prev;

RPP_NO_UNIQUE_ADDRESS TScheduler scheduler;

template<rpp::constraint::observer Observer, typename... Strategies>
void subscribe(Observer&& observer, const rpp::details::observables::chain<Strategies...>& observable_strategy) const
{
const auto worker = scheduler.create_worker();
if constexpr (!rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable)
{
if (auto d = worker.get_disposable(); !d.is_disposed())
observer.set_upstream(std::move(d));
}
worker.schedule(subscribe_on_schedulable<rpp::details::observables::chain<Strategies...>>{observable_strategy}, std::forward<Observer>(observer));
}
};
Expand Down
12 changes: 3 additions & 9 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,22 +133,16 @@ namespace rpp::operators::details
auto lift_with_disposable_strategy(Observer&& observer) const
{
using worker_t = rpp::schedulers::utils::get_worker_t<TScheduler>;
using container = typename DisposableStrategy::template add<worker_t::is_none_disposable ? 0 : 1>::disposable_container;
using container = typename DisposableStrategy::disposable_container;

const auto timeout = rpp::schedulers::utils::get_worker_t<TScheduler>::now() + period;
const auto timeout = worker_t::now() + period;

const auto disposable = disposable_wrapper_impl<timeout_disposable<std::decay_t<Observer>, TFallbackObservable, container>>::make(std::forward<Observer>(observer), period, fallback, timeout);
auto ptr = disposable.lock();
ptr->get_observer_with_timeout_under_lock()->observer.set_upstream(disposable.as_weak());

const auto worker = scheduler.create_worker();
if constexpr (!rpp::schedulers::utils::get_worker_t<TScheduler>::is_none_disposable)
{
if (auto d = worker.get_disposable(); !d.is_disposed())
disposable.add(std::move(d));
}

using wrapper = timeout_disposable_wrapper<std::decay_t<Observer>, TFallbackObservable, container>;
using wrapper = timeout_disposable_wrapper<std::decay_t<Observer>, TFallbackObservable, container>;
worker.schedule(
timeout,
[](wrapper& handler) -> rpp::schedulers::optional_delay_to {
Expand Down
2 changes: 0 additions & 2 deletions src/rpp/rpp/schedulers/current_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ namespace rpp::schedulers
}
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return details::now(); }
};

Expand Down
10 changes: 0 additions & 10 deletions src/rpp/rpp/schedulers/details/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,8 @@ namespace rpp::schedulers
schedule(tp - now(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

rpp::disposable_wrapper get_disposable() const
{
if constexpr (is_none_disposable)
return disposable_wrapper::empty();
else
return m_strategy.get_disposable();
}

static rpp::schedulers::time_point now() { return Strategy::now(); }

static constexpr bool is_none_disposable = std::same_as<decltype(std::declval<Strategy>().get_disposable()), rpp::schedulers::details::none_disposable>;

private:
RPP_NO_UNIQUE_ADDRESS Strategy m_strategy;
};
Expand Down
11 changes: 3 additions & 8 deletions src/rpp/rpp/schedulers/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ namespace rpp::schedulers::details

static void on_error(const std::exception_ptr&) {}
};

struct none_disposable
{
};
} // namespace rpp::schedulers::details

namespace rpp::schedulers::constraint
Expand Down Expand Up @@ -148,10 +144,9 @@ namespace rpp::schedulers::constraint
};

template<typename S>
concept strategy = (defer_for_strategy<S> || defer_to_strategy<S>) && requires(const S& s, const details::fake_schedulable_handler& handler) {
{
s.get_disposable()
} -> rpp::constraint::any_of<rpp::disposable_wrapper, details::none_disposable>;
concept strategy = (defer_for_strategy<S> || defer_to_strategy<S>)&&
requires ()
{
{
S::now()
} -> std::same_as<rpp::schedulers::time_point>;
Expand Down
2 changes: 0 additions & 2 deletions src/rpp/rpp/schedulers/immediate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ namespace rpp::schedulers
details::immediate_scheduling_while_condition<worker_strategy>(duration, rpp::utils::return_true{}, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return rpp::schedulers::clock_type::now(); }
};

Expand Down
42 changes: 20 additions & 22 deletions src/rpp/rpp/schedulers/new_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,19 @@ namespace rpp::schedulers
*/
class new_thread
{
class disposable final : public rpp::details::base_disposable
class state_t final
{
public:
disposable() = default;
state_t() = default;

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
m_state->queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
m_state->has_fresh_data.store(true);
}

private:
void base_dispose_impl(interface_disposable::Mode) noexcept override
~state_t() noexcept
{
if (!m_thread.joinable())
return;

{
std::lock_guard lock{m_state->mutex};
m_state->is_disposed = true;
m_state->is_stoping = true;
}
m_state->cv.notify_all();

Expand All @@ -59,24 +51,32 @@ namespace rpp::schedulers
m_thread.detach();
}

struct state_t : public details::shared_queue_data
template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
m_state->queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
m_state->has_fresh_data.store(true);
}

private:
struct queue_data : public details::shared_queue_data
{
details::schedulables_queue<current_thread::worker_strategy> queue{};
bool is_disposed{};
bool is_stoping{};
std::atomic_bool has_fresh_data{false};
};

static void data_thread(std::shared_ptr<state_t> state)
static void data_thread(std::shared_ptr<queue_data> state)
{
current_thread::get_queue() = &state->queue;

while (true)
{
std::unique_lock lock{state->mutex};
if (state->queue.is_empty() && state->is_disposed)
if (state->queue.is_empty() && state->is_stoping)
break;

state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_disposed; });
state->cv.wait(lock, [&] { return !state->queue.is_empty() || state->is_stoping; });

if (state->queue.is_empty())
break;
Expand Down Expand Up @@ -120,7 +120,7 @@ namespace rpp::schedulers
}

private:
std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
std::shared_ptr<queue_data> m_state = std::make_shared<queue_data>();

RPP_CALL_DURING_CONSTRUCTION(m_state->queue = details::schedulables_queue<current_thread::worker_strategy>(m_state));

Expand All @@ -136,15 +136,13 @@ namespace rpp::schedulers
template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
void defer_to(time_point tp, Fn&& fn, Handler&& handler, Args&&... args) const
{
m_state.lock()->defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
m_state->defer_to(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

rpp::disposable_wrapper get_disposable() const { return m_state; }

static rpp::schedulers::time_point now() { return details::now(); }

private:
disposable_wrapper_impl<disposable> m_state = disposable_wrapper_impl<disposable>::make();
std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
};

static rpp::schedulers::worker<worker_strategy> create_worker()
Expand Down
2 changes: 0 additions & 2 deletions src/rpp/rpp/schedulers/run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ namespace rpp::schedulers
shared->emplace_and_notify(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }

static rpp::schedulers::time_point now() { return details::now(); }

private:
Expand Down
41 changes: 15 additions & 26 deletions src/rpp/rpp/schedulers/test_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

#pragma once

#include <rpp/disposables.hpp>
#include <rpp/schedulers.hpp>

namespace rpp::schedulers
Expand All @@ -23,16 +22,13 @@ namespace rpp::schedulers

class worker_strategy;

struct state : public rpp::details::base_disposable
struct state
{
state() = default;

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, rpp::schedulers::constraint::schedulable_fn<Handler, Args...> Fn>
void schedule(rpp::schedulers::time_point time_point, Fn&& fn, Handler&& handler, Args&&... args)
{
if (is_disposed())
return;

schedulings.push_back(time_point);
queue.emplace(time_point, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}
Expand All @@ -53,17 +49,15 @@ namespace rpp::schedulers
executions.push_back(s_current_time);
if (auto new_timepoint = (*fn)())
{
if (!is_disposed())
{
schedulings.push_back(std::max(s_current_time, new_timepoint.value()));
queue.emplace(schedulings.back(), std::move(fn));
}
if (fn->is_disposed())
continue;

schedulings.push_back(std::max(s_current_time, new_timepoint.value()));
queue.emplace(schedulings.back(), std::move(fn));
}
}
}

void base_dispose_impl(interface_disposable::Mode) noexcept override {}

std::vector<rpp::schedulers::time_point> schedulings{};
std::vector<rpp::schedulers::time_point> executions{};
rpp::schedulers::details::schedulables_queue<worker_strategy> queue{};
Expand All @@ -72,7 +66,7 @@ namespace rpp::schedulers
class worker_strategy
{
public:
worker_strategy(rpp::disposable_wrapper_impl<state> state)
worker_strategy(std::weak_ptr<state> state)
: m_state{std::move(state)}
{
}
Expand All @@ -82,41 +76,36 @@ namespace rpp::schedulers
{
if (auto locked = m_state.lock())
{
if (!locked->is_disposed())
{
locked->schedule(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
locked->drain();
}
locked->schedule(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
locked->drain();
}
}

static rpp::schedulers::time_point now() { return s_current_time; }

rpp::disposable_wrapper get_disposable() const { return m_state.as_weak(); }

private:
rpp::disposable_wrapper_impl<state> m_state;
std::weak_ptr<state> m_state;
};

test_scheduler() = default;

rpp::schedulers::worker<worker_strategy> create_worker() const
{
return rpp::schedulers::worker<worker_strategy>{m_state.as_weak()};
return rpp::schedulers::worker<worker_strategy>{m_state};
}

const auto& get_schedulings() const { return m_state.lock()->schedulings; }
const auto& get_executions() const { return m_state.lock()->executions; }
const auto& get_schedulings() const { return m_state->schedulings; }
const auto& get_executions() const { return m_state->executions; }

static rpp::schedulers::time_point now() { return s_current_time; }

void time_advance(rpp::schedulers::duration dur) const
{
s_current_time += dur;
m_state.lock()->drain();
m_state->drain();
}

private:
rpp::disposable_wrapper_impl<state> m_state = rpp::disposable_wrapper_impl<state>::make();
std::shared_ptr<state> m_state = std::make_shared<state>();
};
} // namespace rpp::schedulers
3 changes: 1 addition & 2 deletions src/rpp/rpp/schedulers/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ namespace rpp::schedulers
m_original_worker.schedule(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

static constexpr rpp::schedulers::details::none_disposable get_disposable() { return {}; }
static rpp::schedulers::time_point now() { return original_worker::now(); }
static rpp::schedulers::time_point now() { return original_worker::now(); }

private:
original_worker m_original_worker;
Expand Down
Loading

0 comments on commit 75d1a47

Please sign in to comment.