Skip to content

Commit

Permalink
Minor cleanup (#373)
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino authored Jun 9, 2023
1 parent a1359da commit 05a4d05
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 249 deletions.
35 changes: 15 additions & 20 deletions src/rpp/rpp/sources/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct concat_source_observer_strategy
using Type = utils::extract_observable_type_t<utils::iterable_value_t<PackedContainer>>;

RPP_NO_UNIQUE_ADDRESS mutable PackedContainer container;
mutable size_t index;

constexpr static operators::details::empty_on_subscribe on_subscribe{};
constexpr static operators::details::forwarding_on_next_strategy on_next{};
Expand All @@ -54,16 +55,7 @@ struct concat_source_observer_strategy

void on_completed(rpp::constraint::observer auto& observer) const
{
try
{
container.increment_iterator();
}
catch(...)
{
observer.on_error(std::current_exception());
return;
}
concat_strategy<PackedContainer>::drain(std::move(container), std::move(observer));
concat_strategy<PackedContainer>::drain(std::move(container), ++index, std::move(observer));
}
};

Expand All @@ -77,33 +69,36 @@ struct concat_strategy
template<constraint::observer_strategy<Type> Strategy>
void subscribe(observer<Type, Strategy>&& obs) const
{
drain(container, std::move(obs));
drain(container, size_t{}, std::move(obs));
}

template<constraint::observer_strategy<Type> Strategy>
static void drain(constraint::decayed_same_as<PackedContainer> auto&& container, observer<Type, Strategy>&& obs)
static void drain(constraint::decayed_same_as<PackedContainer> auto&& container, size_t index, observer<Type, Strategy>&& obs)
{
if (const auto observable = extract_observable(container, obs))
if (const auto observable = extract_observable(container, index, obs))
{
observable->subscribe(observer<Type,
rpp::operators::details::operator_strategy_base<Type, observer<Type, Strategy>,
concat_source_observer_strategy<PackedContainer>>>
{
std::move(obs),
std::forward<decltype(container)>(container)
std::forward<decltype(container)>(container),
index
});
}
}

private:
static std::optional<utils::iterable_value_t<PackedContainer>> extract_observable(const PackedContainer& container, const auto& obs)
static std::optional<utils::iterable_value_t<PackedContainer>> extract_observable(const PackedContainer& container, size_t index, const auto& obs)
{
try
{
const auto itr = container.get_actual_iterator();
auto itr = std::cbegin(container);
std::advance(itr, static_cast<int64_t>(index));

if (itr != std::cend(container))
{
return PackedContainer::extract_value_from_itr(itr);
return *itr;
}
}
catch (...)
Expand All @@ -114,7 +109,7 @@ struct concat_strategy
obs.on_completed();
return std::nullopt;
}

};

template<typename PackedContainer>
Expand Down Expand Up @@ -157,7 +152,7 @@ template<constraint::memory_model memory_model /*= memory_model::use_stack*/, rp
requires (std::same_as<rpp::utils::extract_observable_type_t<TObservable>, rpp::utils::extract_observable_type_t<TObservables>> && ...)
auto concat(TObservable&& obs, TObservables&&...others)
{
return make_concat_from_iterable(pack_observables<memory_model>(std::forward<TObservable>(obs), std::forward<TObservables>(others)...));
return rpp::details::make_concat_from_iterable(rpp::details::pack_observables<memory_model>(std::forward<TObservable>(obs), std::forward<TObservables>(others)...));
}

