Skip to content

Commit

Permalink
implement simplest retry (#619)
Browse files Browse the repository at this point in the history
* implement simplest retry

* fix

* extend

* add stack overflow test for repeat

* extend tests

* extend with infinite retry

* add doxygen

* benchmarks + doc

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* rename

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
victimsnino and pre-commit-ci[bot] authored Aug 21, 2024
1 parent b4146dd commit ff8d149
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 1 deletion.
18 changes: 18 additions & 0 deletions src/benchmarks/benchmarks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,24 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape)
| rxcpp::operators::subscribe<int>([](int v) { ankerl::nanobench::doNotOptimizeAway(v); });
});
}
SECTION("create(on_error())+retry(1)+subscribe")
{
TEST_RPP([&]() {
rpp::source::create<int>([&](auto&& observer) {
observer.on_error({});
})
| rpp::operators::retry(1)
| rpp::operators::subscribe([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); });
});

TEST_RXCPP([&]() {
rxcpp::observable<>::create<int>([&](auto&& observer) {
observer.on_error({});
})
| rxcpp::operators::retry(1)
| rxcpp::operators::subscribe<int>([](int) {}, [](const std::exception_ptr& e) { ankerl::nanobench::doNotOptimizeAway(e); });
});
}
} // BENCHMARK("Error Handling Operators")

BENCHMARK("Subjects")
Expand Down
30 changes: 30 additions & 0 deletions src/examples/rpp/doxygen/retry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <rpp/rpp.hpp>

#include <exception>
#include <iostream>

/**
* @example retry.cpp
**/
int main()
{
//! [retry]
rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error<int>({}))
| rpp::operators::retry(2)
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) { std::cout << "error" << std::endl; },
[]() { std::cout << "completed" << std::endl; });
// Output: 1 2 3 1 2 3 1 2 3 error
//! [retry]

//! [retry_infinitely]
rpp::source::concat(rpp::source::just(1, 2, 3), rpp::source::error<int>({}))
| rpp::operators::retry()
| rpp::operators::take(10)
| rpp::operators::subscribe([](int v) { std::cout << v << " "; },
[](const std::exception_ptr&) { std::cout << "error" << std::endl; },
[]() { std::cout << "completed" << std::endl; });
// Output: 1 2 3 1 2 3 1 2 3 1 completed
//! [retry_infinitely]
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,4 @@
*/

#include <rpp/operators/on_error_resume_next.hpp>
#include <rpp/operators/retry.hpp>
4 changes: 4 additions & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ namespace rpp::operators

auto repeat();

auto retry(size_t count);

auto retry();

template<typename InitialValue, typename Fn>
requires (!utils::is_not_template_callable<Fn> || std::same_as<std::decay_t<InitialValue>, std::invoke_result_t<Fn, std::decay_t<InitialValue> &&, rpp::utils::convertible_to_any>>)
auto scan(InitialValue&& initial_value, Fn&& accumulator);
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace rpp::operators::details
disposable.dispose();
}

void set_upstream(const disposable_wrapper& d)
void set_upstream(const disposable_wrapper& d) const
{
disposable.add(d);
}
Expand Down
182 changes: 182 additions & 0 deletions src/rpp/rpp/operators/retry.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2023 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/fwd.hpp>

#include <rpp/defs.hpp>
#include <rpp/operators/details/strategy.hpp>

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver, constraint::decayed_type Observable>
struct retry_state_t final : public rpp::composite_disposable
{
retry_state_t(TObserver&& in_observer, const Observable& observable, std::optional<size_t> count)
: count{count}
, observer(std::move(in_observer))
, observable(observable)

{
}

std::optional<size_t> count;
std::atomic<bool> is_inside_drain{};

RPP_NO_UNIQUE_ADDRESS TObserver observer;
RPP_NO_UNIQUE_ADDRESS Observable observable;
};

template<rpp::constraint::observer TObserver, typename TObservable>
void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state);

template<rpp::constraint::observer TObserver, typename TObservable>
struct retry_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

std::shared_ptr<retry_state_t<TObserver, TObservable>> state;
mutable bool locally_disposed{};

template<typename T>
void on_next(T&& v) const
{
state->observer.on_next(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
locally_disposed = true;
if (state->count == 0)
{
state->observer.on_error(err);
state->dispose();
return;
}

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;

drain(state);
}

void on_completed() const
{
locally_disposed = true;
state->observer.on_completed();
}

void set_upstream(const disposable_wrapper& d) const
{
state->add(d);
}

bool is_disposed() const { return locally_disposed || state->is_disposed(); }
};

