diff --git a/src/examples/rpp/doxygen/group_by.cpp b/src/examples/rpp/doxygen/group_by.cpp index 51fe55710..31eeebd47 100644 --- a/src/examples/rpp/doxygen/group_by.cpp +++ b/src/examples/rpp/doxygen/group_by.cpp @@ -30,18 +30,18 @@ int main() //! [group_by] // //! [group_by selector] - struct Person + struct person { std::string name; int age; }; - rpp::source::just(Person{"Kate", 18}, - Person{"Alex", 25}, - Person{"Nick", 18}, - Person{"Jack", 25}, - Person{"Tom", 30}, - Person{"Vanda", 18}) - | rpp::operators::group_by([](const Person& v) { return v.age; }, [](const Person& v) { return v.name; }) + rpp::source::just(person{"Kate", 18}, + person{"Alex", 25}, + person{"Nick", 18}, + person{"Jack", 25}, + person{"Tom", 30}, + person{"Vanda", 18}) + | rpp::operators::group_by([](const person& v) { return v.age; }, [](const person& v) { return v.name; }) | rpp::operators::subscribe([](auto grouped_observable) { grouped_observable.subscribe([age = grouped_observable.get_key()](const std::string& name) { std::cout << "Age [" << age << "] Name: " << name << std::endl; diff --git a/src/examples/rpp/sfml/snake/canvas.cpp b/src/examples/rpp/sfml/snake/canvas.cpp index bfdc34717..10b772347 100644 --- a/src/examples/rpp/sfml/snake/canvas.cpp +++ b/src/examples/rpp/sfml/snake/canvas.cpp @@ -6,7 +6,7 @@ static constexpr float s_gap_size = 1.0f; static constexpr float s_cell_size = 10.0f; -sf::RectangleShape get_rectangle_at(Coordinates location, sf::Color color) +sf::RectangleShape get_rectangle_at(coordinates location, sf::Color color) { sf::RectangleShape box; box.setSize(sf::Vector2f{s_cell_size, s_cell_size}); diff --git a/src/examples/rpp/sfml/snake/canvas.hpp b/src/examples/rpp/sfml/snake/canvas.hpp index 8f4e52a17..0db68a76f 100644 --- a/src/examples/rpp/sfml/snake/canvas.hpp +++ b/src/examples/rpp/sfml/snake/canvas.hpp @@ -4,5 +4,5 @@ #include "snake.hpp" -sf::RectangleShape get_rectangle_at(Coordinates location, sf::Color color); +sf::RectangleShape get_rectangle_at(coordinates location, sf::Color color); sf::Vector2u get_window_size(size_t rows_count, size_t cols_count); diff --git a/src/examples/rpp/sfml/snake/main.cpp b/src/examples/rpp/sfml/snake/main.cpp index 236ff5cd9..88389b0e6 100644 --- a/src/examples/rpp/sfml/snake/main.cpp +++ b/src/examples/rpp/sfml/snake/main.cpp @@ -20,7 +20,7 @@ static auto get_events_observable(sf::RenderWindow& window) return res; // indicate new frame - obs.on_next(PresentEvent{frame_number++}); + obs.on_next(present_event{frame_number++}); while (window.pollEvent(ev)) obs.on_next(ev); @@ -43,12 +43,12 @@ int main() auto start = rpp::schedulers::clock_type::now(); size_t frames_delta = 0; - presents.subscribe([&window](const PresentEvent&) { + presents.subscribe([&window](const present_event&) { window.display(); window.clear(sf::Color{0, 128, 0}); }); - presents | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe([&start, &frames_delta](const PresentEvent& p) { + presents | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe([&start, &frames_delta](const present_event& p) { const auto diff = p.frame_number - frames_delta; if (diff > 50) { diff --git a/src/examples/rpp/sfml/snake/snake.cpp b/src/examples/rpp/sfml/snake/snake.cpp index 4459c6c7e..d3e495faf 100644 --- a/src/examples/rpp/sfml/snake/snake.cpp +++ b/src/examples/rpp/sfml/snake/snake.cpp @@ -12,12 +12,12 @@ static SnakeBody generate_initial_snake_body() { // tail to head - return SnakeBody{Coordinates{1, 1}, Coordinates{2, 1}, Coordinates{3, 1}, Coordinates{4, 1}}; + return SnakeBody{coordinates{1, 1}, coordinates{2, 1}, coordinates{3, 1}, coordinates{4, 1}}; } -static Coordinates generate_initial_apple() +static coordinates generate_initial_apple() { - return Coordinates{3, 5}; + return coordinates{3, 5}; } static int wrap_coordinate(int value, int max_value) @@ -55,17 +55,17 @@ static bool is_snake_eat_self(const SnakeBody& body) return std::find(body.cbegin(), body.cend() - 1, head) == body.cend() - 1; } -static Coordinates update_apple_position_if_eat(Coordinates&& apple_position, const SnakeBody& snake) +static coordinates update_apple_position_if_eat(coordinates&& apple_position, const SnakeBody& snake) { if (std::find(snake.cbegin(), snake.cend(), apple_position) == snake.cend()) return apple_position; - static std::random_device rd; - static std::mt19937 rng(rd()); - static std::uniform_int_distribution x_uni(0, s_columns_count - 1); - static std::uniform_int_distribution y_uni(0, s_rows_count - 1); + static std::random_device s_rd; + static std::mt19937 s_rng(s_rd()); + static std::uniform_int_distribution s_x_uni(0, s_columns_count - 1); + static std::uniform_int_distribution s_y_uni(0, s_rows_count - 1); - return Coordinates{x_uni(rng), y_uni(rng)}; + return coordinates{s_x_uni(s_rng), s_y_uni(s_rng)}; } static Direction select_next_not_opposite_direction(Direction&& current_direction, const Direction& new_direction) @@ -126,16 +126,16 @@ rpp::dynamic_observable get_shapes_to_draw(const rpp::dynami | rpp::ops::publish() | rpp::ops::ref_count(); - static constexpr size_t points_per_apple = 1; + static constexpr size_t s_points_per_apple = 1; apple_position | rpp::ops::distinct_until_changed() | rpp::ops::skip(1) // skip first apple position to avoid snake growing immediately - | rpp::ops::map([](const auto&) { return points_per_apple; }) + | rpp::ops::map([](const auto&) { return s_points_per_apple; }) | rpp::ops::subscribe(snake_earn_points.get_observer()); - auto drawable_objects = snake_body | rpp::ops::with_latest_from([](const SnakeBody& body, const Coordinates& apple_coords) { + auto drawable_objects = snake_body | rpp::ops::with_latest_from([](const SnakeBody& body, const coordinates& apple_coords) { return rpp::source::from_iterable(body) - | rpp::ops::map([](const Coordinates& coords) { return get_rectangle_at(coords, sf::Color::White); }) - | rpp::ops::merge_with(rpp::source::just(apple_coords) | rpp::ops::map([](const Coordinates& coords) { + | rpp::ops::map([](const coordinates& coords) { return get_rectangle_at(coords, sf::Color::White); }) + | rpp::ops::merge_with(rpp::source::just(apple_coords) | rpp::ops::map([](const coordinates& coords) { return get_rectangle_at(coords, sf::Color::Red); })); }, diff --git a/src/examples/rpp/sfml/snake/utils.hpp b/src/examples/rpp/sfml/snake/utils.hpp index 4521c9825..866c2a248 100644 --- a/src/examples/rpp/sfml/snake/utils.hpp +++ b/src/examples/rpp/sfml/snake/utils.hpp @@ -11,29 +11,29 @@ static constexpr size_t s_rows_count = 20; static constexpr size_t s_columns_count = 30; -struct Coordinates +struct coordinates { int x{}; int y{}; - bool operator==(const Coordinates& other) const = default; - bool operator!=(const Coordinates& other) const = default; + bool operator==(const coordinates& other) const = default; + bool operator!=(const coordinates& other) const = default; }; -using Direction = Coordinates; -using SnakeBody = std::vector; +using Direction = coordinates; +using SnakeBody = std::vector; inline rpp::schedulers::run_loop g_run_loop{}; -struct PresentEvent +struct present_event { size_t frame_number{}; }; -using CustomEvent = std::variant; +using CustomEvent = std::variant; auto get_presents_stream(const auto& events) { - return events | rpp::ops::filter([](const CustomEvent& ev) { return std::holds_alternative(ev); }) - | rpp::ops::map([](const CustomEvent& ev) { return std::get(ev); }); + return events | rpp::ops::filter([](const CustomEvent& ev) { return std::holds_alternative(ev); }) + | rpp::ops::map([](const CustomEvent& ev) { return std::get(ev); }); } diff --git a/src/examples/rppgrpc/doxygen/server_reactor.cpp b/src/examples/rppgrpc/doxygen/server_reactor.cpp index ebfb5fcc8..86f5822ce 100644 --- a/src/examples/rppgrpc/doxygen/server_reactor.cpp +++ b/src/examples/rppgrpc/doxygen/server_reactor.cpp @@ -9,7 +9,7 @@ * \example server_reactor.cpp **/ -class Server : public TestService::CallbackService +class server : public TestService::CallbackService { //! [read_reactor] diff --git a/src/extensions/rppgrpc/rppgrpc/details/base.hpp b/src/extensions/rppgrpc/rppgrpc/details/base.hpp index 30a9dc96e..c5e73facb 100644 --- a/src/extensions/rppgrpc/rppgrpc/details/base.hpp +++ b/src/extensions/rppgrpc/rppgrpc/details/base.hpp @@ -50,14 +50,14 @@ namespace rppgrpc::details void handle_write_done() { - std::lock_guard lock{write_mutex}; - write.pop_front(); + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); - if (!write.empty()) + if (!m_write.empty()) { - start_write(write.front()); + start_write(m_write.front()); } - else if (finished) + else if (m_finished) { finish_writes(grpc::Status::OK); } @@ -70,26 +70,26 @@ namespace rppgrpc::details template T> void on_next(T&& message) const { - std::lock_guard lock{owner.get().write_mutex}; - owner.get().write.push_back(std::forward(message)); - if (owner.get().write.size() == 1) - owner.get().start_write(owner.get().write.front()); + std::lock_guard lock{owner.get().m_write_mutex}; + owner.get().m_write.push_back(std::forward(message)); + if (owner.get().m_write.size() == 1) + owner.get().start_write(owner.get().m_write.front()); } void on_error(const std::exception_ptr&) const { - std::lock_guard lock{owner.get().write_mutex}; - owner.get().finished = true; + std::lock_guard lock{owner.get().m_write_mutex}; + owner.get().m_finished = true; - if (owner.get().write.size() == 0) + if (owner.get().m_write.size() == 0) owner.get().finish_writes(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); } void on_completed() const { - std::lock_guard lock{owner.get().write_mutex}; - owner.get().finished = true; + std::lock_guard lock{owner.get().m_write_mutex}; + owner.get().m_finished = true; - if (owner.get().write.size() == 0) + if (owner.get().m_write.size() == 0) owner.get().finish_writes(grpc::Status::OK); } @@ -100,9 +100,9 @@ namespace rppgrpc::details private: rpp::subjects::serialized_publish_subject m_subject{}; - std::mutex write_mutex{}; - std::deque write{}; - bool finished{}; + std::mutex m_write_mutex{}; + std::deque m_write{}; + bool m_finished{}; }; template @@ -137,7 +137,7 @@ namespace rppgrpc::details private: rpp::subjects::publish_subject m_observer; - TData m_data{}; + RPP_NO_UNIQUE_ADDRESS TData m_data{}; }; } // namespace rppgrpc::details diff --git a/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp b/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp index 74091acd4..f7cd503ad 100644 --- a/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp +++ b/src/extensions/rppqt/rppqt/schedulers/main_thread.hpp @@ -44,7 +44,7 @@ namespace rppqt::schedulers return; } - QTimer::singleShot(std::chrono::duration_cast(duration), application, [fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)]() mutable { + QTimer::singleShot(std::chrono::duration_cast(std::max(rpp::schedulers::duration{}, duration)), application, [fn = std::forward(fn), handler = std::forward(handler), ... args = std::forward(args)]() mutable { if (!handler.is_disposed()) invoke(std::move(fn), std::move(handler), std::move(args)...); }); diff --git a/src/rpp/rpp/defs.hpp b/src/rpp/rpp/defs.hpp index f8cc0f6b9..72e194874 100644 --- a/src/rpp/rpp/defs.hpp +++ b/src/rpp/rpp/defs.hpp @@ -16,10 +16,7 @@ #define RPP_EMPTY_BASES #endif -// MSVC has bad support for the no_unique_address from the box NOW, but provides specificator to enable full support. GCC/Clang works as expected -#if defined(_MSC_VER) && _MSC_FULL_VER >= 192829913 - #define RPP_NO_UNIQUE_ADDRESS [[msvc::no_unique_address]] -#elif defined(__APPLE__) && defined(__clang__) // apple works unexpectedly bad on clang 15 =C +#if defined(__APPLE__) && defined(__clang__) // apple works unexpectedly bad on clang 15 =C #define RPP_NO_UNIQUE_ADDRESS #else #define RPP_NO_UNIQUE_ADDRESS [[no_unique_address]] diff --git a/src/rpp/rpp/disposables/callback_disposable.hpp b/src/rpp/rpp/disposables/callback_disposable.hpp index bf6a7b501..2cfe2af09 100644 --- a/src/rpp/rpp/disposables/callback_disposable.hpp +++ b/src/rpp/rpp/disposables/callback_disposable.hpp @@ -40,7 +40,7 @@ namespace rpp void base_dispose_impl(interface_disposable::Mode) noexcept override { std::move(m_fn)(); } // NOLINT(bugprone-exception-escape) private: - Fn m_fn; + RPP_NO_UNIQUE_ADDRESS Fn m_fn; }; template diff --git a/src/rpp/rpp/disposables/details/container.hpp b/src/rpp/rpp/disposables/details/container.hpp index 1271f4600..8a4c84b5a 100644 --- a/src/rpp/rpp/disposables/details/container.hpp +++ b/src/rpp/rpp/disposables/details/container.hpp @@ -42,7 +42,7 @@ namespace rpp::details::disposables void dispose() const { - for (auto& d : m_data) + for (const auto& d : m_data) { d.dispose(); } diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index a108abee1..567afeebe 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -155,8 +155,8 @@ namespace rpp } private: - OriginalObservable m_original_observable; - Subject m_subject; + RPP_NO_UNIQUE_ADDRESS OriginalObservable m_original_observable; + Subject m_subject; struct state_t { diff --git a/src/rpp/rpp/observers/dynamic_observer.hpp b/src/rpp/rpp/observers/dynamic_observer.hpp index 408de7bbd..0ec799162 100644 --- a/src/rpp/rpp/observers/dynamic_observer.hpp +++ b/src/rpp/rpp/observers/dynamic_observer.hpp @@ -84,7 +84,7 @@ namespace rpp::details::observers } private: - TObs m_observer; + RPP_NO_UNIQUE_ADDRESS TObs m_observer; }; template diff --git a/src/rpp/rpp/observers/mock_observer.hpp b/src/rpp/rpp/observers/mock_observer.hpp index 358ef0711..98cf7e29e 100644 --- a/src/rpp/rpp/observers/mock_observer.hpp +++ b/src/rpp/rpp/observers/mock_observer.hpp @@ -21,7 +21,7 @@ class mock_observer_strategy final { public: explicit mock_observer_strategy(bool copy_values = true) - : m_state{std::make_shared(copy_values)} + : m_state{std::make_shared(copy_values)} { } @@ -57,9 +57,9 @@ class mock_observer_strategy final auto get_observer(rpp::composite_disposable_wrapper d) const { return rpp::observer_with_disposable>{std::move(d), *this}; } private: - struct State + struct state { - explicit State(bool copy_values) + explicit state(bool copy_values) : m_copy_values{copy_values} { } @@ -73,5 +73,5 @@ class mock_observer_strategy final std::vector vals{}; }; - std::shared_ptr m_state{}; + std::shared_ptr m_state{}; }; diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index be9737c97..b750b5e15 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -25,7 +25,7 @@ namespace rpp::operators::details template struct concat_inner_observer_strategy; - enum ConcatStage : uint8_t + enum class ConcatStage : uint8_t { None = 0, Draining = 1, diff --git a/src/rpp/rpp/operators/delay.hpp b/src/rpp/rpp/operators/delay.hpp index 69860f835..8e180e5b4 100644 --- a/src/rpp/rpp/operators/delay.hpp +++ b/src/rpp/rpp/operators/delay.hpp @@ -52,9 +52,9 @@ namespace rpp::operators::details } } - Observer observer; - RPP_NO_UNIQUE_ADDRESS Worker worker; - rpp::schedulers::duration delay; + RPP_NO_UNIQUE_ADDRESS Observer observer; + RPP_NO_UNIQUE_ADDRESS Worker worker; + rpp::schedulers::duration delay; std::mutex mutex{}; std::queue> queue; diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp index 78a4d6d3f..c010613ca 100644 --- a/src/rpp/rpp/operators/on_error_resume_next.hpp +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -17,55 +17,100 @@ namespace rpp::operators::details { + template + struct on_error_resume_next_disposable final : public rpp::composite_disposable + { + on_error_resume_next_disposable(TObserver&& observer) + : rpp::composite_disposable{} + , observer(std::move(observer)) + { + } + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + }; + + template + struct on_error_resume_next_inner_observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + std::shared_ptr observer; + + template + void on_next(T&& v) const + { + observer->on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + observer->on_error(err); + } + + void on_completed() const + { + observer->on_completed(); + } + + void set_upstream(const disposable_wrapper& d) { observer->set_upstream(d); } + + bool is_disposed() const { return observer->is_disposed(); } + }; + + template struct on_error_resume_next_observer_strategy { using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; - RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; - RPP_NO_UNIQUE_ADDRESS Selector selector; - // Manually control disposable to ensure observer is not used after move in on_error emission - mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); + on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector) + : state{init_state(std::move(observer))} + , selector{selector} + { + } - RPP_CALL_DURING_CONSTRUCTION( - observer.set_upstream(disposable);); + std::shared_ptr> state; + RPP_NO_UNIQUE_ADDRESS Selector selector; template void on_next(T&& v) const { - observer.on_next(std::forward(v)); + state->observer.on_next(std::forward(v)); } void on_error(const std::exception_ptr& err) const { - std::optional> selector_obs; try { - selector_obs.emplace(selector(err)); + selector(err).subscribe(on_error_resume_next_inner_observer_strategy{std::shared_ptr(state, &state->observer)}); } catch (...) { - observer.on_error(std::current_exception()); - } - if (selector_obs.has_value()) - { - std::move(selector_obs).value().subscribe(std::move(observer)); + state->observer.on_error(std::current_exception()); } - disposable.dispose(); + state->dispose(); } void on_completed() const { - observer.on_completed(); - disposable.dispose(); + state->observer.on_completed(); + state->dispose(); } void set_upstream(const disposable_wrapper& d) const { - disposable.add(d); + state->add(d); } - bool is_disposed() const { return disposable.is_disposed(); } + bool is_disposed() const { return state->is_disposed(); } + + static std::shared_ptr> init_state(TObserver&& observer) + { + const auto d = disposable_wrapper_impl>::make(std::move(observer)); + auto ptr = d.lock(); + ptr->observer.set_upstream(d.as_weak()); + return ptr; + } }; template diff --git a/src/rpp/rpp/operators/reduce.hpp b/src/rpp/rpp/operators/reduce.hpp index 1d9b74f83..fd15960ba 100644 --- a/src/rpp/rpp/operators/reduce.hpp +++ b/src/rpp/rpp/operators/reduce.hpp @@ -89,7 +89,8 @@ namespace rpp::operators::details void on_completed() const { - observer.on_next(std::move(seed).value()); + if (seed.has_value()) + observer.on_next(std::move(seed).value()); observer.on_completed(); } diff --git a/src/rpp/rpp/operators/retry_when.hpp b/src/rpp/rpp/operators/retry_when.hpp index f875e562b..3b0a9e0f1 100644 --- a/src/rpp/rpp/operators/retry_when.hpp +++ b/src/rpp/rpp/operators/retry_when.hpp @@ -96,19 +96,14 @@ namespace rpp::operators::details void on_error(const std::exception_ptr& err) const { - std::optional> notifier_obs; try { - notifier_obs.emplace(state->notifier(err)); + state->notifier(err).subscribe(retry_when_impl_inner_strategy{state}); } catch (...) { state->observer.on_error(std::current_exception()); } - if (notifier_obs.has_value()) - { - std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy{state}); - } } void on_completed() const diff --git a/src/rpp/rpp/operators/timeout.hpp b/src/rpp/rpp/operators/timeout.hpp index 9ad1a1800..9d8efda02 100644 --- a/src/rpp/rpp/operators/timeout.hpp +++ b/src/rpp/rpp/operators/timeout.hpp @@ -25,8 +25,8 @@ namespace rpp::operators::details public: struct observer_with_timeout { - TObserver observer; - rpp::schedulers::time_point timeout; + RPP_NO_UNIQUE_ADDRESS TObserver observer; + rpp::schedulers::time_point timeout; }; timeout_disposable(TObserver&& observer, rpp::schedulers::duration period, const TFallbackObservable& fallback, rpp::schedulers::time_point timeout) @@ -44,8 +44,8 @@ namespace rpp::operators::details private: rpp::utils::value_with_mutex m_observer_with_timeout; - const rpp::schedulers::duration m_period; - const TFallbackObservable m_fallback; + const rpp::schedulers::duration m_period; + RPP_NO_UNIQUE_ADDRESS const TFallbackObservable m_fallback; }; template diff --git a/src/rpp/rpp/operators/window.hpp b/src/rpp/rpp/operators/window.hpp index d93a59c63..87fcab903 100644 --- a/src/rpp/rpp/operators/window.hpp +++ b/src/rpp/rpp/operators/window.hpp @@ -52,7 +52,7 @@ namespace rpp::operators::details if (m_items_in_current_window == m_window_size) { Subject subject{m_disposable->wrapper_from_this()}; - m_subject_data.emplace(subject_data{subject.get_observer(), subject.get_disposable()}); + m_subject_data.emplace(subject.get_observer(), subject.get_disposable()); m_disposable->add(m_subject_data->disposable); m_observer.on_next(subject.get_observable()); m_items_in_current_window = 0; @@ -93,8 +93,16 @@ namespace rpp::operators::details struct subject_data { - decltype(std::declval().get_observer()) observer; - rpp::disposable_wrapper disposable; + using TObs = decltype(std::declval().get_observer()); + + subject_data(TObs&& obs, rpp::disposable_wrapper&& d) + : observer{std::move(obs)} + , disposable{std::move(d)} + { + } + + TObs observer; + rpp::disposable_wrapper disposable; }; mutable std::optional m_subject_data; diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index 68963c718..98fa622db 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -27,21 +27,21 @@ namespace rpp::operators::details { public: explicit with_latest_from_disposable(Observer&& observer, const TSelector& selector) - : observer_with_mutex{std::move(observer)} - , selector{selector} + : m_observer_with_mutex{std::move(observer)} + , m_selector{selector} { } - rpp::utils::pointer_under_lock get_observer_under_lock() { return observer_with_mutex; } + rpp::utils::pointer_under_lock get_observer_under_lock() { return m_observer_with_mutex; } - rpp::utils::tuple>...>& get_values() { return values; } + rpp::utils::tuple>...>& get_values() { return m_values; } - const TSelector& get_selector() const { return selector; } + const TSelector& get_selector() const { return m_selector; } private: - rpp::utils::value_with_mutex observer_with_mutex{}; - rpp::utils::tuple>...> values{}; - RPP_NO_UNIQUE_ADDRESS TSelector selector; + rpp::utils::value_with_mutex m_observer_with_mutex{}; + rpp::utils::tuple>...> m_values{}; + RPP_NO_UNIQUE_ADDRESS TSelector m_selector; }; template diff --git a/src/rpp/rpp/schedulers/computational.hpp b/src/rpp/rpp/schedulers/computational.hpp index 3f892bd15..ecde1be2a 100644 --- a/src/rpp/rpp/schedulers/computational.hpp +++ b/src/rpp/rpp/schedulers/computational.hpp @@ -31,8 +31,8 @@ namespace rpp::schedulers public: static auto create_worker() { - static thread_pool tp{}; - return tp.create_worker(); + static thread_pool s_tp{}; + return s_tp.create_worker(); } }; } // namespace rpp::schedulers diff --git a/src/rpp/rpp/schedulers/current_thread.hpp b/src/rpp/rpp/schedulers/current_thread.hpp index 295469684..94b0a2cbc 100644 --- a/src/rpp/rpp/schedulers/current_thread.hpp +++ b/src/rpp/rpp/schedulers/current_thread.hpp @@ -89,8 +89,8 @@ namespace rpp::schedulers static details::schedulables_queue*& get_queue() { - thread_local details::schedulables_queue* queue{}; - return queue; + thread_local details::schedulables_queue* s_queue{}; + return s_queue; } struct is_queue_is_empty diff --git a/src/rpp/rpp/schedulers/details/queue.hpp b/src/rpp/rpp/schedulers/details/queue.hpp index 26f424430..4bd73b526 100644 --- a/src/rpp/rpp/schedulers/details/queue.hpp +++ b/src/rpp/rpp/schedulers/details/queue.hpp @@ -168,7 +168,7 @@ namespace rpp::schedulers::details class optional_mutex { public: - optional_mutex() {} + optional_mutex() = default; optional_mutex(Mutex* mutex) : m_mutex{mutex} diff --git a/src/rpp/rpp/schedulers/test_scheduler.hpp b/src/rpp/rpp/schedulers/test_scheduler.hpp index 3bbaca6c5..6f29117d3 100644 --- a/src/rpp/rpp/schedulers/test_scheduler.hpp +++ b/src/rpp/rpp/schedulers/test_scheduler.hpp @@ -12,107 +12,111 @@ #include #include -inline rpp::schedulers::time_point s_current_time{std::chrono::seconds{10}}; - -class test_scheduler final +namespace rpp::schedulers { -public: - class worker_strategy; - struct state : public rpp::details::base_disposable + + class test_scheduler final { - state() = default; + public: + static inline rpp::schedulers::time_point s_current_time{std::chrono::seconds{10}}; - template Fn> - void schedule(rpp::schedulers::time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) - { - if (is_disposed()) - return; + class worker_strategy; - schedulings.push_back(time_point); - queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); - } - - void drain() + struct state : public rpp::details::base_disposable { - while (!queue.is_empty()) + state() = default; + + template Fn> + void schedule(rpp::schedulers::time_point time_point, Fn&& fn, Handler&& handler, Args&&... args) { - auto time_point = queue.top()->get_timepoint(); - if (time_point > s_current_time) + if (is_disposed()) return; - auto fn = queue.top(); - queue.pop(); - - if (fn->is_disposed()) - continue; + schedulings.push_back(time_point); + queue.emplace(time_point, std::forward(fn), std::forward(handler), std::forward(args)...); + } - executions.push_back(s_current_time); - if (auto new_timepoint = (*fn)()) + void drain() + { + while (!queue.is_empty()) { - if (!is_disposed()) + if (queue.top()->get_timepoint() > s_current_time) + return; + + auto fn = queue.top(); + queue.pop(); + + if (fn->is_disposed()) + continue; + + executions.push_back(s_current_time); + if (auto new_timepoint = (*fn)()) { - schedulings.push_back(std::max(s_current_time, new_timepoint.value())); - queue.emplace(schedulings.back(), std::move(fn)); + if (!is_disposed()) + { + schedulings.push_back(std::max(s_current_time, new_timepoint.value())); + queue.emplace(schedulings.back(), std::move(fn)); + } } } } - } - void base_dispose_impl(interface_disposable::Mode) noexcept override {} + void base_dispose_impl(interface_disposable::Mode) noexcept override {} - std::vector schedulings{}; - std::vector executions{}; - rpp::schedulers::details::schedulables_queue queue{}; - }; + std::vector schedulings{}; + std::vector executions{}; + rpp::schedulers::details::schedulables_queue queue{}; + }; - class worker_strategy - { - public: - worker_strategy(rpp::disposable_wrapper_impl state) - : m_state{std::move(state)} + class worker_strategy { - } + public: + worker_strategy(rpp::disposable_wrapper_impl state) + : m_state{std::move(state)} + { + } - template Fn> - void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const - { - if (auto locked = m_state.lock()) + template Fn> + void defer_for(rpp::schedulers::duration duration, Fn&& fn, Handler&& handler, Args&&... args) const { - if (!locked->is_disposed()) + if (auto locked = m_state.lock()) { - locked->schedule(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); - locked->drain(); + if (!locked->is_disposed()) + { + locked->schedule(now() + duration, std::forward(fn), std::forward(handler), std::forward(args)...); + locked->drain(); + } } } - } - static rpp::schedulers::time_point now() { return s_current_time; } + static rpp::schedulers::time_point now() { return s_current_time; } - rpp::disposable_wrapper get_disposable() const { return m_state.as_weak(); } + rpp::disposable_wrapper get_disposable() const { return m_state.as_weak(); } - private: - rpp::disposable_wrapper_impl m_state; - }; + private: + rpp::disposable_wrapper_impl m_state; + }; - test_scheduler() = default; + test_scheduler() = default; - rpp::schedulers::worker create_worker() const - { - return rpp::schedulers::worker{m_state.as_weak()}; - } + rpp::schedulers::worker create_worker() const + { + return rpp::schedulers::worker{m_state.as_weak()}; + } - const auto& get_schedulings() const { return m_state.lock()->schedulings; } - const auto& get_executions() const { return m_state.lock()->executions; } + const auto& get_schedulings() const { return m_state.lock()->schedulings; } + const auto& get_executions() const { return m_state.lock()->executions; } - static rpp::schedulers::time_point now() { return s_current_time; } + static rpp::schedulers::time_point now() { return s_current_time; } - void time_advance(rpp::schedulers::duration dur) const - { - s_current_time += dur; - m_state.lock()->drain(); - } + void time_advance(rpp::schedulers::duration dur) const + { + s_current_time += dur; + m_state.lock()->drain(); + } -private: - rpp::disposable_wrapper_impl m_state = rpp::disposable_wrapper_impl::make(); -}; + private: + rpp::disposable_wrapper_impl m_state = rpp::disposable_wrapper_impl::make(); + }; +} // namespace rpp::schedulers diff --git a/src/rpp/rpp/schedulers/thread_pool.hpp b/src/rpp/rpp/schedulers/thread_pool.hpp index d3579d589..6c4275813 100644 --- a/src/rpp/rpp/schedulers/thread_pool.hpp +++ b/src/rpp/rpp/schedulers/thread_pool.hpp @@ -14,6 +14,8 @@ #include +#include + namespace rpp::schedulers { /** @@ -74,16 +76,16 @@ namespace rpp::schedulers explicit state(size_t threads_count) { threads_count = std::max(size_t{1}, threads_count); - workers.reserve(threads_count); + m_workers.reserve(threads_count); for (size_t i = 0; i < threads_count; ++i) - workers.emplace_back(new_thread::create_worker()); + m_workers.emplace_back(new_thread::create_worker()); } - const original_worker& get() { return workers[index++ % workers.size()]; } + const original_worker& get() { return m_workers[m_index++ % m_workers.size()]; } private: - std::vector workers{}; - size_t index{}; + std::vector m_workers{}; + size_t m_index{}; }; std::shared_ptr m_state{}; diff --git a/src/rpp/rpp/utils/utils.hpp b/src/rpp/rpp/utils/utils.hpp index 0fef33617..09a82e2e9 100644 --- a/src/rpp/rpp/utils/utils.hpp +++ b/src/rpp/rpp/utils/utils.hpp @@ -90,19 +90,19 @@ namespace rpp::utils return std::all_of(std::cbegin(container), std::cend(container), fn); } - template + template requires std::is_member_function_pointer_v struct static_mem_fn { template - requires (inverse == false && std::invocable) + requires (Inverse == false && std::invocable) auto operator()(TT&& d) const { return (std::forward(d).*Fn)(); } template - requires (inverse == true && std::invocable) + requires (Inverse == true && std::invocable) auto operator()(TT&& d) const { return !(std::forward(d).*Fn)(); @@ -196,8 +196,8 @@ namespace rpp::utils iterator end() const { return {this, m_count}; } private: - T m_value; - size_t m_count; + RPP_NO_UNIQUE_ADDRESS T m_value; + size_t m_count; }; template @@ -245,7 +245,7 @@ namespace rpp::utils iterator end() const { return {nullptr}; } private: - T m_value; + RPP_NO_UNIQUE_ADDRESS T m_value; }; struct none_mutex @@ -306,8 +306,8 @@ namespace rpp::utils T& get_value_unsafe() { return m_value; } private: - T m_value{}; - std::mutex m_mutex{}; + RPP_NO_UNIQUE_ADDRESS T m_value{}; + std::mutex m_mutex{}; }; template diff --git a/src/tests/rpp/test_debounce.cpp b/src/tests/rpp/test_debounce.cpp index 6510a54ec..4aa9e8474 100644 --- a/src/tests/rpp/test_debounce.cpp +++ b/src/tests/rpp/test_debounce.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "disposable_observable.hpp" @@ -21,9 +22,9 @@ TEST_CASE("debounce emit only items where timeout reached") { - auto debounce_delay = std::chrono::seconds{2}; - test_scheduler scheduler{}; - auto start = s_current_time; + auto debounce_delay = std::chrono::seconds{2}; + rpp::schedulers::test_scheduler scheduler{}; + auto start = rpp::schedulers::test_scheduler::s_current_time; SECTION("subject of items and subscriber subscribed on it via debounce") { @@ -120,8 +121,18 @@ TEST_CASE("debounce emit only items where timeout reached") } } +TEST_CASE("debounce forwards error") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::error({}) | rpp::operators::debounce({}, rpp::schedulers::immediate{}) | rpp::ops::subscribe(mock); + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); +} + TEST_CASE("debounce satisfies disposable contracts") { - test_operator_with_disposable(rpp::ops::debounce(std::chrono::seconds{1}, test_scheduler{})); + test_operator_with_disposable(rpp::ops::debounce(std::chrono::seconds{1}, rpp::schedulers::test_scheduler{})); } diff --git a/src/tests/rpp/test_delay.cpp b/src/tests/rpp/test_delay.cpp index 64dbfdf3c..432b8e683 100644 --- a/src/tests/rpp/test_delay.cpp +++ b/src/tests/rpp/test_delay.cpp @@ -54,7 +54,7 @@ TEST_CASE("delay delays observable's emissions") { auto mock = mock_observer_strategy{}; std::chrono::milliseconds delay_duration{300}; - auto scheduler = test_scheduler{}; + auto scheduler = rpp::schedulers::test_scheduler{}; auto subscribe_with_delay = [&](auto get_now) { const auto now = get_now(); @@ -181,7 +181,7 @@ TEST_CASE("delay delays observable's emissions") { rpp::source::just(1) | rpp::ops::delay(delay_duration, scheduler) - | subscribe_with_delay([]() { return test_scheduler::worker_strategy::now(); }); + | subscribe_with_delay([]() { return rpp::schedulers::test_scheduler::worker_strategy::now(); }); SECTION("shouldn't see anything before manual invoking") { @@ -213,7 +213,7 @@ TEST_CASE("delay delays observable's emissions") obs.on_error({}); }) | rpp::ops::delay(delay_duration, scheduler) - | subscribe_with_delay([]() { return test_scheduler::worker_strategy::now(); }); + | subscribe_with_delay([]() { return rpp::schedulers::test_scheduler::worker_strategy::now(); }); SECTION("shouldn't see anything before manual invoking") { @@ -242,7 +242,7 @@ TEST_CASE("observe_on forward error immediately") { auto mock = mock_observer_strategy{}; std::chrono::milliseconds delay_duration{300}; - auto scheduler = test_scheduler{}; + auto scheduler = rpp::schedulers::test_scheduler{}; auto subscribe_with_delay = [&](auto get_now) { const auto now = get_now(); @@ -268,13 +268,13 @@ TEST_CASE("observe_on forward error immediately") SECTION("observable of -1-x but with invoking schedulable after subscription") { - const auto now = test_scheduler::worker_strategy::now(); + const auto now = rpp::schedulers::test_scheduler::worker_strategy::now(); rpp::source::create([](const auto& obs) { obs.on_next(1); obs.on_error({}); }) | rpp::ops::observe_on(scheduler, delay_duration) - | subscribe_with_delay([]() { return test_scheduler::worker_strategy::now(); }); + | subscribe_with_delay([]() { return rpp::schedulers::test_scheduler::worker_strategy::now(); }); SECTION("should see on_error immediately") { diff --git a/src/tests/rpp/test_interval.cpp b/src/tests/rpp/test_interval.cpp index 4471eab55..8f8c1d126 100644 --- a/src/tests/rpp/test_interval.cpp +++ b/src/tests/rpp/test_interval.cpp @@ -25,14 +25,14 @@ TEST_CASE("interval emit values with provided interval") { - auto scheduler = test_scheduler{}; + auto scheduler = rpp::schedulers::test_scheduler{}; auto mock = mock_observer_strategy{}; SECTION("interval observable") { auto interval = std::chrono::seconds{1}; auto obs = rpp::source::interval(interval, scheduler); - auto initial_time = test_scheduler::worker_strategy::now(); + auto initial_time = rpp::schedulers::test_scheduler::worker_strategy::now(); SECTION("subscribe on it via take 3") { @@ -91,7 +91,7 @@ TEST_CASE("interval emit values with provided interval") auto initial_delay = std::chrono::seconds{2}; auto interval = std::chrono::seconds{1}; auto obs = rpp::source::interval(initial_delay, interval, scheduler); - auto initial_time = test_scheduler::worker_strategy::now(); + auto initial_time = rpp::schedulers::test_scheduler::worker_strategy::now(); SECTION("subscribe on it via take 3") { diff --git a/src/tests/rpp/test_reduce.cpp b/src/tests/rpp/test_reduce.cpp index 9ea9e4669..f3fae9e19 100644 --- a/src/tests/rpp/test_reduce.cpp +++ b/src/tests/rpp/test_reduce.cpp @@ -13,6 +13,8 @@ #include #include +#include +#include #include #include "copy_count_tracker.hpp" @@ -69,6 +71,40 @@ TEMPLATE_TEST_CASE("reduce reduces values and store state", "", rpp::memory_mode } } +TEST_CASE("reduce forwards callbacks") +{ + auto mock = mock_observer_strategy{}; + SECTION("on_error") + { + rpp::source::error({}) + | rpp::ops::reduce(0, std::plus{}) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_error_count() == 1); + } + + SECTION("on_completed") + { + rpp::source::empty() + | rpp::ops::reduce(0, std::plus{}) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{0}); + CHECK(mock.get_on_completed_count() == 1); + } + + SECTION("on_completed no_seed") + { + rpp::source::empty() + | rpp::ops::reduce(std::plus{}) + | rpp::ops::subscribe(mock); + + CHECK(mock.get_received_values().empty()); + CHECK(mock.get_on_completed_count() == 1); + } +} + TEST_CASE("reduce doesn't produce extra copies") { SECTION("reduce([](verifier&& seed, auto&& v){return forward(v); }") diff --git a/src/tests/rpp/test_retry.cpp b/src/tests/rpp/test_retry.cpp index 450f5a549..d31d7b962 100644 --- a/src/tests/rpp/test_retry.cpp +++ b/src/tests/rpp/test_retry.cpp @@ -11,11 +11,15 @@ #include #include +#include #include +#include +#include #include #include #include #include +#include #include "copy_count_tracker.hpp" #include "disposable_observable.hpp" @@ -56,6 +60,15 @@ TEST_CASE("retry handles errors properly") observable | rpp::operators::retry(2) | rpp::operators::subscribe(mock); } + SECTION("retry(2) from another thread") + { + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::ops::subscribe_on(rpp::schedulers::new_thread{}) | rpp::operators::retry(2) | rpp::ops::as_blocking() | rpp::operators::subscribe(mock); + } SECTION("retry()") { @@ -121,6 +134,22 @@ TEST_CASE("retry handles errors properly") { REQUIRE_CALL(*mock, on_next_lvalue(1)).IN_SEQUENCE(seq); + observable | rpp::operators::retry() | rpp::operators::subscribe(mock); + } + } + SECTION("observable throws exception") + { + size_t i = 0; + const auto observable = rpp::source::create([&i](const auto& sub) { + if (i++) + throw 1; + sub.on_error({}); + }); + + SECTION("retry()") + { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + observable | rpp::operators::retry() | rpp::operators::subscribe(mock); } } diff --git a/src/tests/rpp/test_retry_when.cpp b/src/tests/rpp/test_retry_when.cpp index 4d5632ee3..0aa705171 100644 --- a/src/tests/rpp/test_retry_when.cpp +++ b/src/tests/rpp/test_retry_when.cpp @@ -186,6 +186,44 @@ TEST_CASE("retry_when resubscribes on notifier emission") CHECK(subscribe_count == 4 + 1); } + SECTION("callable throws exception") + { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { throw 1; return rpp::source::just(rpp::schedulers::new_thread{}, 1); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } + SECTION("callable return observable throwing exception") + { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable + | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::create([](const auto&) { throw 1; }); }) + | rpp::operators::as_blocking() + | rpp::operators::subscribe(mock); + + CHECK(subscribe_count == 1); + } + } + SECTION("observable throws exception") + { + size_t i = 0; + const auto observable = rpp::source::create([&i](const auto& sub) { + if (i++) + throw 1; + sub.on_error({}); + }); + + SECTION("retry()") + { + REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq); + + observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(rpp::schedulers::new_thread{}, 1); }) | rpp::operators::as_blocking() | rpp::operators::subscribe(mock); + } } } diff --git a/src/tests/rpp/test_scheduler.cpp b/src/tests/rpp/test_scheduler.cpp index 3d13936bd..386dd964f 100644 --- a/src/tests/rpp/test_scheduler.cpp +++ b/src/tests/rpp/test_scheduler.cpp @@ -349,13 +349,13 @@ TEST_CASE("Immediate scheduler") } } -TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, rpp::schedulers::new_thread) +TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, rpp::schedulers::new_thread, rpp::schedulers::thread_pool) { auto d = rpp::composite_disposable_wrapper::make(); auto mock_obs = mock_observer_strategy{}; auto obs = std::optional{mock_obs.get_observer(d).as_dynamic()}; - auto worker = std::optional{TestType::create_worker()}; + auto worker = std::optional{TestType{}.create_worker()}; if constexpr (std::same_as) CHECK(worker->get_disposable().is_disposed()); @@ -368,8 +368,8 @@ TEMPLATE_TEST_CASE("queue_based scheduler", "", rpp::schedulers::current_thread, worker->schedule([&](const auto&) { thread_of_schedule_promise.set_value(get_thread_id_as_string(std::this_thread::get_id())); - if constexpr (std::same_as) - thread_local rpp::utils::finally_action a{[done] { + if constexpr (!std::same_as) + thread_local rpp::utils::finally_action s_a{[done] { done->store(true); }}; else @@ -861,11 +861,11 @@ TEST_CASE("run_loop scheduler dispatches tasks only manually") TEST_CASE("different delaying strategies") { - test_scheduler scheduler{}; - auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); - auto advance = std::chrono::seconds{1}; - auto delay = advance * 2; - auto now = scheduler.now(); + rpp::schedulers::test_scheduler scheduler{}; + auto obs = mock_observer_strategy{}.get_observer().as_dynamic(); + auto advance = std::chrono::seconds{1}; + auto delay = advance * 2; + auto now = scheduler.now(); auto test = [&](auto res) { scheduler.create_worker().schedule([&, res](const auto&) { @@ -906,7 +906,7 @@ TEST_CASE("current_thread inside new_thread") auto done = std::make_shared(); worker->schedule([&](const auto&) { - thread_local rpp::utils::finally_action th{[done] { + thread_local rpp::utils::finally_action s_th{[done] { done->store(true); }}; return rpp::schedulers::optional_delay_from_now{}; diff --git a/src/tests/rpp/test_throttle.cpp b/src/tests/rpp/test_throttle.cpp index f385642c6..d441f2e4c 100644 --- a/src/tests/rpp/test_throttle.cpp +++ b/src/tests/rpp/test_throttle.cpp @@ -24,17 +24,17 @@ TEST_CASE("throttle throttles emissions") auto mock = mock_observer_strategy>{}; auto subj = rpp::subjects::publish_subject{}; const auto throttle_duration = std::chrono::seconds{2}; - subj.get_observable() | rpp::ops::throttle(throttle_duration) | rpp::ops::map([](int v) { return std::tuple{v, test_scheduler::now()}; }) | rpp::ops::subscribe(mock); + subj.get_observable() | rpp::ops::throttle(throttle_duration) | rpp::ops::map([](int v) { return std::tuple{v, rpp::schedulers::test_scheduler::now()}; }) | rpp::ops::subscribe(mock); SECTION("emiting second value forwards it immediately") { - const auto first_value_time = test_scheduler::now(); + const auto first_value_time = rpp::schedulers::test_scheduler::now(); subj.get_observer().on_next(1); CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); SECTION("emitting second value in throttle_duration/2 not forwards it") { - test_scheduler{}.time_advance(throttle_duration / 2); + rpp::schedulers::test_scheduler{}.time_advance(throttle_duration / 2); subj.get_observer().on_next(2); CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}}); @@ -42,10 +42,10 @@ TEST_CASE("throttle throttles emissions") CHECK(mock.get_on_completed_count() == 0); SECTION("emitting third value in throttle_duration/2+throttle_duration/2 forwards it") { - test_scheduler{}.time_advance(throttle_duration / 2); + rpp::schedulers::test_scheduler{}.time_advance(throttle_duration / 2); subj.get_observer().on_next(3); - CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}, std::tuple{3, test_scheduler::now()}}); + CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}, std::tuple{3, rpp::schedulers::test_scheduler::now()}}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); } @@ -66,19 +66,19 @@ TEST_CASE("throttle throttles emissions") } SECTION("emitting second value in throttle_duration forwards it") { - test_scheduler{}.time_advance(throttle_duration); + rpp::schedulers::test_scheduler{}.time_advance(throttle_duration); subj.get_observer().on_next(2); - CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}, std::tuple{2, test_scheduler::now()}}); + CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}, std::tuple{2, rpp::schedulers::test_scheduler::now()}}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); } SECTION("emitting second value in 3/2*throttle_duration forwards it") { - test_scheduler{}.time_advance(throttle_duration / 2 * 3); + rpp::schedulers::test_scheduler{}.time_advance(throttle_duration / 2 * 3); subj.get_observer().on_next(2); - CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}, std::tuple{2, test_scheduler::now()}}); + CHECK(mock.get_received_values() == std::vector{std::tuple{1, first_value_time}, std::tuple{2, rpp::schedulers::test_scheduler::now()}}); CHECK(mock.get_on_error_count() == 0); CHECK(mock.get_on_completed_count() == 0); } diff --git a/src/tests/rpp/test_timeout.cpp b/src/tests/rpp/test_timeout.cpp index 025dfee46..b356d1c43 100644 --- a/src/tests/rpp/test_timeout.cpp +++ b/src/tests/rpp/test_timeout.cpp @@ -25,7 +25,7 @@ TEST_CASE("timeout subscribes to passed observable in case of reaching timeout") { - auto scheduler = test_scheduler{}; + auto scheduler = rpp::schedulers::test_scheduler{}; auto mock = mock_observer_strategy{}; const auto now = scheduler.now(); @@ -154,7 +154,23 @@ TEST_CASE("timeout handles current_thread scheduling") CHECK(mock.get_on_completed_count() == 1); } +TEST_CASE("timeout handles exception from fallback") +{ + auto mock = mock_observer_strategy{}; + + rpp::source::never() + | rpp::operators::timeout(std::chrono::seconds{1}, rpp::source::create([](const auto&) { + throw 1; + }), + rpp::schedulers::current_thread{}) + | rpp::operators::subscribe(mock); + + CHECK(mock.get_received_values() == std::vector{}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); +} + TEST_CASE("timeout satisfies disposable contracts") { - test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{10000000}, test_scheduler{})); + test_operator_with_disposable(rpp::ops::timeout(std::chrono::seconds{10000000}, rpp::schedulers::test_scheduler{})); } diff --git a/src/tests/rpp/test_timer.cpp b/src/tests/rpp/test_timer.cpp index 508fd74e0..13e078ecc 100644 --- a/src/tests/rpp/test_timer.cpp +++ b/src/tests/rpp/test_timer.cpp @@ -19,8 +19,8 @@ TEST_CASE("timer emit single value at provided duration") { - auto scheduler = test_scheduler{}; - auto scheduler2 = test_scheduler{}; + auto scheduler = rpp::schedulers::test_scheduler{}; + auto scheduler2 = rpp::schedulers::test_scheduler{}; auto mock = mock_observer_strategy{}; auto mock2 = mock_observer_strategy{}; @@ -78,7 +78,7 @@ TEST_CASE("timer emit single value at provided duration") TEST_CASE("timer emit single value at provided time_point") { - auto scheduler = test_scheduler{}; + auto scheduler = rpp::schedulers::test_scheduler{}; auto mock = mock_observer_strategy{}; SECTION("timer observable") diff --git a/src/tests/rppqt/test_from_signal.cpp b/src/tests/rppqt/test_from_signal.cpp index e83aa45af..5894e2038 100644 --- a/src/tests/rppqt/test_from_signal.cpp +++ b/src/tests/rppqt/test_from_signal.cpp @@ -10,32 +10,32 @@ #include -struct TestQObject : public QObject +struct test_q_object : public QObject { Q_OBJECT public: using QObject::QObject; - void EmitSingleValueSignal(int v) + void emit_single_value_signal(int v) { - emit SingleValueSignal(v); + emit single_value_signal(v); } - void EmitMultipleValueSignal(int v, double d, const std::string& s) + void emit_multiple_value_signal(int v, double d, const std::string& s) { - emit MultipleValueSignal(v, d, s); + emit multiple_value_signal(v, d, s); } - void EmitNoValueSignal() + void emit_no_value_signal() { - emit NoValueSignal(); + emit no_value_signal(); } Q_SIGNALS: - void SingleValueSignal(int v); - void MultipleValueSignal(int v, double d, const std::string& s); - void NoValueSignal(); + void single_value_signal(int v); + void multiple_value_signal(int v, double d, const std::string& s); + void no_value_signal(); }; #include @@ -53,13 +53,13 @@ TEST_CASE("from_signal can see object value from object signal") SECTION("qobject with signal with 1 argument and observable from signal from this object") { mock_observer_strategy mock_observer{}; - auto testobject = std::make_unique(); - auto obs = rppqt::source::from_signal(*testobject, &TestQObject::SingleValueSignal); + auto testobject = std::make_unique(); + auto obs = rppqt::source::from_signal(*testobject, &test_q_object::single_value_signal); SECTION("emit signal, subscribe on it and emit signal") { - testobject->EmitSingleValueSignal(1); + testobject->emit_single_value_signal(1); obs.subscribe(mock_observer); - testobject->EmitSingleValueSignal(2); + testobject->emit_single_value_signal(2); SECTION("subscriber sees only emission after subscribe") { CHECK(mock_observer.get_received_values() == std::vector{2}); @@ -100,12 +100,12 @@ TEST_CASE("from_signal sends tuple if multiple values") SECTION("object with signal with multiple values and observable from this signal") { mock_observer_strategy> mock_observer{}; - auto testobject = std::make_unique(); - auto obs = rppqt::source::from_signal(*testobject, &TestQObject::MultipleValueSignal); + auto testobject = std::make_unique(); + auto obs = rppqt::source::from_signal(*testobject, &test_q_object::multiple_value_signal); SECTION("subscribe on it and emit signal") { obs.subscribe(mock_observer); - testobject->EmitMultipleValueSignal(1, 2, "31"); + testobject->emit_multiple_value_signal(1, 2, "31"); SECTION("subscriber sees values") { CHECK(mock_observer.get_received_values() == std::vector>{std::tuple{1, 2.0, "31"}}); @@ -121,13 +121,13 @@ TEST_CASE("from_signal sends special struct if no args in signal") SECTION("object with signal with zero values and observable from this signal") { mock_observer_strategy mock_observer{}; - auto testobject = std::make_unique(); - auto obs = rppqt::source::from_signal(*testobject, &TestQObject::NoValueSignal); + auto testobject = std::make_unique(); + auto obs = rppqt::source::from_signal(*testobject, &test_q_object::no_value_signal); SECTION("subscribe on it and emit signal") { obs.subscribe(mock_observer); - testobject->EmitNoValueSignal(); + testobject->emit_no_value_signal(); SECTION("subscriber sees values") { CHECK(mock_observer.get_received_values().size() == 1); diff --git a/src/tests/rppqt/test_main_thread_scheduler.cpp b/src/tests/rppqt/test_main_thread_scheduler.cpp index f2fa25c0e..05a06cdf4 100644 --- a/src/tests/rppqt/test_main_thread_scheduler.cpp +++ b/src/tests/rppqt/test_main_thread_scheduler.cpp @@ -71,32 +71,50 @@ TEST_CASE("main_thread_scheduler schedules actions to main thread") REQUIRE(future.wait_for(std::chrono::seconds{1}) == std::future_status::timeout); } - SECTION("recursive scheduling to main thread") - { - std::string execution{}; - std::thread{[&] { - rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now { - rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now { - execution += "inner "; - return {}; + auto test_recursive_scheduling = [&](const auto& duration) { + SECTION("recursive scheduling to main thread") + { + std::string execution{}; + std::thread{[&] { + rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) { + rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now { + execution += "inner "; + return {}; + }, + observer); + + const bool first_run = execution.empty(); + execution += "outer "; + return first_run ? duration : std::nullopt; }, observer); + }}.join(); - const bool first_run = execution.empty(); - execution += "outer "; - return first_run ? rpp::schedulers::optional_delay_from_now{std::chrono::nanoseconds{}} : std::nullopt; - }, - observer); - }}.join(); - - application.exec(); - CHECK(execution == "outer inner outer inner "); + application.exec(); + CHECK(execution == "outer inner outer inner "); + } + }; + SECTION("optional_delay_from_now") + { + test_recursive_scheduling(rpp::schedulers::optional_delay_from_now{std::chrono::nanoseconds{}}); } - - SECTION("scheduler can be applied for all types of schedulables") + SECTION("optional_delay_from_this_timepoint") + { + test_recursive_scheduling(rpp::schedulers::optional_delay_from_this_timepoint{std::chrono::nanoseconds{}}); + } + SECTION("optional_delay_to") { - rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now { return std::nullopt; }, observer); - rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_this_timepoint { return std::nullopt; }, observer); - rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) -> rpp::schedulers::optional_delay_to { return std::nullopt; }, observer); + test_recursive_scheduling(rpp::schedulers::optional_delay_to{rpp::schedulers::time_point{}}); } } + +TEST_CASE("no application") +{ + mock_observer_strategy mock{}; + rppqt::schedulers::main_thread_scheduler::create_worker().schedule([&](const auto&) -> rpp::schedulers::optional_delay_from_now { + return {}; + }, + mock); + + CHECK(mock.get_on_error_count() == 1); +} diff --git a/src/tests/utils/copy_count_tracker.hpp b/src/tests/utils/copy_count_tracker.hpp index 05ccee242..666df1ec9 100644 --- a/src/tests/utils/copy_count_tracker.hpp +++ b/src/tests/utils/copy_count_tracker.hpp @@ -22,28 +22,28 @@ class copy_count_tracker { public: copy_count_tracker() - : _state(std::make_shared()) + : m_state(std::make_shared()) { } copy_count_tracker(const copy_count_tracker& other) - : _state{other._state} + : m_state{other.m_state} { - ++_state->copy_count; + ++m_state->copy_count; } copy_count_tracker(copy_count_tracker&& other) noexcept - : _state{other._state} // NOLINT(performance-move-constructor-init) + : m_state{other.m_state} // NOLINT(performance-move-constructor-init) { - ++_state->move_count; + ++m_state->move_count; } copy_count_tracker& operator=(const copy_count_tracker& other) { if (this == &other) return *this; - _state = other._state; - ++_state->copy_count; + m_state = other.m_state; + ++m_state->copy_count; return *this; } @@ -51,14 +51,14 @@ class copy_count_tracker { if (this == &other) return *this; - _state = other._state; - ++_state->move_count; + m_state = other.m_state; + ++m_state->move_count; return *this; } bool operator==(const copy_count_tracker& other) const { - return _state == other._state; + return m_state == other.m_state; } bool operator!=(const copy_count_tracker& other) const { return !(*this == other); } @@ -112,11 +112,11 @@ class copy_count_tracker } } - int get_copy_count() const { return _state->copy_count; } - int get_move_count() const { return _state->move_count; } + int get_copy_count() const { return m_state->copy_count; } + int get_move_count() const { return m_state->move_count; } private: - std::shared_ptr _state; + std::shared_ptr m_state; }; namespace std diff --git a/src/tests/utils/rpp_trompeloil.hpp b/src/tests/utils/rpp_trompeloil.hpp index c2c817960..124e15f10 100644 --- a/src/tests/utils/rpp_trompeloil.hpp +++ b/src/tests/utils/rpp_trompeloil.hpp @@ -48,26 +48,26 @@ class mock_observer MAKE_MOCK0(on_completed, void(), const); }; - impl_t& operator*() const noexcept { return *impl; } + impl_t& operator*() const noexcept { return *m_impl; } void on_next(const T& v) const noexcept { - impl->on_next_lvalue(v); + m_impl->on_next_lvalue(v); } void on_next(T&& v) const noexcept { - impl->on_next_rvalue(std::move(v)); + m_impl->on_next_rvalue(std::move(v)); } - void on_error(const std::exception_ptr& err) const noexcept { impl->on_error(err); } - void on_completed() const noexcept { impl->on_completed(); } + void on_error(const std::exception_ptr& err) const noexcept { m_impl->on_error(err); } + void on_completed() const noexcept { m_impl->on_completed(); } static bool is_disposed() noexcept { return false; } static void set_upstream(const rpp::disposable_wrapper&) noexcept {} private: - std::shared_ptr impl = std::make_shared(); + std::shared_ptr m_impl = std::make_shared(); }; inline void wait(const std::unique_ptr& e)