Skip to content

Commit

Permalink
Add new benchmarks + speedup (#562)
Browse files Browse the repository at this point in the history
* add new_thread benchmark

* improve situation

* Update current_thread.hpp

* Update CMakePresets.json

* add 1000 values scheduling example

* small improve

* speedup

* compile fix

* fix

* handle current_thread

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add benchmarks

* adapt

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update benchmarks.cpp

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
victimsnino and pre-commit-ci[bot] authored Apr 19, 2024
1 parent 24af0e4 commit 245e40e
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 106 deletions.
2 changes: 1 addition & 1 deletion CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"inherits" : ["ci-flags"],
"hidden": true,
"cacheVariables": {
"CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew"
"CMAKE_CXX_FLAGS": "/utf-8 /W4 /permissive- /volatile:iso /Zc:preprocessor /EHsc /Zc:__cplusplus /Zc:externConstexpr /Zc:throwingNew /bigobj"
}
},
{
Expand Down
25 changes: 25 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,31 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
});
}


SECTION("from array of 1 - create + as_blocking + subscribe + new_thread")
{
std::array<int, 1> vals{123};
TEST_RPP([&]() {
(rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
(rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}

SECTION("from array of 1000 - create + as_blocking + subscribe + new_thread")
{
std::array<int, 1000> vals{};
TEST_RPP([&]() {
(rpp::source::from_iterable(vals, rpp::schedulers::new_thread{}) | rpp::ops::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});

TEST_RXCPP([&]() {
(rxcpp::observable<>::iterate(vals, rxcpp::observe_on_new_thread()) | rxcpp::operators::as_blocking()).subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}

SECTION("concat_as_source of just(1 immediate) create + subscribe")
{
TEST_RPP([&]() {
Expand Down
111 changes: 64 additions & 47 deletions src/rpp/rpp/schedulers/current_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ namespace rpp::schedulers
friend class new_thread;
class worker_strategy;

inline static thread_local std::optional<details::schedulables_queue<worker_strategy>> s_queue{};
inline static thread_local details::schedulables_queue<worker_strategy>* s_queue{};

struct is_queue_is_empty
{
Expand All @@ -97,38 +97,43 @@ namespace rpp::schedulers
};


static void drain_current_queue()
static void drain_queue() noexcept
{
drain_queue(s_queue);
}

static void drain_queue(std::optional<details::schedulables_queue<worker_strategy>>& queue)
{
while (!queue->is_empty())
while (s_queue && !s_queue->is_empty())
{
auto top = queue->pop();
auto top = s_queue->pop();
if (top->is_disposed())
continue;

std::optional<time_point> timepoint{top->get_timepoint()};
// immediate like scheduling
do
{
if (timepoint && !top->is_disposed())
details::sleep_until(top->get_timepoint());

if (top->is_disposed())
timepoint.reset();
else
timepoint = (*top)();

} while (queue->is_empty() && timepoint.has_value());
details::sleep_until(top->get_timepoint());

if (timepoint.has_value())
queue->emplace(timepoint.value(), std::move(top));
while (true)
{
if (const auto res = top->make_advanced_call())
{
if (!top->is_disposed())
{
if (s_queue->is_empty())
{
if (const auto d = std::get_if<delay_from_now>(&res->get()))
{
std::this_thread::sleep_for(d->value);
}
else
{
details::sleep_until(top->handle_advanced_call(res.value()));
}
continue;
}

s_queue->emplace(top->handle_advanced_call(res.value()), std::move(top));
}
}
break;
}
}

queue.reset();
s_queue = nullptr;
}

class worker_strategy
Expand All @@ -140,26 +145,20 @@ namespace rpp::schedulers
if (handler.is_disposed())
return;

auto& queue = s_queue;
const bool someone_owns_queue = queue.has_value();
std::optional<time_point> timepoint{};
if (!someone_owns_queue)
if (!s_queue)
{
queue.emplace();
details::schedulables_queue<worker_strategy> queue{};
s_queue = &queue;

timepoint = details::immediate_scheduling_while_condition<worker_strategy>(duration, is_queue_is_empty{queue.value()}, fn, handler, args...);
const auto timepoint = details::immediate_scheduling_while_condition<worker_strategy>(duration, is_queue_is_empty{queue}, fn, handler, args...);
if (!timepoint || handler.is_disposed())
return drain_queue(queue);
}
else
{
timepoint = now() + duration;
}
return drain_queue();

queue->emplace(timepoint.value(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
s_queue->emplace(timepoint.value(), std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
return drain_queue();
}

if (!someone_owns_queue)
drain_queue(queue);
s_queue->emplace(now() + duration, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}

template<rpp::schedulers::constraint::schedulable_handler Handler, typename... Args, constraint::schedulable_fn<Handler, Args...> Fn>
Expand All @@ -168,7 +167,7 @@ namespace rpp::schedulers
if (handler.is_disposed())
return;

if (s_queue.has_value())
if (s_queue)
{
s_queue->emplace(tp, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...);
}
Expand All @@ -183,15 +182,33 @@ namespace rpp::schedulers
static rpp::schedulers::time_point now() { return details::now(); }
};

public:
static rpp::utils::finally_action<void (*)()> own_queue_and_drain_finally_if_not_owned()
private:
class own_queue_guard
{
const bool someone_owns_queue = s_queue.has_value();
public:
own_queue_guard()
: m_clear_on_destruction{!s_queue}
{
if (m_clear_on_destruction)
s_queue = &m_queue;
}
~own_queue_guard()
{
if (m_clear_on_destruction)
drain_queue();
}
own_queue_guard(const own_queue_guard&) = delete;
own_queue_guard(own_queue_guard&&) = delete;

if (!someone_owns_queue)
s_queue.emplace();
private:
details::schedulables_queue<worker_strategy> m_queue{};
bool m_clear_on_destruction{};
};

return rpp::utils::finally_action{!someone_owns_queue ? &drain_current_queue : &rpp::utils::empty_function<>};
public:
static own_queue_guard own_queue_and_drain_finally_if_not_owned()
{
return own_queue_guard{};
}

static rpp::schedulers::worker<worker_strategy> create_worker()
Expand Down
109 changes: 84 additions & 25 deletions src/rpp/rpp/schedulers/details/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
#include <rpp/utils/tuple.hpp>
#include <rpp/utils/utils.hpp>

#include "rpp/utils/functors.hpp"

#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>
#include <variant>

namespace rpp::schedulers::details
{
Expand All @@ -36,8 +39,41 @@ namespace rpp::schedulers::details

virtual ~schedulable_base() noexcept = default;

virtual std::optional<time_point> operator()() noexcept = 0;
virtual bool is_disposed() const noexcept = 0;
virtual std::optional<time_point> operator()() noexcept = 0;

class advanced_call
{
public:
advanced_call(std::variant<delay_from_now, delay_from_this_timepoint, delay_to> data)
: m_data{data}
{
}

const std::variant<delay_from_now, delay_from_this_timepoint, delay_to>& get() const { return m_data; }

auto visit(const auto& fn) const
{
return std::visit(fn, m_data);
}

bool can_run_immediately() const noexcept
{
return visit(rpp::utils::overloaded{[](const delay_to&) {
return false;
},
[](const auto& d) {
return d.value == rpp::schedulers::duration::zero();
}});
}

private:
std::variant<delay_from_now, delay_from_this_timepoint, delay_to> m_data;
};

virtual std::optional<advanced_call> make_advanced_call() noexcept = 0;
virtual time_point handle_advanced_call(const advanced_call&) noexcept = 0;

virtual bool is_disposed() const noexcept = 0;

time_point get_timepoint() const { return m_time_point; }

Expand All @@ -54,6 +90,22 @@ namespace rpp::schedulers::details
m_next = std::move(next);
}

protected:
template<typename NowStrategy>
auto get_advanced_call_handler() const
{
return rpp::utils::overloaded{
[](const delay_from_now& v) {
return NowStrategy::now() + v.value;
},
[this](const delay_from_this_timepoint& v) {
return get_timepoint() + v.value;
},
[](const delay_to& v) {
return v.value;
}};
}

private:
std::shared_ptr<schedulable_base> m_next{};
time_point m_time_point;
Expand All @@ -77,21 +129,7 @@ namespace rpp::schedulers::details
try
{
if (const auto res = m_args.apply(m_fn))
{
if constexpr (constraint::schedulable_delay_from_now_fn<Fn, Handler, Args...>)
{
return NowStrategy::now() + res->value;
}
else if constexpr (constraint::schedulable_delay_to_fn<Fn, Handler, Args...>)
{
return res->value;
}
else
{
static_assert(constraint::schedulable_delay_from_this_timepoint_fn<Fn, Handler, Args...>);
return get_timepoint() + res->value;
}
}
return get_advanced_call_handler<NowStrategy>()(res.value());
}
catch (...)
{
Expand All @@ -100,6 +138,25 @@ namespace rpp::schedulers::details
return std::nullopt;
}

std::optional<advanced_call> make_advanced_call() noexcept override
{
try
{
if (const auto res = m_args.apply(m_fn))
return advanced_call{res.value()};
}
catch (...)
{
m_args.template get<0>().on_error(std::current_exception());
}
return std::nullopt;
}

time_point handle_advanced_call(const advanced_call& v) noexcept override
{
return v.visit(get_advanced_call_handler<NowStrategy>());
}

bool is_disposed() const noexcept override { return m_args.template get<0>().is_disposed(); }

private:
Expand Down Expand Up @@ -151,7 +208,7 @@ namespace rpp::schedulers::details
schedulables_queue& operator=(const schedulables_queue& other) = delete;
schedulables_queue& operator=(schedulables_queue&& other) noexcept = default;

schedulables_queue(std::shared_ptr<shared_queue_data> shared_data)
schedulables_queue(std::weak_ptr<shared_queue_data> shared_data)
: m_shared_data{std::move(shared_data)}
{
}
Expand All @@ -162,8 +219,6 @@ namespace rpp::schedulers::details
using schedulable_type = specific_schedulable<NowStrategy, std::decay_t<Fn>, std::decay_t<Handler>, std::decay_t<Args>...>;

emplace_impl(std::make_shared<schedulable_type>(timepoint, std::forward<Fn>(fn), std::forward<Handler>(handler), std::forward<Args>(args)...));
if (m_shared_data)
m_shared_data->cv.notify_all();
}

void emplace(const time_point& timepoint, std::shared_ptr<schedulable_base>&& schedulable)
Expand All @@ -173,8 +228,6 @@ namespace rpp::schedulers::details

schedulable->set_timepoint(timepoint);
emplace_impl(std::move(schedulable));
if (m_shared_data)
m_shared_data->cv.notify_all();
}

bool is_empty() const { return !m_head; }
Expand All @@ -193,7 +246,13 @@ namespace rpp::schedulers::details
void emplace_impl(std::shared_ptr<schedulable_base>&& schedulable)
{
// needed in case of new_thread and current_thread shares same queue
optional_mutex<std::recursive_mutex> mutex{m_shared_data ? &m_shared_data->mutex : nullptr};
const auto s = m_shared_data.lock();
const rpp::utils::finally_action _{[&] {
if (s)
s->cv.notify_one();
}};

optional_mutex<std::recursive_mutex> mutex{s ? &s->mutex : nullptr};
std::lock_guard lock{mutex};

if (!m_head || schedulable->get_timepoint() < m_head->get_timepoint())
Expand All @@ -215,7 +274,7 @@ namespace rpp::schedulers::details
}

private:
std::shared_ptr<schedulable_base> m_head{};
std::shared_ptr<shared_queue_data> m_shared_data{};
std::shared_ptr<schedulable_base> m_head{};
std::weak_ptr<shared_queue_data> m_shared_data{};
};
} // namespace rpp::schedulers::details
Loading

1 comment on commit 245e40e

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 310.67 ns 2.16 ns 2.22 ns 0.97
Subscribe empty callbacks to empty observable via pipe operator 306.10 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 695.22 ns 0.62 ns 0.31 ns 2.00
from array of 1 - create + subscribe + current_thread 1041.36 ns 3.43 ns 5.25 ns 0.65
concat_as_source of just(1 immediate) create + subscribe 2238.72 ns 130.79 ns 133.05 ns 0.98
defer from array of 1 - defer + create + subscribe + immediate 745.67 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2132.70 ns 59.23 ns 59.19 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3035.30 ns 32.42 ns 32.76 ns 0.99
from array of 1 - create + as_blocking + subscribe + new_thread 29527.18 ns 27881.55 ns - 0.00
from array of 1000 - create + as_blocking + subscribe + new_thread 41027.91 ns 51949.64 ns - 0.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1087.03 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 855.73 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 999.27 ns 0.31 ns 0.31 ns 1.01
immediate_just(1,1,2)+distinct_until_changed()+subscribe 858.30 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1247.88 ns 0.62 ns 0.31 ns 2.00
immediate_just(1,2)+last()+subscribe 936.78 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1162.30 ns 17.90 ns 18.82 ns 0.95

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 270.79 ns 2.16 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 365.60 ns 6.18 ns 7.42 ns 0.83
current_thread scheduler create worker + schedule + recursive schedule 865.25 ns 55.96 ns 64.44 ns 0.87

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 859.59 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 891.69 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2310.14 ns 162.21 ns 173.60 ns 0.93
immediate_just+buffer(2)+subscribe 1559.77 ns 13.59 ns 14.51 ns 0.94
immediate_just+window(2)+subscribe + subscsribe inner 2414.24 ns 1047.77 ns 1075.73 ns 0.97

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 821.59 ns - - 0.00
immediate_just+take_while(true)+subscribe 843.51 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2009.21 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3398.77 ns 186.10 ns 196.34 ns 0.95
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3718.33 ns 169.90 ns 173.92 ns 0.98
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 157.07 ns 150.82 ns 1.04
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3540.70 ns 1031.79 ns 992.26 ns 1.04
immediate_just(1) + zip(immediate_just(2)) + subscribe 2103.43 ns 210.27 ns 211.31 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.58 ns 16.74 ns 12.02 ns 1.39

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1428.40 ns 14.43 ns 16.37 ns 0.88
basic sample with immediate scheduler 1396.90 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 926.52 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1036.38 ns 125.98 ns 121.65 ns 1.04

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 973.68 ns 3.80 ns 3.85 ns 0.99
Subscribe empty callbacks to empty observable via pipe operator 970.07 ns 3.84 ns 3.89 ns 0.99

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1929.32 ns 0.23 ns 0.23 ns 1.00
from array of 1 - create + subscribe + current_thread 2413.86 ns 32.45 ns 25.29 ns 1.28
concat_as_source of just(1 immediate) create + subscribe 5426.70 ns 334.85 ns 333.65 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 2002.49 ns 0.23 ns 0.24 ns 0.99
interval - interval + take(3) + subscribe + immediate 4947.87 ns 116.25 ns 117.21 ns 0.99
interval - interval + take(3) + subscribe + current_thread 6018.38 ns 95.62 ns 107.01 ns 0.89
from array of 1 - create + as_blocking + subscribe + new_thread 82870.00 ns 78873.46 ns - 0.00
from array of 1000 - create + as_blocking + subscribe + new_thread 89866.00 ns 85145.33 ns - 0.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 3246.51 ns 0.22 ns 0.23 ns 0.93
immediate_just+filter(true)+subscribe 2150.72 ns 0.24 ns 0.23 ns 1.04
immediate_just(1,2)+skip(1)+subscribe 2750.18 ns 0.24 ns 0.23 ns 1.02
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2060.03 ns 0.47 ns 0.47 ns 1.00
immediate_just(1,2)+first()+subscribe 3321.33 ns 0.24 ns 0.23 ns 1.05
immediate_just(1,2)+last()+subscribe 2446.86 ns 0.24 ns 0.23 ns 1.04
immediate_just+take_last(1)+subscribe 3265.48 ns 74.86 ns 71.09 ns 1.05

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 843.25 ns 4.02 ns 4.04 ns 0.99
current_thread scheduler create worker + schedule 1181.87 ns 39.69 ns 40.73 ns 0.97
current_thread scheduler create worker + schedule + recursive schedule 1987.23 ns 202.51 ns 220.73 ns 0.92

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2024.58 ns 0.22 ns 0.23 ns 0.96
immediate_just+scan(10, std::plus)+subscribe 2187.85 ns 0.45 ns 0.47 ns 0.96
immediate_just+flat_map(immediate_just(v*2))+subscribe 4962.63 ns 375.13 ns 413.68 ns 0.91
immediate_just+buffer(2)+subscribe 2321.46 ns 65.05 ns 68.63 ns 0.95
immediate_just+window(2)+subscribe + subscsribe inner 4982.90 ns 2148.16 ns 2295.55 ns 0.94

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2018.27 ns - - 0.00
immediate_just+take_while(true)+subscribe 2020.40 ns 0.22 ns 0.23 ns 0.96

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 4880.11 ns 0.23 ns 0.23 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7430.26 ns 438.77 ns 449.18 ns 0.98
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8376.92 ns 439.97 ns 444.96 ns 0.99
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 461.76 ns 475.49 ns 0.97
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 7598.63 ns 1913.39 ns 1908.91 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 4897.42 ns 804.52 ns 832.28 ns 0.97

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 75.53 ns 49.19 ns 49.56 ns 0.99

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2765.52 ns 71.35 ns 102.34 ns 0.70
basic sample with immediate scheduler 2933.08 ns 15.65 ns 15.51 ns 1.01

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2378.64 ns 0.23 ns 0.24 ns 0.96

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 6461.88 ns 4081.84 ns 4147.98 ns 0.98

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 271.59 ns 1.56 ns 0.89 ns 1.76
Subscribe empty callbacks to empty observable via pipe operator 271.23 ns 1.57 ns 0.88 ns 1.77

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 565.88 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 785.17 ns 4.01 ns 5.55 ns 0.72
concat_as_source of just(1 immediate) create + subscribe 2347.31 ns 135.13 ns 114.06 ns 1.18
defer from array of 1 - defer + create + subscribe + immediate 773.91 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2194.33 ns 58.31 ns 58.26 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3109.52 ns 31.19 ns 31.47 ns 0.99
from array of 1 - create + as_blocking + subscribe + new_thread 26659.76 ns 27817.21 ns - 0.00
from array of 1000 - create + as_blocking + subscribe + new_thread 39886.96 ns 36525.45 ns - 0.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1157.03 ns 0.31 ns 0.31 ns 1.01
immediate_just+filter(true)+subscribe 840.95 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1112.97 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 921.85 ns 0.31 ns 0.62 ns 0.50
immediate_just(1,2)+first()+subscribe 1375.20 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 996.16 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1209.20 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 280.56 ns 1.57 ns 0.88 ns 1.79
current_thread scheduler create worker + schedule 390.00 ns 4.01 ns 5.89 ns 0.68
current_thread scheduler create worker + schedule + recursive schedule 843.05 ns 56.88 ns 59.96 ns 0.95

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 843.80 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 962.69 ns 0.62 ns 0.31 ns 2.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2244.52 ns 138.58 ns 116.36 ns 1.19
immediate_just+buffer(2)+subscribe 1517.10 ns 13.58 ns 14.50 ns 0.94
immediate_just+window(2)+subscribe + subscsribe inner 2486.78 ns 894.88 ns 822.02 ns 1.09

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 829.51 ns - - 0.00
immediate_just+take_while(true)+subscribe 841.83 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1991.49 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3227.69 ns 158.58 ns 123.43 ns 1.28
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3746.16 ns 148.88 ns 119.20 ns 1.25
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 141.25 ns 109.41 ns 1.29
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3383.71 ns 840.77 ns 730.00 ns 1.15
immediate_just(1) + zip(immediate_just(2)) + subscribe 2197.91 ns 201.37 ns 168.72 ns 1.19

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 52.45 ns 18.55 ns 13.57 ns 1.37

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1296.01 ns 11.11 ns 13.88 ns 0.80
basic sample with immediate scheduler 1352.49 ns 6.17 ns 5.86 ns 1.05

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 995.42 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1059.11 ns 134.53 ns 123.98 ns 1.09

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 576.54 ns 4.01 ns 4.01 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 589.45 ns 4.01 ns 4.01 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1160.65 ns 5.55 ns 5.55 ns 1.00
from array of 1 - create + subscribe + current_thread 1440.11 ns 15.44 ns 19.20 ns 0.80
concat_as_source of just(1 immediate) create + subscribe 3768.65 ns 183.74 ns 175.93 ns 1.04
defer from array of 1 - defer + create + subscribe + immediate 1208.97 ns 5.24 ns 5.55 ns 0.95
interval - interval + take(3) + subscribe + immediate 3061.60 ns 134.23 ns 134.24 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3760.19 ns 52.98 ns 57.89 ns 0.92
from array of 1 - create + as_blocking + subscribe + new_thread 122312.50 ns 115490.00 ns - 0.00
from array of 1000 - create + as_blocking + subscribe + new_thread 127600.00 ns 128162.50 ns - 0.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1863.17 ns 12.86 ns 12.87 ns 1.00
immediate_just+filter(true)+subscribe 1339.06 ns 12.39 ns 12.36 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 2106.35 ns 13.02 ns 13.11 ns 0.99
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1453.89 ns 15.95 ns 15.94 ns 1.00
immediate_just(1,2)+first()+subscribe 2074.71 ns 12.64 ns 12.64 ns 1.00
immediate_just(1,2)+last()+subscribe 1788.94 ns 14.11 ns 14.07 ns 1.00
immediate_just+take_last(1)+subscribe 2056.64 ns 58.75 ns 61.34 ns 0.96

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 489.14 ns 6.17 ns 6.18 ns 1.00
current_thread scheduler create worker + schedule 657.48 ns 13.33 ns 28.62 ns 0.47
current_thread scheduler create worker + schedule + recursive schedule 1092.85 ns 104.57 ns 115.63 ns 0.90

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1330.34 ns 12.33 ns 12.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1448.37 ns 21.27 ns 21.29 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 3570.14 ns 226.25 ns 233.56 ns 0.97
immediate_just+buffer(2)+subscribe 2336.67 ns 58.17 ns 61.04 ns 0.95
immediate_just+window(2)+subscribe + subscsribe inner 4411.81 ns 1227.66 ns 1581.80 ns 0.78

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1330.86 ns 11.46 ns 11.46 ns 1.00
immediate_just+take_while(true)+subscribe 1635.83 ns 12.40 ns 12.35 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3219.55 ns 7.71 ns 7.71 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5956.07 ns 236.78 ns 248.46 ns 0.95
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5710.00 ns 232.74 ns 250.65 ns 0.93
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 221.66 ns 243.38 ns 0.91
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5615.22 ns 932.27 ns 977.26 ns 0.95
immediate_just(1) + zip(immediate_just(2)) + subscribe 3567.39 ns 524.58 ns 536.00 ns 0.98

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.49 ns 25.59 ns 27.15 ns 0.94

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2070.06 ns 56.62 ns 59.14 ns 0.96
basic sample with immediate scheduler 1995.45 ns 38.29 ns 35.50 ns 1.08

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1466.67 ns 17.81 ns 17.81 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1979.18 ns 340.74 ns 344.18 ns 0.99

Please sign in to comment.