template<rpp::constraint::observer TObserver, typename TObservable>
void drain(const std::shared_ptr<retry_state_t<TObserver, TObservable>>& state)
{
while (!state->is_disposed())
{
if (state->count)
--state->count.value();
state->clear();
state->is_inside_drain.store(true, std::memory_order::seq_cst);
try
{
using value_type = rpp::utils::extract_observer_type_t<TObserver>;
state->observable.subscribe(observer<value_type, retry_observer_strategy<TObserver, TObservable>>{state});

if (state->is_inside_drain.exchange(false, std::memory_order::seq_cst))
return;
}
catch (...)
{
state->observer.on_error(std::current_exception());
return;
}
}
}

struct retry_t
{
const std::optional<size_t> count{};

template<rpp::constraint::decayed_type T>
struct operator_traits
{
using result_type = T;
};

template<rpp::details::observables::constraint::disposable_strategy Prev>
using updated_disposable_strategy = rpp::details::observables::fixed_disposable_strategy_selector<1>;

template<rpp::constraint::observer TObserver, typename TObservable>
void subscribe(TObserver&& observer, TObservable&& observble) const
{
const auto d = disposable_wrapper_impl<retry_state_t<std::decay_t<TObserver>, std::decay_t<TObservable>>>::make(std::forward<TObserver>(observer), std::forward<TObservable>(observble), count ? count.value() + 1 : count);
auto ptr = d.lock();

ptr->observer.set_upstream(d.as_weak());
drain(ptr);
}
};
} // namespace rpp::operators::details

namespace rpp::operators
{
/**
* @brief The retry operator attempts to resubscribe to the observable when an error occurs, up to the specified number of retries.
*
* @marble retry
{
source observable : +-1-x
operator "retry:(2)" : +-1-1-1-x
}
*
* @param count is the number of retries
*
* @warning #include <rpp/operators/retry.hpp>
*
* @par Examples:
* @snippet retry.cpp retry
*
* @ingroup error_handling_operators
* @see https://reactivex.io/documentation/operators/retry.html
*/
inline auto retry(size_t count)
{
return details::retry_t{count};
}

/**
* @brief The infinite retry operator continuously attempts to resubscribe to the observable upon error, without a retry limit.
*
* @marble infinite_retry
{
source observable : +-1-x
operator "retry:()" : +-1-1-1-1-1-1-1-1-1-1-1->
}
*
* @warning #include <rpp/operators/retry.hpp>
*
* @par Examples:
* @snippet retry.cpp retry_infinitely
*
* @ingroup error_handling_operators
* @see https://reactivex.io/documentation/operators/retry.html
*/
inline auto retry()
{
return details::retry_t{};
}
} // namespace rpp::operators
20 changes: 20 additions & 0 deletions src/tests/rpp/test_repeat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "copy_count_tracker.hpp"
#include "disposable_observable.hpp"
#include "rpp_trompeloil.hpp"


TEST_CASE("repeat resubscribes")
Expand Down Expand Up @@ -135,3 +136,22 @@ TEST_CASE("repeat satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::repeat());
}


TEST_CASE("repeat handles stack overflow")
{
mock_observer<int> mock{};
trompeloeil::sequence seq;

constexpr size_t count = 500000;

REQUIRE_CALL(*mock, on_next_rvalue(trompeloeil::_)).TIMES(count).IN_SEQUENCE(seq);
REQUIRE_CALL(*mock, on_completed()).IN_SEQUENCE(seq);

rpp::source::create<int>([](const auto& obs) {
obs.on_next(1);
obs.on_completed();
})
| rpp::operators::repeat(count)
| rpp::operators::subscribe(mock);
}
Loading

