Skip to content

Commit

Permalink
Merge branch 'main' into debounce
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino authored Dec 11, 2022
2 parents 8d1cd41 + 65decf7 commit 7eea102
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 76 deletions.
172 changes: 123 additions & 49 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,138 @@
#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state
#include <rpp/operators/fwd/delay.hpp> // own forwarding
#include <rpp/subscribers/constraints.hpp> // constraint::subscriber_of_type
#include <rpp/utils/overloaded.hpp>

#include <variant>

IMPLEMENTATION_FILE(delay_tag);

namespace rpp::details
{
/**
* Functor (type-erasure) of "delay" for on_next operator.
*/
struct delay_on_next
struct completion {};

template<typename T, typename Subscriber, typename Worker>
class queue_based_worker final : public std::enable_shared_from_this<queue_based_worker<T, Subscriber, Worker>>
{
schedulers::duration delay;
public:
queue_based_worker(schedulers::duration delay, Worker&& worker, const Subscriber& subscriber)
: m_delay{delay}
, m_worker{std::move(worker)}
, m_subscriber{subscriber} {}

void operator()(auto&& value, const auto& subscriber, const auto& worker) const
queue_based_worker(schedulers::duration delay, Worker&& worker, Subscriber&& subscriber)
: m_delay{delay}
, m_worker{std::move(worker)}
, m_subscriber{std::move(subscriber)} {}

struct on_next
{
worker.schedule(delay,
[value = std::forward<decltype(value)>(value), subscriber]()
{
subscriber.on_next(std::move(value));
return schedulers::optional_duration{};
});
}
};
void operator()(auto&& value, const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
{
state->emplace(std::forward<decltype(value)>(value));
}
};

/**
* Functor (type-erasure) of "delay" for on_error operator.
*/
struct delay_on_error
{
void operator()(const std::exception_ptr& err, const auto& subscriber, const auto& worker) const
struct on_error
{
// on-error must be delivered as soon as possible
worker.schedule([err, subscriber]()
{
subscriber.on_error(err);
return schedulers::optional_duration{};
});
void operator()(const std::exception_ptr& err, const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
{
state->emplace(err);
}
};

struct on_completed
{
void operator()(const std::shared_ptr<queue_based_worker<T, Subscriber, Worker>>& state) const
{
state->emplace(completion{});
}
};

private:
template<typename TT>
void emplace(TT&& item)
{
if (const auto timepoint = emplace_safe(std::forward<TT>(item)))
{
m_worker.schedule(timepoint.value(),
[state = this->shared_from_this()]()-> schedulers::optional_duration
{
return state->drain_queue();
});
}
}
};

/**
* Functor (type-erasure) of "delay" for on_completed operator.
*/
struct delay_on_completed
{
schedulers::duration delay;
template<typename TT>
std::optional<schedulers::time_point> emplace_safe(TT&& item)
{
std::lock_guard lock{m_mutex};
const auto delay = std::is_same_v<std::exception_ptr, std::decay_t<TT>> ? schedulers::duration{0} : m_delay;
m_queue.emplace(++m_current_id, m_worker.now()+delay, std::forward<TT>(item));
if (!m_active && m_queue.size() == 1)
{
m_active = true;
return m_queue.top().time;
}
return {};
}

void operator()(const auto& subscriber, const auto& worker) const
schedulers::optional_duration drain_queue()
{
worker.schedule(delay,
[subscriber]()
{
subscriber.on_completed();
return schedulers::optional_duration{};
});
while (true)
{
std::unique_lock lock{m_mutex};
if (m_queue.empty())
{
m_active = false;
return {};
}

auto& top = m_queue.top();
const auto now = m_worker.now();
if (top.time > now)
return top.time - now;

auto item = std::move(top.item);
m_queue.pop();
lock.unlock();

std::visit(utils::overloaded
{
[&](T&& v) { m_subscriber.on_next(std::move(v)); },
[&](const std::exception_ptr& err) { m_subscriber.on_error(err); },
[&](completion) { m_subscriber.on_completed(); }
},
std::move(item));
}
}

private:
struct emission
{
template<typename TT>
emission(size_t id, schedulers::time_point time, TT&& item)
: id{id}
, time{std::move(time)}
, item{std::forward<TT>(item)} {}

size_t id{};
schedulers::time_point time{};
std::variant<T, std::exception_ptr, completion> item{};

bool operator<(const emission& other) const { return std::tie(time, id) >= std::tie(other.time, other.id); }
};

schedulers::duration m_delay;
Worker m_worker;
Subscriber m_subscriber;
std::mutex m_mutex{};
size_t m_current_id{};
std::priority_queue<emission> m_queue{};
bool m_active{};
};

/**
* \brief Functor of OperatorFn for "combine_latest" operator (used by "lift").
*/

template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct delay_impl
{
Expand All @@ -86,14 +158,16 @@ struct delay_impl
auto operator()(TSub&& subscriber) const
{
auto worker = scheduler.create_worker(subscriber.get_subscription());

auto subscription = subscriber.get_subscription().make_child();

using state_t = queue_based_worker<Type, std::decay_t<TSub>, std::decay_t<decltype(worker)>>;
auto state = std::make_shared<state_t>(delay, std::move(worker), std::forward<TSub>(subscriber));

return create_subscriber_with_state<Type>(std::move(subscription),
delay_on_next{delay},
delay_on_error{},
delay_on_completed{delay},
std::forward<TSub>(subscriber),
std::move(worker));
typename state_t::on_next{},
typename state_t::on_error{},
typename state_t::on_completed{},
std::move(state));
}
};
} // namespace rpp::details
44 changes: 41 additions & 3 deletions src/tests/rpp/test_delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
#include <catch2/catch_test_macros.hpp>
#include <rpp/operators/delay.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>
#include <rpp/subjects/publish_subject.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/sources/just.hpp>

SCENARIO("delay mirrors both source observable and trigger observable", "[delay]")
{
auto mock = mock_observer<int>{};
std::chrono::milliseconds delay_duration{300};

GIVEN("observable of -1-|")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::just(1)
Expand Down Expand Up @@ -60,7 +61,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]

GIVEN("observable of -x")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::error<int>(std::make_exception_ptr(std::runtime_error{""}))
Expand All @@ -87,7 +87,6 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]

GIVEN("observable of -|")
{
auto mock = mock_observer<int>{};
const auto now = rpp::schedulers::clock_type::now();

rpp::source::empty<int>()
Expand All @@ -111,4 +110,43 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]
CHECK(mock.get_on_error_count() == 0);
}
}

GIVEN("subject with items")
{
auto subj = rpp::subjects::publish_subject<int>{};

WHEN("subscribe on subject via delay and doing recursive submit from another thread")
{
THEN("all values obtained in the same thread")
{
auto current_thread = std::this_thread::get_id();

auto sub = subj.get_observable()
.delay(delay_duration, rpp::schedulers::trampoline{})
.subscribe([&](int v)
{
CHECK(std::this_thread::get_id() == current_thread);

mock.on_next(v);

if (v == 1)
{
std::thread{[&]{subj.get_subscriber().on_next(2);}}.join();

THEN("no recursive on_next calls")
{
CHECK(mock.get_received_values() == std::vector{1});
}
}
});

subj.get_subscriber().on_next(1);

AND_THEN("all values obtained")
{
CHECK(mock.get_received_values() == std::vector{ 1, 2 });
}
}
}
}
}
Loading

0 comments on commit 7eea102

Please sign in to comment.