/**
Expand Down Expand Up @@ -189,6 +184,6 @@ template<constraint::memory_model memory_model /*= memory_model::use_stack*/, co
requires constraint::observable<utils::iterable_value_t<Iterable>>
auto concat(Iterable&& iterable)
{
return make_concat_from_iterable(details::pack_to_container<memory_model, std::decay_t<Iterable>>(std::forward<Iterable>(iterable)));
return rpp::details::make_concat_from_iterable(rpp::details::pack_to_container<memory_model, std::decay_t<Iterable>>(std::forward<Iterable>(iterable)));
}
}
105 changes: 20 additions & 85 deletions src/rpp/rpp/sources/from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,101 +27,29 @@ template<constraint::decayed_type Container>
class shared_container
{
public:
template<typename ...Ts>
requires (!constraint::variadic_decayed_same_as<shared_container<Container>, Ts...>)
explicit shared_container(Ts&&...items)
template<typename... Ts>
requires(!constraint::variadic_decayed_same_as<shared_container<Container>, Ts...>)
explicit shared_container(Ts&&... items)
// raw "new" call to avoid extra copy/moves for items
: m_container{new Container{std::forward<Ts>(items)...}} {}
: m_container{new Container{std::forward<Ts>(items)...}}
{
}

shared_container(const shared_container&) = default;
shared_container(const shared_container&) = default;
shared_container(shared_container&&) noexcept = default;

auto begin() const { return std::cbegin(*m_container); }
auto end() const { return std::cend(*m_container); }

auto get_actual_iterator() const
{
if (!m_iterator)
m_iterator.emplace(begin());
return m_iterator.value();
}
bool increment_iterator() const
{
if (!m_iterator)
m_iterator.emplace(begin());

return ++(m_iterator.value()) != end();
}

static const auto& extract_value_from_itr(const auto& itr) {
return *itr;
}

private:
std::shared_ptr<Container> m_container{};
mutable std::optional<decltype(std::cbegin(*m_container))> m_iterator;
};

template<constraint::decayed_type Container>
class container_with_iterator
{
public:
template<typename ...Ts>
requires (!constraint::variadic_decayed_same_as<container_with_iterator<Container>, Ts...>)
explicit container_with_iterator(Ts&&...items)
: m_container{std::forward<Ts>(items)...} {}

container_with_iterator(const container_with_iterator& other)
: m_container{other.m_container}
, m_index(other.m_index)
{}

container_with_iterator(container_with_iterator&& other) noexcept
: m_container{std::move(other.m_container)}
, m_index(other.m_index)
{}

auto begin() const { return std::cbegin(m_container); }
auto end() const { return std::cend(m_container); }

auto get_actual_iterator() const
{
if (!m_iterator)
m_iterator.emplace(get_default_iterator_value());
return m_iterator.value();
}
bool increment_iterator() const
{
if (!m_iterator)
m_iterator.emplace(get_default_iterator_value());

++m_index;
return ++(m_iterator.value()) != end();
}

static auto extract_value_from_itr(const auto& itr) {
return std::move(*itr);
}

private:
auto get_default_iterator_value() const
{
auto itr = begin();
std::advance(itr, m_index);
return itr;
}

private:
RPP_NO_UNIQUE_ADDRESS Container m_container{};
mutable size_t m_index{};
mutable std::optional<decltype(std::cbegin(m_container))> m_iterator{};
std::shared_ptr<Container> m_container{};
};

