Skip to content

Commit

Permalink
Address coverage and issues (#622)
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino authored Aug 25, 2024
1 parent 7e811fb commit 97d7110
Show file tree
Hide file tree
Showing 43 changed files with 502 additions and 302 deletions.
16 changes: 8 additions & 8 deletions src/examples/rpp/doxygen/group_by.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ int main()
//! [group_by]
//
//! [group_by selector]
struct Person
struct person
{
std::string name;
int age;
};
rpp::source::just(Person{"Kate", 18},
Person{"Alex", 25},
Person{"Nick", 18},
Person{"Jack", 25},
Person{"Tom", 30},
Person{"Vanda", 18})
| rpp::operators::group_by([](const Person& v) { return v.age; }, [](const Person& v) { return v.name; })
rpp::source::just(person{"Kate", 18},
person{"Alex", 25},
person{"Nick", 18},
person{"Jack", 25},
person{"Tom", 30},
person{"Vanda", 18})
| rpp::operators::group_by([](const person& v) { return v.age; }, [](const person& v) { return v.name; })
| rpp::operators::subscribe([](auto grouped_observable) {
grouped_observable.subscribe([age = grouped_observable.get_key()](const std::string& name) {
std::cout << "Age [" << age << "] Name: " << name << std::endl;
Expand Down
2 changes: 1 addition & 1 deletion src/examples/rpp/sfml/snake/canvas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
static constexpr float s_gap_size = 1.0f;
static constexpr float s_cell_size = 10.0f;

sf::RectangleShape get_rectangle_at(Coordinates location, sf::Color color)
sf::RectangleShape get_rectangle_at(coordinates location, sf::Color color)
{
sf::RectangleShape box;
box.setSize(sf::Vector2f{s_cell_size, s_cell_size});
Expand Down
2 changes: 1 addition & 1 deletion src/examples/rpp/sfml/snake/canvas.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

#include "snake.hpp"

sf::RectangleShape get_rectangle_at(Coordinates location, sf::Color color);
sf::RectangleShape get_rectangle_at(coordinates location, sf::Color color);
sf::Vector2u get_window_size(size_t rows_count, size_t cols_count);
6 changes: 3 additions & 3 deletions src/examples/rpp/sfml/snake/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static auto get_events_observable(sf::RenderWindow& window)
return res;

// indicate new frame
obs.on_next(PresentEvent{frame_number++});
obs.on_next(present_event{frame_number++});

while (window.pollEvent(ev))
obs.on_next(ev);
Expand All @@ -43,12 +43,12 @@ int main()

auto start = rpp::schedulers::clock_type::now();
size_t frames_delta = 0;
presents.subscribe([&window](const PresentEvent&) {
presents.subscribe([&window](const present_event&) {
window.display();
window.clear(sf::Color{0, 128, 0});
});

presents | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe([&start, &frames_delta](const PresentEvent& p) {
presents | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe([&start, &frames_delta](const present_event& p) {
const auto diff = p.frame_number - frames_delta;
if (diff > 50)
{
Expand Down
28 changes: 14 additions & 14 deletions src/examples/rpp/sfml/snake/snake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
static SnakeBody generate_initial_snake_body()
{
// tail to head
return SnakeBody{Coordinates{1, 1}, Coordinates{2, 1}, Coordinates{3, 1}, Coordinates{4, 1}};
return SnakeBody{coordinates{1, 1}, coordinates{2, 1}, coordinates{3, 1}, coordinates{4, 1}};
}

static Coordinates generate_initial_apple()
static coordinates generate_initial_apple()
{
return Coordinates{3, 5};
return coordinates{3, 5};
}

static int wrap_coordinate(int value, int max_value)
Expand Down Expand Up @@ -55,17 +55,17 @@ static bool is_snake_eat_self(const SnakeBody& body)
return std::find(body.cbegin(), body.cend() - 1, head) == body.cend() - 1;
}

static Coordinates update_apple_position_if_eat(Coordinates&& apple_position, const SnakeBody& snake)
static coordinates update_apple_position_if_eat(coordinates&& apple_position, const SnakeBody& snake)
{
if (std::find(snake.cbegin(), snake.cend(), apple_position) == snake.cend())
return apple_position;

static std::random_device rd;
static std::mt19937 rng(rd());
static std::uniform_int_distribution<int> x_uni(0, s_columns_count - 1);
static std::uniform_int_distribution<int> y_uni(0, s_rows_count - 1);
static std::random_device s_rd;
static std::mt19937 s_rng(s_rd());
static std::uniform_int_distribution<int> s_x_uni(0, s_columns_count - 1);
static std::uniform_int_distribution<int> s_y_uni(0, s_rows_count - 1);

return Coordinates{x_uni(rng), y_uni(rng)};
return coordinates{s_x_uni(s_rng), s_y_uni(s_rng)};
}

static Direction select_next_not_opposite_direction(Direction&& current_direction, const Direction& new_direction)
Expand Down Expand Up @@ -126,16 +126,16 @@ rpp::dynamic_observable<sf::RectangleShape> get_shapes_to_draw(const rpp::dynami
| rpp::ops::publish()
| rpp::ops::ref_count();

static constexpr size_t points_per_apple = 1;
static constexpr size_t s_points_per_apple = 1;
apple_position | rpp::ops::distinct_until_changed()
| rpp::ops::skip(1) // skip first apple position to avoid snake growing immediately
| rpp::ops::map([](const auto&) { return points_per_apple; })
| rpp::ops::map([](const auto&) { return s_points_per_apple; })
| rpp::ops::subscribe(snake_earn_points.get_observer());

auto drawable_objects = snake_body | rpp::ops::with_latest_from([](const SnakeBody& body, const Coordinates& apple_coords) {
auto drawable_objects = snake_body | rpp::ops::with_latest_from([](const SnakeBody& body, const coordinates& apple_coords) {
return rpp::source::from_iterable(body)
| rpp::ops::map([](const Coordinates& coords) { return get_rectangle_at(coords, sf::Color::White); })
| rpp::ops::merge_with(rpp::source::just(apple_coords) | rpp::ops::map([](const Coordinates& coords) {
| rpp::ops::map([](const coordinates& coords) { return get_rectangle_at(coords, sf::Color::White); })
| rpp::ops::merge_with(rpp::source::just(apple_coords) | rpp::ops::map([](const coordinates& coords) {
return get_rectangle_at(coords, sf::Color::Red);
}));
},
Expand Down
18 changes: 9 additions & 9 deletions src/examples/rpp/sfml/snake/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@
static constexpr size_t s_rows_count = 20;
static constexpr size_t s_columns_count = 30;

struct Coordinates
struct coordinates
{
int x{};
int y{};

bool operator==(const Coordinates& other) const = default;
bool operator!=(const Coordinates& other) const = default;
bool operator==(const coordinates& other) const = default;
bool operator!=(const coordinates& other) const = default;
};

using Direction = Coordinates;
using SnakeBody = std::vector<Coordinates>;
using Direction = coordinates;
using SnakeBody = std::vector<coordinates>;

inline rpp::schedulers::run_loop g_run_loop{};

struct PresentEvent
struct present_event
{
size_t frame_number{};
};

using CustomEvent = std::variant<PresentEvent, sf::Event>;
using CustomEvent = std::variant<present_event, sf::Event>;

auto get_presents_stream(const auto& events)
{
return events | rpp::ops::filter([](const CustomEvent& ev) { return std::holds_alternative<PresentEvent>(ev); })
| rpp::ops::map([](const CustomEvent& ev) { return std::get<PresentEvent>(ev); });
return events | rpp::ops::filter([](const CustomEvent& ev) { return std::holds_alternative<present_event>(ev); })
| rpp::ops::map([](const CustomEvent& ev) { return std::get<present_event>(ev); });
}
2 changes: 1 addition & 1 deletion src/examples/rppgrpc/doxygen/server_reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* \example server_reactor.cpp
**/

class Server : public TestService::CallbackService
class server : public TestService::CallbackService
{

//! [read_reactor]
Expand Down
38 changes: 19 additions & 19 deletions src/extensions/rppgrpc/rppgrpc/details/base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ namespace rppgrpc::details

void handle_write_done()
{
std::lock_guard lock{write_mutex};
write.pop_front();
std::lock_guard lock{m_write_mutex};
m_write.pop_front();

if (!write.empty())
if (!m_write.empty())
{
start_write(write.front());
start_write(m_write.front());
}
else if (finished)
else if (m_finished)
{
finish_writes(grpc::Status::OK);
}
Expand All @@ -70,26 +70,26 @@ namespace rppgrpc::details
template<rpp::constraint::decayed_same_as<TData> T>
void on_next(T&& message) const
{
std::lock_guard lock{owner.get().write_mutex};
owner.get().write.push_back(std::forward<T>(message));
if (owner.get().write.size() == 1)
owner.get().start_write(owner.get().write.front());
std::lock_guard lock{owner.get().m_write_mutex};
owner.get().m_write.push_back(std::forward<T>(message));
if (owner.get().m_write.size() == 1)
owner.get().start_write(owner.get().m_write.front());
}

void on_error(const std::exception_ptr&) const
{
std::lock_guard lock{owner.get().write_mutex};
owner.get().finished = true;
std::lock_guard lock{owner.get().m_write_mutex};
owner.get().m_finished = true;

if (owner.get().write.size() == 0)
if (owner.get().m_write.size() == 0)
owner.get().finish_writes(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"});
}
void on_completed() const
{
std::lock_guard lock{owner.get().write_mutex};
owner.get().finished = true;
std::lock_guard lock{owner.get().m_write_mutex};
owner.get().m_finished = true;

if (owner.get().write.size() == 0)
if (owner.get().m_write.size() == 0)
owner.get().finish_writes(grpc::Status::OK);
}

Expand All @@ -100,9 +100,9 @@ namespace rppgrpc::details
private:
rpp::subjects::serialized_publish_subject<TData> m_subject{};

std::mutex write_mutex{};
std::deque<TData> write{};
bool finished{};
std::mutex m_write_mutex{};
std::deque<TData> m_write{};
bool m_finished{};
};

template<rpp::constraint::decayed_type TData>
Expand Down Expand Up @@ -137,7 +137,7 @@ namespace rppgrpc::details

private:
rpp::subjects::publish_subject<TData> m_observer;
TData m_data{};
RPP_NO_UNIQUE_ADDRESS TData m_data{};
};

} // namespace rppgrpc::details
2 changes: 1 addition & 1 deletion src/extensions/rppqt/rppqt/schedulers/main_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace rppqt::schedulers
return;
}

QTimer::singleShot(std::chrono::duration_cast<std::chrono::milliseconds>(duration), application, [fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)]() mutable {
QTimer::singleShot(std::chrono::duration_cast<std::chrono::milliseconds>(std::max(rpp::schedulers::duration{}, duration)), application, [fn = std::forward<Fn>(fn), handler = std::forward<Handler>(handler), ... args = std::forward<Args>(args)]() mutable {
if (!handler.is_disposed())
invoke(std::move(fn), std::move(handler), std::move(args)...);
});
Expand Down
5 changes: 1 addition & 4 deletions src/rpp/rpp/defs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
#define RPP_EMPTY_BASES
#endif

// MSVC has bad support for the no_unique_address from the box NOW, but provides specificator to enable full support. GCC/Clang works as expected
#if defined(_MSC_VER) && _MSC_FULL_VER >= 192829913
#define RPP_NO_UNIQUE_ADDRESS [[msvc::no_unique_address]]
#elif defined(__APPLE__) && defined(__clang__) // apple works unexpectedly bad on clang 15 =C
#if defined(__APPLE__) && defined(__clang__) // apple works unexpectedly bad on clang 15 =C
#define RPP_NO_UNIQUE_ADDRESS
#else
#define RPP_NO_UNIQUE_ADDRESS [[no_unique_address]]
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/disposables/callback_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace rpp
void base_dispose_impl(interface_disposable::Mode) noexcept override { std::move(m_fn)(); } // NOLINT(bugprone-exception-escape)

private:
Fn m_fn;
RPP_NO_UNIQUE_ADDRESS Fn m_fn;
};

template<rpp::constraint::is_nothrow_invocable Fn>
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/disposables/details/container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace rpp::details::disposables

void dispose() const
{
for (auto& d : m_data)
for (const auto& d : m_data)
{
d.dispose();
}
Expand Down
4 changes: 2 additions & 2 deletions src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ namespace rpp
}

private:
OriginalObservable m_original_observable;
Subject m_subject;
RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable;
Subject m_subject;

struct state_t
{
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/observers/dynamic_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace rpp::details::observers
}

private:
TObs m_observer;
RPP_NO_UNIQUE_ADDRESS TObs m_observer;
};

template<rpp::constraint::decayed_type Type>
Expand Down
8 changes: 4 additions & 4 deletions src/rpp/rpp/observers/mock_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class mock_observer_strategy final
{
public:
explicit mock_observer_strategy(bool copy_values = true)
: m_state{std::make_shared<State>(copy_values)}
: m_state{std::make_shared<state>(copy_values)}
{
}

Expand Down Expand Up @@ -57,9 +57,9 @@ class mock_observer_strategy final
auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_disposable<Type, mock_observer_strategy<Type>>{std::move(d), *this}; }

private:
struct State
struct state
{
explicit State(bool copy_values)
explicit state(bool copy_values)
: m_copy_values{copy_values}
{
}
Expand All @@ -73,5 +73,5 @@ class mock_observer_strategy final
std::vector<Type> vals{};
};

std::shared_ptr<State> m_state{};
std::shared_ptr<state> m_state{};
};
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace rpp::operators::details
template<rpp::constraint::observable TObservable, rpp::constraint::observer TObserver>
struct concat_inner_observer_strategy;

enum ConcatStage : uint8_t
enum class ConcatStage : uint8_t
{
None = 0,
Draining = 1,
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/operators/delay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ namespace rpp::operators::details
}
}

Observer observer;
RPP_NO_UNIQUE_ADDRESS Worker worker;
rpp::schedulers::duration delay;
RPP_NO_UNIQUE_ADDRESS Observer observer;
RPP_NO_UNIQUE_ADDRESS Worker worker;
rpp::schedulers::duration delay;

std::mutex mutex{};
std::queue<emission<T>> queue;
Expand Down
Loading

1 comment on commit 97d7110

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 302.46 ns 2.16 ns 2.16 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 299.87 ns 2.16 ns 2.16 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 678.00 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1043.60 ns 3.42 ns 3.70 ns 0.92
concat_as_source of just(1 immediate) create + subscribe 2233.74 ns 101.14 ns 105.08 ns 0.96
defer from array of 1 - defer + create + subscribe + immediate 750.18 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2161.92 ns 59.19 ns 59.33 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3188.34 ns 32.43 ns 32.40 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 29292.38 ns 27828.64 ns 28073.41 ns 0.99
from array of 1000 - create + as_blocking + subscribe + new_thread 40280.44 ns 48786.54 ns 50189.17 ns 0.97
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3504.31 ns 133.21 ns 126.09 ns 1.06

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1091.58 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 848.30 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1007.13 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 858.84 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1241.33 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+last()+subscribe 905.87 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1104.58 ns 17.59 ns 17.60 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 836.70 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 264.39 ns 2.16 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 369.72 ns 5.56 ns 5.86 ns 0.95
current_thread scheduler create worker + schedule + recursive schedule 821.19 ns 56.53 ns 56.54 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 835.00 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 915.39 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2387.72 ns 169.51 ns 159.52 ns 1.06
immediate_just+buffer(2)+subscribe 1525.60 ns 13.89 ns 13.58 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 2452.89 ns 1135.16 ns 1028.32 ns 1.10

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 839.53 ns - - 0.00
immediate_just+take_while(true)+subscribe 879.96 ns 0.31 ns 0.31 ns 1.01

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2006.06 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3402.96 ns 181.95 ns 179.31 ns 1.01
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3573.94 ns 209.07 ns 185.48 ns 1.13
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 140.15 ns 132.96 ns 1.05
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3529.33 ns 906.77 ns 997.81 ns 0.91
immediate_just(1) + zip(immediate_just(2)) + subscribe 2144.23 ns 205.74 ns 206.75 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.57 ns 14.72 ns 14.72 ns 1.00
subscribe 100 observers to publish_subject 198919.83 ns 15583.63 ns 15319.15 ns 1.02
100 on_next to 100 observers to publish_subject 32619.81 ns 20051.19 ns 18842.10 ns 1.06

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1457.82 ns 12.96 ns 12.65 ns 1.02
basic sample with immediate scheduler 1403.91 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 932.97 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2046.48 ns 1002.62 ns 996.08 ns 1.01
create(on_error())+retry(1)+subscribe 592.98 ns 119.47 ns 111.07 ns 1.08

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 967.67 ns 3.94 ns 5.40 ns 0.73
Subscribe empty callbacks to empty observable via pipe operator 965.90 ns 3.94 ns 4.62 ns 0.85

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1923.99 ns 0.23 ns 0.32 ns 0.73
from array of 1 - create + subscribe + current_thread 2405.25 ns 36.23 ns 39.03 ns 0.93
concat_as_source of just(1 immediate) create + subscribe 5438.49 ns 334.90 ns 420.96 ns 0.80
defer from array of 1 - defer + create + subscribe + immediate 1984.35 ns 0.23 ns 0.28 ns 0.85
interval - interval + take(3) + subscribe + immediate 4934.81 ns 113.56 ns 133.53 ns 0.85
interval - interval + take(3) + subscribe + current_thread 6007.18 ns 96.65 ns 110.64 ns 0.87
from array of 1 - create + as_blocking + subscribe + new_thread 83937.36 ns 81297.77 ns 134078.29 ns 0.61
from array of 1000 - create + as_blocking + subscribe + new_thread 93912.30 ns 104495.08 ns 151241.57 ns 0.69
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 8147.24 ns 379.06 ns 448.26 ns 0.85

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 2843.10 ns 0.23 ns 0.29 ns 0.80
immediate_just+filter(true)+subscribe 2062.80 ns 0.23 ns 0.28 ns 0.80
immediate_just(1,2)+skip(1)+subscribe 2684.39 ns 0.23 ns 0.29 ns 0.79
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2044.61 ns 0.46 ns 0.60 ns 0.77
immediate_just(1,2)+first()+subscribe 3118.09 ns 0.23 ns 0.27 ns 0.84
immediate_just(1,2)+last()+subscribe 2325.24 ns 0.23 ns 0.33 ns 0.69
immediate_just+take_last(1)+subscribe 2954.73 ns 0.23 ns 0.28 ns 0.84
immediate_just(1,2,3)+element_at(1)+subscribe 2092.61 ns 0.23 ns 0.28 ns 0.81

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 841.23 ns 4.14 ns 4.85 ns 0.85
current_thread scheduler create worker + schedule 1183.22 ns 39.43 ns 42.73 ns 0.92
current_thread scheduler create worker + schedule + recursive schedule 1996.15 ns 207.47 ns 242.41 ns 0.86

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2112.41 ns 4.43 ns 5.17 ns 0.86
immediate_just+scan(10, std::plus)+subscribe 2329.58 ns 0.47 ns 0.62 ns 0.76
immediate_just+flat_map(immediate_just(v*2))+subscribe 5296.23 ns 403.83 ns 517.27 ns 0.78
immediate_just+buffer(2)+subscribe 2484.15 ns 64.75 ns 81.41 ns 0.80
immediate_just+window(2)+subscribe + subscsribe inner 5291.36 ns 2400.79 ns 3319.91 ns 0.72

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2122.64 ns - - 0.00
immediate_just+take_while(true)+subscribe 2125.13 ns 0.24 ns 0.34 ns 0.69

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 4785.98 ns 4.78 ns 6.16 ns 0.78

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 7431.12 ns 443.63 ns 528.61 ns 0.84
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 8328.78 ns 439.74 ns 516.77 ns 0.85
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 458.70 ns 605.83 ns 0.76
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 8042.52 ns 1903.55 ns 2994.50 ns 0.64
immediate_just(1) + zip(immediate_just(2)) + subscribe 5185.14 ns 847.92 ns 1137.75 ns 0.75

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 73.46 ns 46.75 ns 60.47 ns 0.77
subscribe 100 observers to publish_subject 333690.00 ns 38867.07 ns 51052.45 ns 0.76
100 on_next to 100 observers to publish_subject 47643.12 ns 17348.30 ns 22372.85 ns 0.78

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 2581.51 ns 69.20 ns 85.42 ns 0.81
basic sample with immediate scheduler 2580.05 ns 17.81 ns 21.80 ns 0.82

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2273.14 ns 0.23 ns 0.33 ns 0.69

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 6190.02 ns 3944.55 ns 4848.54 ns 0.81
create(on_error())+retry(1)+subscribe 1718.08 ns 283.42 ns 362.58 ns 0.78

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 270.29 ns 0.88 ns 1.54 ns 0.57
Subscribe empty callbacks to empty observable via pipe operator 271.82 ns 0.88 ns 1.54 ns 0.57

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 602.04 ns 0.70 ns 0.34 ns 2.03
from array of 1 - create + subscribe + current_thread 793.14 ns 4.01 ns 4.32 ns 0.93
concat_as_source of just(1 immediate) create + subscribe 2354.14 ns 136.13 ns 134.92 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 780.88 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2204.72 ns 58.30 ns 58.31 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3120.84 ns 30.88 ns 30.86 ns 1.00
from array of 1 - create + as_blocking + subscribe + new_thread 30549.79 ns 28054.44 ns 28005.07 ns 1.00
from array of 1000 - create + as_blocking + subscribe + new_thread 36795.97 ns 35054.34 ns 36459.93 ns 0.96
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 3679.12 ns 159.42 ns 156.82 ns 1.02

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1175.13 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 841.52 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1107.47 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 881.02 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+first()+subscribe 1380.47 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 1030.67 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1238.80 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2,3)+element_at(1)+subscribe 859.66 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 281.32 ns 0.88 ns 1.54 ns 0.57
current_thread scheduler create worker + schedule 389.07 ns 4.02 ns 4.79 ns 0.84
current_thread scheduler create worker + schedule + recursive schedule 850.84 ns 64.47 ns 55.85 ns 1.15

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 853.55 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 979.87 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2434.28 ns 139.32 ns 144.46 ns 0.96
immediate_just+buffer(2)+subscribe 1536.96 ns 13.89 ns 13.88 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2439.23 ns 923.38 ns 936.43 ns 0.99

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 836.10 ns - - 0.00
immediate_just+take_while(true)+subscribe 848.56 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 2019.56 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3333.35 ns 162.52 ns 159.69 ns 1.02
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3711.63 ns 147.67 ns 145.90 ns 1.01
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 144.69 ns 144.26 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3351.20 ns 844.76 ns 845.49 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 2201.04 ns 200.06 ns 200.63 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 52.09 ns 17.85 ns 17.65 ns 1.01
subscribe 100 observers to publish_subject 203370.50 ns 16488.79 ns 16378.32 ns 1.01
100 on_next to 100 observers to publish_subject 40930.46 ns 17665.30 ns 21234.39 ns 0.83

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1502.30 ns 11.42 ns 11.72 ns 0.97
basic sample with immediate scheduler 1344.53 ns 5.86 ns 6.17 ns 0.95

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1018.13 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2169.35 ns 1256.87 ns 1215.57 ns 1.03
create(on_error())+retry(1)+subscribe 642.65 ns 146.51 ns 145.97 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 566.86 ns 4.01 ns 4.69 ns 0.86
Subscribe empty callbacks to empty observable via pipe operator 583.01 ns 4.02 ns 4.79 ns 0.84

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1160.00 ns 9.71 ns 4.62 ns 2.10
from array of 1 - create + subscribe + current_thread 1429.01 ns 17.90 ns 15.51 ns 1.15
concat_as_source of just(1 immediate) create + subscribe 3743.64 ns 187.77 ns 172.56 ns 1.09
defer from array of 1 - defer + create + subscribe + immediate 1264.06 ns 9.41 ns 4.94 ns 1.91
interval - interval + take(3) + subscribe + immediate 3420.32 ns 151.41 ns 133.24 ns 1.14
interval - interval + take(3) + subscribe + current_thread 3389.17 ns 65.69 ns 54.57 ns 1.20
from array of 1 - create + as_blocking + subscribe + new_thread 122177.78 ns 115000.00 ns 109020.00 ns 1.05
from array of 1000 - create + as_blocking + subscribe + new_thread 135828.57 ns 138412.50 ns 126250.00 ns 1.10
concat_as_source of just(1 immediate) and just(1,2 immediate)create + subscribe 5584.69 ns 211.58 ns 206.73 ns 1.02

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1827.72 ns 25.28 ns 12.87 ns 1.96
immediate_just+filter(true)+subscribe 1366.10 ns 24.35 ns 12.37 ns 1.97
immediate_just(1,2)+skip(1)+subscribe 1750.07 ns 24.06 ns 13.13 ns 1.83
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1367.46 ns 28.99 ns 15.94 ns 1.82
immediate_just(1,2)+first()+subscribe 2094.60 ns 22.83 ns 12.94 ns 1.76
immediate_just(1,2)+last()+subscribe 1822.87 ns 24.06 ns 13.80 ns 1.74
immediate_just+take_last(1)+subscribe 2033.00 ns 78.94 ns 58.99 ns 1.34
immediate_just(1,2,3)+element_at(1)+subscribe 1357.90 ns 27.47 ns 13.78 ns 1.99

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 477.53 ns 6.17 ns 6.17 ns 1.00
current_thread scheduler create worker + schedule 648.34 ns 13.89 ns 14.07 ns 0.99
current_thread scheduler create worker + schedule + recursive schedule 1356.56 ns 104.76 ns 104.67 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1336.85 ns 24.35 ns 12.34 ns 1.97
immediate_just+scan(10, std::plus)+subscribe 1464.76 ns 26.54 ns 21.58 ns 1.23
immediate_just+flat_map(immediate_just(v*2))+subscribe 3581.82 ns 221.95 ns 203.81 ns 1.09
immediate_just+buffer(2)+subscribe 2670.83 ns 69.64 ns 58.38 ns 1.19
immediate_just+window(2)+subscribe + subscsribe inner 4065.87 ns 1307.08 ns 1322.69 ns 0.99

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1654.75 ns 23.14 ns 11.46 ns 2.02
immediate_just+take_while(true)+subscribe 1371.75 ns 24.35 ns 12.37 ns 1.97

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3487.61 ns 11.10 ns 7.40 ns 1.50

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5126.91 ns 224.19 ns 225.94 ns 0.99
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 5435.08 ns 228.92 ns 214.63 ns 1.07
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 201.24 ns 193.76 ns 1.04
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 5434.55 ns 967.80 ns 942.66 ns 1.03
immediate_just(1) + zip(immediate_just(2)) + subscribe 3597.48 ns 519.53 ns 519.55 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 37.78 ns 20.35 ns 19.79 ns 1.03
subscribe 100 observers to publish_subject 270225.00 ns 28513.89 ns 27657.14 ns 1.03
100 on_next to 100 observers to publish_subject 51755.00 ns 32680.65 ns 38686.21 ns 0.84

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1868.67 ns 101.69 ns 58.62 ns 1.73
basic sample with immediate scheduler 1874.41 ns 73.93 ns 36.72 ns 2.01

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1467.13 ns 24.99 ns 19.97 ns 1.25

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 2196.78 ns 350.92 ns 341.28 ns 1.03
create(on_error())+retry(1)+subscribe 1175.31 ns 148.89 ns 144.20 ns 1.03

Please sign in to comment.