1 comment on commit ff8d149

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 303.93 ns 2.40 ns 2.16 ns 1.11
Subscribe empty callbacks to empty observable via pipe operator 303.76 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 690.63 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1054.93 ns 3.70 ns 3.43 ns 1.08
concat_as_source of just(1 immediate) create + subscribe 2267.73 ns 102.22 ns 124.97 ns 0.82
defer from array of 1 - defer + create + subscribe + immediate 727.40 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2236.26 ns 59.19 ns 59.24 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3147.90 ns 32.42 ns 32.46 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 28624.90 ns 28548.97 ns 27261.09 ns 1.05
from array of 1000 - create + as_blocking + subscribe + new_thread 43118.88 ns 52180.17 ns 51775.00 ns 1.01
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3633.85 ns 129.55 ns 138.45 ns 0.94

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1083.14 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 823.75 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1001.11 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 869.44 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1351.50 ns 0.62 ns 0.31 ns 2.00
immediate_just(1,2)+last()+subscribe 935.76 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1121.20 ns 17.28 ns 17.91 ns 0.96
immediate_just(1,2,3)+element_at(1)+subscribe 842.68 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 292.15 ns 2.16 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 365.12 ns 5.86 ns 5.87 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 804.87 ns 56.60 ns 55.85 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 837.76 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 887.20 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2409.48 ns 166.53 ns 158.70 ns 1.05
immediate_just+buffer(2)+subscribe 1543.45 ns 13.58 ns 13.90 ns 0.98
immediate_just+window(2)+subscribe + subscsribe inner 2428.87 ns 1052.10 ns 1091.53 ns 0.96

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 820.38 ns - - 0.00
immediate_just+take_while(true)+subscribe 834.38 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2048.99 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3470.46 ns 177.53 ns 173.15 ns 1.03
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3789.97 ns 185.57 ns 175.26 ns 1.06
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 134.04 ns 135.51 ns 0.99
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3670.17 ns 948.61 ns 1004.59 ns 0.94
immediate_just(1) + zip(immediate_just(2)) + subscribe 2371.35 ns 206.86 ns 207.71 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.53 ns 14.72 ns 14.60 ns 1.01
subscribe 100 observers to publish_subject 199128.50 ns 15016.23 ns 15476.84 ns 0.97
100 on_next to 100 observers to publish_subject 33087.90 ns 17129.44 ns 20245.56 ns 0.85

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1385.87 ns 12.66 ns 12.66 ns 1.00
basic sample with immediate scheduler 1393.98 ns 5.56 ns 5.24 ns 1.06

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 940.03 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2037.43 ns 983.90 ns 965.70 ns 1.02
create(on_error())+retry(1)+subscribe 601.94 ns 106.37 ns - 0.00

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1094.56 ns 4.78 ns 5.42 ns 0.88
Subscribe empty callbacks to empty observable via pipe operator 1162.47 ns 4.68 ns 5.44 ns 0.86

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1919.24 ns 0.23 ns 0.33 ns 0.72
from array of 1 - create + subscribe + current_thread 2422.64 ns 33.37 ns 46.17 ns 0.72
concat_as_source of just(1 immediate) create + subscribe 5821.12 ns 344.62 ns 463.26 ns 0.74
defer from array of 1 - defer + create + subscribe + immediate 2104.20 ns 0.24 ns 0.33 ns 0.72
interval - interval + take(3) + subscribe + immediate 5539.85 ns 129.49 ns 158.37 ns 0.82
interval - interval + take(3) + subscribe + current_thread 6028.52 ns 97.73 ns 132.00 ns 0.74
from array of 1 - create + as_blocking + subscribe + new_thread 83484.92 ns 80486.71 ns 111711.90 ns 0.72
from array of 1000 - create + as_blocking + subscribe + new_thread 89909.00 ns 84633.00 ns 122468.10 ns 0.69
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 8253.55 ns 379.55 ns 521.99 ns 0.73

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 3085.27 ns 0.25 ns 0.33 ns 0.77
immediate_just+filter(true)+subscribe 2215.11 ns 0.25 ns 0.32 ns 0.75
immediate_just(1,2)+skip(1)+subscribe 2963.77 ns 0.25 ns 0.32 ns 0.78
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2185.77 ns 0.50 ns 0.65 ns 0.77
immediate_just(1,2)+first()+subscribe 3458.22 ns 0.25 ns 0.33 ns 0.78
immediate_just(1,2)+last()+subscribe 2487.97 ns 0.25 ns 0.33 ns 0.77
immediate_just+take_last(1)+subscribe 3112.21 ns 0.25 ns 0.33 ns 0.75
immediate_just(1,2,3)+element_at(1)+subscribe 2273.76 ns 0.25 ns 0.33 ns 0.76

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 872.24 ns 4.38 ns 5.77 ns 0.76
current_thread scheduler create worker + schedule 1206.83 ns 37.60 ns 51.33 ns 0.73
current_thread scheduler create worker + schedule + recursive schedule 2020.03 ns 210.66 ns 285.29 ns 0.74

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2212.30 ns 4.55 ns 6.16 ns 0.74
immediate_just+scan(10, std::plus)+subscribe 2589.13 ns 0.49 ns 0.65 ns 0.75
immediate_just+flat_map(immediate_just(v*2))+subscribe 5979.18 ns 470.73 ns 563.80 ns 0.83
immediate_just+buffer(2)+subscribe 2734.99 ns 72.21 ns 90.45 ns 0.80
immediate_just+window(2)+subscribe + subscsribe inner 5852.48 ns 2615.47 ns 3348.55 ns 0.78

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2099.42 ns - - 0.00
immediate_just+take_while(true)+subscribe 2112.88 ns 0.23 ns 0.33 ns 0.72

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 5859.78 ns 5.69 ns 6.59 ns 0.86

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7695.20 ns 470.64 ns 616.68 ns 0.76
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8422.92 ns 449.53 ns 615.16 ns 0.73
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 460.68 ns 652.43 ns 0.71
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 8294.38 ns 1951.72 ns 2663.51 ns 0.73
immediate_just(1) + zip(immediate_just(2)) + subscribe 5151.78 ns 809.37 ns 1162.84 ns 0.70

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 84.51 ns 52.71 ns 66.13 ns 0.80
subscribe 100 observers to publish_subject 346354.67 ns 44964.58 ns 55122.70 ns 0.82
100 on_next to 100 observers to publish_subject 50356.43 ns 16902.58 ns 22465.44 ns 0.75

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 4067.76 ns 69.84 ns 91.36 ns 0.76
basic sample with immediate scheduler 2741.50 ns 20.10 ns 25.10 ns 0.80

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2442.17 ns 0.24 ns 0.31 ns 0.75

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 7024.93 ns 4239.80 ns 5531.59 ns 0.77
create(on_error())+retry(1)+subscribe 2018.66 ns 322.47 ns - 0.00

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 273.18 ns 0.90 ns 1.56 ns 0.58
Subscribe empty callbacks to empty observable via pipe operator 280.68 ns 0.88 ns 1.54 ns 0.57

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 574.49 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 783.03 ns 4.33 ns 4.02 ns 1.08
concat_as_source of just(1 immediate) create + subscribe 2375.85 ns 135.82 ns 135.76 ns 1.00
defer from array of 1 - defer + create + subscribe + immediate 763.33 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2224.64 ns 58.26 ns 58.36 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3206.28 ns 30.86 ns 30.86 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 29188.55 ns 28992.29 ns 27933.41 ns 1.04
from array of 1000 - create + as_blocking + subscribe + new_thread 40903.29 ns 37672.29 ns 36567.84 ns 1.03
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3765.74 ns 159.41 ns 157.15 ns 1.01

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1146.92 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 844.25 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1064.82 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 853.99 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+first()+subscribe 1368.23 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 1003.64 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1184.70 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 858.80 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 283.57 ns 0.88 ns 1.54 ns 0.57
current_thread scheduler create worker + schedule 395.67 ns 4.63 ns 4.50 ns 1.03
current_thread scheduler create worker + schedule + recursive schedule 861.11 ns 56.05 ns 55.72 ns 1.01

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 846.25 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 958.05 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2282.35 ns 139.29 ns 140.84 ns 0.99
immediate_just+buffer(2)+subscribe 1591.49 ns 13.58 ns 13.59 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2462.46 ns 943.02 ns 927.46 ns 1.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 837.77 ns - - 0.00
immediate_just+take_while(true)+subscribe 843.83 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1991.93 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3267.01 ns 161.76 ns 159.02 ns 1.02
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3694.45 ns 146.18 ns 147.45 ns 0.99
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 143.72 ns 145.28 ns 0.99
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3355.18 ns 848.06 ns 849.83 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 2248.24 ns 205.46 ns 200.40 ns 1.03

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 52.17 ns 17.57 ns 17.85 ns 0.98
subscribe 100 observers to publish_subject 215543.20 ns 16231.34 ns 15985.26 ns 1.02
100 on_next to 100 observers to publish_subject 42714.68 ns 17471.02 ns 20745.52 ns 0.84

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1275.16 ns 11.73 ns 11.11 ns 1.06
basic sample with immediate scheduler 1368.71 ns 6.17 ns 5.86 ns 1.05

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 982.28 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2153.46 ns 1207.03 ns 1230.05 ns 0.98
create(on_error())+retry(1)+subscribe 658.33 ns 146.22 ns - 0.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 559.62 ns 4.32 ns 4.94 ns 0.88
Subscribe empty callbacks to empty observable via pipe operator 585.45 ns 4.32 ns 4.94 ns 0.88

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1423.06 ns 4.62 ns 5.55 ns 0.83
from array of 1 - create + subscribe + current_thread 1442.56 ns 15.45 ns 15.74 ns 0.98
concat_as_source of just(1 immediate) create + subscribe 4077.69 ns 176.76 ns 174.06 ns 1.02
defer from array of 1 - defer + create + subscribe + immediate 1205.99 ns 4.93 ns 5.24 ns 0.94
interval - interval + take(3) + subscribe + immediate 3099.37 ns 133.46 ns 133.31 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3444.26 ns 54.78 ns 55.25 ns 0.99
from array of 1 - create + as_blocking + subscribe + new_thread 117060.00 ns 111390.00 ns 114811.11 ns 0.97
from array of 1000 - create + as_blocking + subscribe + new_thread 134762.50 ns 133800.00 ns 132911.11 ns 1.01
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5391.63 ns 205.69 ns 205.81 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1835.04 ns 12.87 ns 12.86 ns 1.00
immediate_just+filter(true)+subscribe 1318.75 ns 11.69 ns 12.38 ns 0.94
immediate_just(1,2)+skip(1)+subscribe 1715.55 ns 12.99 ns 13.13 ns 0.99
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1874.96 ns 15.80 ns 15.95 ns 0.99
immediate_just(1,2)+first()+subscribe 2059.48 ns 12.63 ns 12.95 ns 0.98
immediate_just(1,2)+last()+subscribe 1769.66 ns 14.09 ns 13.79 ns 1.02
immediate_just+take_last(1)+subscribe 2046.60 ns 59.38 ns 59.68 ns 0.99
immediate_just(1,2,3)+element_at(1)+subscribe 1337.62 ns 13.79 ns 13.77 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 484.43 ns 6.17 ns 6.17 ns 1.00
current_thread scheduler create worker + schedule 662.20 ns 14.10 ns 13.68 ns 1.03
current_thread scheduler create worker + schedule + recursive schedule 1357.86 ns 103.32 ns 107.43 ns 0.96

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1310.61 ns 10.53 ns 12.26 ns 0.86
immediate_just+scan(10, std::plus)+subscribe 1433.67 ns 21.27 ns 21.58 ns 0.99
immediate_just+flat_map(immediate_just(v*2))+subscribe 3472.85 ns 203.16 ns 207.43 ns 0.98
immediate_just+buffer(2)+subscribe 2319.42 ns 57.49 ns 57.92 ns 0.99
immediate_just+window(2)+subscribe + subscsribe inner 3997.92 ns 1309.53 ns 1290.18 ns 1.01

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1315.02 ns 11.46 ns 11.44 ns 1.00
immediate_just+take_while(true)+subscribe 1332.72 ns 11.70 ns 12.37 ns 0.95

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3551.86 ns 7.40 ns 7.71 ns 0.96

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5084.65 ns 221.54 ns 221.45 ns 1.00
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5448.86 ns 217.01 ns 213.27 ns 1.02
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 195.17 ns 197.27 ns 0.99
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5421.90 ns 948.85 ns 953.15 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 3581.23 ns 528.13 ns 525.58 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 37.44 ns 19.47 ns 20.02 ns 0.97
subscribe 100 observers to publish_subject 260650.00 ns 28552.50 ns 28978.05 ns 0.99
100 on_next to 100 observers to publish_subject 51990.00 ns 38696.77 ns 32658.06 ns 1.18

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1897.52 ns 56.84 ns 62.66 ns 0.91
basic sample with immediate scheduler 1916.90 ns 36.72 ns 36.72 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1464.39 ns 19.97 ns 20.01 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2195.49 ns 334.25 ns 350.50 ns 0.95
create(on_error())+retry(1)+subscribe 1156.38 ns 144.84 ns - 0.00

Please sign in to comment.