template<constraint::memory_model memory_model, constraint::iterable Container, typename ...Ts>
auto pack_to_container(Ts&& ...items)
{
if constexpr (std::same_as<memory_model, rpp::memory_model::use_stack>)
return container_with_iterator<Container>{std::forward<Ts>(items)...};
return Container{std::forward<Ts>(items)...};
else
return shared_container<Container>{std::forward<Ts>(items)...};
}
Expand All @@ -135,15 +63,22 @@ auto pack_variadic(Ts&& ...items)
struct from_iterable_schedulable
{
template<constraint::decayed_type PackedContainer, constraint::observer_strategy<utils::iterable_value_t<PackedContainer>> Strategy>
rpp::schedulers::optional_duration operator()(const observer<utils::iterable_value_t<PackedContainer>, Strategy>& obs, const PackedContainer& cont) const
rpp::schedulers::optional_duration operator()(const observer<utils::iterable_value_t<PackedContainer>, Strategy>& obs, const PackedContainer& cont, size_t& index) const
{
try
{
if (const auto itr = cont.get_actual_iterator(); itr != std::cend(cont))
auto itr = std::cbegin(cont);
auto end = std::cend(cont);
std::advance(itr, static_cast<int64_t>(index));

if (itr != end)
{
obs.on_next(utils::as_const(*itr));
if (cont.increment_iterator()) // it was not last
if (std::next(itr) != end) // it was not last
{
++index;
return schedulers::duration{}; // re-schedule this
}
}

obs.on_completed();
Expand Down Expand Up @@ -188,7 +123,7 @@ struct from_iterable_strategy
{
const auto worker = scheduler.create_worker();
obs.set_upstream(worker.get_disposable());
worker.schedule(from_iterable_schedulable{}, std::move(obs), container);
worker.schedule(from_iterable_schedulable{}, std::move(obs), container, size_t{});
}
}
};
Expand Down
14 changes: 7 additions & 7 deletions src/tests/rpp/test_concat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me
mock_observer_strategy<int> mock{};
SECTION("concat of solo observable")
{
auto observable = rpp::source::concat<TestType>(rpp::source::just(1, 2));
auto observable = rpp::source::concat<TestType>(rpp::source::just<TestType>(1, 2));
observable.subscribe(mock.get_observer());

CHECK(mock.get_received_values() == std::vector{1,2});
Expand All @@ -75,7 +75,7 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me
}
SECTION("concat of multiple same observables")
{
auto observable = rpp::source::concat<TestType>(rpp::source::just(1, 2), rpp::source::just(1, 2));
auto observable = rpp::source::concat<TestType>(rpp::source::just<TestType>(1, 2), rpp::source::just<TestType>(1, 2));
observable.subscribe(mock.get_observer());

CHECK(mock.get_received_values() == std::vector{1,2, 1, 2});
Expand All @@ -84,7 +84,7 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me
}
SECTION("concat of multiple different observables")
{
auto observable = rpp::source::concat<TestType>(rpp::source::just(1, 2), rpp::source::just(1));
auto observable = rpp::source::concat<TestType>(rpp::source::just<TestType>(1, 2), rpp::source::just<TestType>(1));
observable.subscribe(mock.get_observer());

CHECK(mock.get_received_values() == std::vector{1,2,1});
Expand All @@ -93,7 +93,7 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me
}
SECTION("concat of array of different observables")
{
auto observable = rpp::source::concat<TestType>(std::array{rpp::source::just(1, 2), rpp::source::just(1, 1)});
auto observable = rpp::source::concat<TestType>(std::array{rpp::source::just<TestType>(1, 2), rpp::source::just<TestType>(1, 1)});
observable.subscribe(mock.get_observer());

CHECK(mock.get_received_values() == std::vector{1,2,1, 1});
Expand All @@ -103,7 +103,7 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me
SECTION("concat stop if no completion")
{
std::optional<rpp::dynamic_observer<int>> observer{};
auto observable = rpp::source::concat<TestType>(rpp::source::just(1, 2), rpp::source::create<int>([&](auto&& obs){ observer.emplace(std::forward<decltype(obs)>(obs).as_dynamic()); }), rpp::source::just(3));
auto observable = rpp::source::concat<TestType>(rpp::source::just<TestType>(1, 2), rpp::source::create<int>([&](auto&& obs){ observer.emplace(std::forward<decltype(obs)>(obs).as_dynamic()); }), rpp::source::just<TestType>(3));
observable.subscribe(mock.get_observer());

CHECK(mock.get_received_values() == std::vector{1,2});
Expand Down Expand Up @@ -140,10 +140,10 @@ TEMPLATE_TEST_CASE("concat as source", "", rpp::memory_model::use_stack, rpp::me
{
auto d = std::make_shared<rpp::base_disposable>();
auto observable =
rpp::source::concat<TestType>(rpp::source::just(1),
rpp::source::concat<TestType>(rpp::source::just<TestType>(1),
rpp::source::create<int>([&](auto&& obs) { d->dispose(); obs.on_completed(); }),
rpp::source::create<int>([&](auto&&) { FAIL("Shouldn't be called"); }),
rpp::source::just(3));
rpp::source::just<TestType>(3));
observable.subscribe(rpp::disposable_wrapper{d}, mock.get_observer());

CHECK(mock.get_received_values() == std::vector{1});
Expand Down
59 changes: 15 additions & 44 deletions src/tests/rpp/test_distinct_until_changed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,52 +15,23 @@

#include "mock_observer.hpp"

TEST_CASE("distinct_until_changed filters out consecutive duplicates and send first value from duplicates")
TEMPLATE_TEST_CASE("distinct_until_changed filters out consecutive duplicates and send first value from duplicates", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared)
{
auto mock = mock_observer_strategy<int>{};
SECTION("observable of values with duplicates")
auto obs = rpp::source::just<TestType>(1, 1, 2, 2, 3, 2, 2, 1);
SECTION("when subscribe on observable with duplicates via distinct_until_changed then subscriber obtains values without consecutive duplicates")
{
auto obs = rpp::source::just(1, 1, 2, 2, 3, 2, 2, 1);
SECTION("when subscribe on it via distinct_until_changed then subscriber obtains values without consecutive duplicates")
{
obs | rpp::ops::distinct_until_changed() | rpp::ops::subscribe(mock.get_observer());
CHECK(mock.get_received_values() == std::vector{ 1,2,3,2,1 });
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
SECTION("when subscribe on it via distinct_until_changed with custom comparator then subscriber obtains values without consecutive duplicates")
{
auto op = rpp::ops::distinct_until_changed([](int old_value, int new_value) {return old_value % 2 != new_value % 2; });
obs | op | rpp::ops::subscribe(mock.get_observer());
CHECK(mock.get_received_values() == std::vector{ 1, 1, 3, 1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
obs | rpp::ops::distinct_until_changed() | rpp::ops::subscribe(mock.get_observer());
CHECK(mock.get_received_values() == std::vector{ 1,2,3,2,1 });
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}
SECTION("when subscribe on observable with duplicates via distinct_until_changed with custom comparator then subscriber obtains values without consecutive duplicates")
{
auto op = rpp::ops::distinct_until_changed([](int old_value, int new_value) {return old_value % 2 != new_value % 2; });
obs | op | rpp::ops::subscribe(mock.get_observer());
CHECK(mock.get_received_values() == std::vector{ 1, 1, 3, 1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 1);
}

// SECTION("subject of values")
// {
// auto subj = rpp::subjects::publish_subject<int>{};
// SECTION("subscribe on it via distinct_until_changed and send value")
// {
// subj.get_observable().distinct_until_changed().subscribe(mock);
// subj.get_subscriber().on_next(1);
// SECTION("subscriber obtains value")
// {
// CHECK(mock.get_received_values() == std::vector{ 1});
// CHECK(mock.get_on_error_count() == 0);
// CHECK(mock.get_on_completed_count() == 0);
// }
// AND_SECTION("send duplicate")
// {
// subj.get_subscriber().on_next(1);
// SECTION("subscriber ignores duplicated value")
// {
// CHECK(mock.get_received_values() == std::vector{ 1 });
// CHECK(mock.get_on_error_count() == 0);
// CHECK(mock.get_on_completed_count() == 0);
// }
// }
// }
// }
}
13 changes: 3 additions & 10 deletions src/tests/rpp/test_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,18 @@
#include <snitch/snitch.hpp>

#include <rpp/operators/filter.hpp>
#include <rpp/sources/create.hpp>
#include <rpp/sources/just.hpp>

#include "mock_observer.hpp"

#include <stdexcept>
#include <string>

TEST_CASE("filter")
TEMPLATE_TEST_CASE("filter", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared)
{
mock_observer_strategy<int> mock{};

auto obs = rpp::source::create<int>([](const auto& obs)
{
obs.on_next(1);
obs.on_next(2);
obs.on_next(3);
obs.on_next(4);
obs.on_completed();
});
auto obs = rpp::source::just<TestType>(1,2,3,4);

SECTION("filter emits only satisfying values")
{
Expand Down
Loading

1 comment on commit 05a4d05

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 614.99 ns 1.16 ns 0.67 ns 1.73
Subscribe empty callbacks to empty observable via pipe operator 652.93 ns 0.86 ns 0.67 ns 1.29

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1577.14 ns 0.57 ns 0.37 ns 1.54
from array of 1 - create + subscribe + current_thread 2290.15 ns 4.59 ns 7.99 ns 0.57
concat_as_source of just(1 immediate) create + subscribe 4808.64 ns 4.25 ns 9.03 ns 0.47

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 1225.31 ns 0.29 ns 0.33 ns 0.86
create+filter(true)+subscribe 733.27 ns 0.29 ns 0.33 ns 0.86
create(1,2)+skip(1)+subscribe 1077.24 ns 0.29 ns 0.33 ns 0.86
create(1,1,2)+distinct_until_changed()+subscribe 749.29 ns 0.29 ns 0.33 ns 0.86

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 635.02 ns 0.58 ns 0.67 ns 0.87
current_thread scheduler create worker + schedule 764.55 ns 5.16 ns 5.94 ns 0.87
current_thread scheduler create worker + schedule + recursive schedule 1712.85 ns 73.16 ns 82.14 ns 0.89

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 729.97 ns 0.29 ns 0.33 ns 0.86
create+scan(10, std::plus)+subscribe 869.11 ns 0.29 ns 0.33 ns 0.86

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 766.61 ns - - 0.00
create+take_while(true)+subscribe 734.24 ns 0.29 ns 0.33 ns 0.86

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 2712.46 ns 0.29 ns 0.33 ns 0.86

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 3850.55 ns 184.37 ns 213.06 ns 0.87
create(1) + merge_with(create(2)) + subscribe 6046.15 ns 184.80 ns 214.73 ns 0.86

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1104.21 ns 0.56 ns 0.56 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 1112.55 ns 0.56 ns 0.56 ns 0.99

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2407.10 ns 0.44 ns 0.49 ns 0.90
from array of 1 - create + subscribe + current_thread 3152.98 ns 8.51 ns 12.10 ns 0.70
concat_as_source of just(1 immediate) create + subscribe 6629.81 ns 6.80 ns 12.65 ns 0.54

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 2053.06 ns 0.28 ns 0.28 ns 1.00
create+filter(true)+subscribe 1150.15 ns 0.28 ns 0.28 ns 0.99
create(1,2)+skip(1)+subscribe 1853.98 ns 0.28 ns 0.28 ns 1.00
create(1,1,2)+distinct_until_changed()+subscribe 1183.76 ns 0.56 ns 0.56 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 981.41 ns 0.56 ns 0.56 ns 1.01
current_thread scheduler create worker + schedule 1294.93 ns 8.36 ns 8.40 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 2280.23 ns 157.18 ns 152.94 ns 1.03

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 1183.48 ns 0.29 ns 0.28 ns 1.05
create+scan(10, std::plus)+subscribe 1435.66 ns 0.56 ns 0.56 ns 1.00

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 1202.31 ns - - 0.00
create+take_while(true)+subscribe 1179.50 ns 0.29 ns 0.28 ns 1.04

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 3638.87 ns 0.28 ns 0.28 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 4024.75 ns 385.39 ns 367.98 ns 1.05
create(1) + merge_with(create(2)) + subscribe 6841.43 ns 362.60 ns 369.02 ns 0.98

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 296.67 ns 0.60 ns 0.60 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 302.53 ns 0.60 ns 0.80 ns 0.75

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 644.08 ns 0.30 ns 0.41 ns 0.73
from array of 1 - create + subscribe + current_thread 909.80 ns 3.91 ns 5.69 ns 0.69
concat_as_source of just(1 immediate) create + subscribe 2109.59 ns 5.91 ns 7.02 ns 0.84

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 589.42 ns 0.30 ns 0.40 ns 0.74
create+filter(true)+subscribe 324.23 ns 0.30 ns 0.40 ns 0.74
create(1,2)+skip(1)+subscribe 518.06 ns 0.34 ns 0.40 ns 0.84
create(1,1,2)+distinct_until_changed()+subscribe 351.81 ns 0.59 ns 0.80 ns 0.74

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 256.08 ns 0.59 ns 0.80 ns 0.74
current_thread scheduler create worker + schedule 395.68 ns 3.61 ns 4.03 ns 0.89
current_thread scheduler create worker + schedule + recursive schedule 707.14 ns 60.29 ns 64.56 ns 0.93

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 324.38 ns 0.30 ns 0.40 ns 0.74
create+scan(10, std::plus)+subscribe 383.86 ns 0.60 ns 0.40 ns 1.49

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 378.09 ns - - 0.00
create+take_while(true)+subscribe 327.70 ns 0.32 ns 0.40 ns 0.79

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 1642.21 ns 0.30 ns 0.40 ns 0.75

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 2098.82 ns 105.41 ns 108.70 ns 0.97
create(1) + merge_with(create(2)) + subscribe 3376.63 ns 103.88 ns 104.54 ns 0.99

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1272.76 ns 2.34 ns 2.41 ns 0.97
Subscribe empty callbacks to empty observable via pipe operator 1763.59 ns 2.34 ns 2.41 ns 0.97

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 3006.59 ns 8.02 ns 10.43 ns 0.77
from array of 1 - create + subscribe + current_thread 3424.10 ns 23.74 ns 30.05 ns 0.79
concat_as_source of just(1 immediate) create + subscribe 8679.56 ns 21.81 ns 34.58 ns 0.63

Filtering Operators

name rxcpp rpp prev rpp ratio
create+take(1)+subscribe 2405.04 ns 7.37 ns 9.20 ns 0.80
create+filter(true)+subscribe 1364.36 ns 5.65 ns 6.43 ns 0.88
create(1,2)+skip(1)+subscribe 2035.38 ns 9.69 ns 9.65 ns 1.00
create(1,1,2)+distinct_until_changed()+subscribe 1374.84 ns 13.70 ns 17.64 ns 0.78

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1761.32 ns 4.51 ns 5.22 ns 0.86
current_thread scheduler create worker + schedule 1411.70 ns 10.40 ns 13.66 ns 0.76
current_thread scheduler create worker + schedule + recursive schedule 2438.55 ns 135.16 ns 165.45 ns 0.82

Transforming Operators

name rxcpp rpp prev rpp ratio
create+map(v*2)+subscribe 1335.37 ns 6.52 ns 7.23 ns 0.90
create+scan(10, std::plus)+subscribe 1554.32 ns 8.69 ns 10.32 ns 0.84

Conditional Operators

name rxcpp rpp prev rpp ratio
create+take_while(false)+subscribe 1479.86 ns 4.68 ns 5.21 ns 0.90
create+take_while(true)+subscribe 1343.60 ns 5.65 ns 7.26 ns 0.78

Utility Operators

name rxcpp rpp prev rpp ratio
create(1)+subscribe_on(immediate)+subscribe 4599.13 ns 11.71 ns 13.99 ns 0.84

Combining Operators

name rxcpp rpp prev rpp ratio
create(create(1), create(1)) + merge() + subscribe 5147.37 ns 377.16 ns 448.71 ns 0.84
create(1) + merge_with(create(2)) + subscribe 8119.05 ns 371.09 ns 444.12 ns 0.84

Please sign in to comment.