Skip to content

Commit

Permalink
extend coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino committed Aug 24, 2024
1 parent 983bab8 commit 1955f1f
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 25 deletions.
83 changes: 64 additions & 19 deletions src/rpp/rpp/operators/on_error_resume_next.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,55 +17,100 @@

namespace rpp::operators::details
{
template<rpp::constraint::observer TObserver>
struct on_error_resume_next_disposable final : public rpp::composite_disposable
{
on_error_resume_next_disposable(TObserver&& observer)
: rpp::composite_disposable{}
, observer(std::move(observer))
{
}

RPP_NO_UNIQUE_ADDRESS TObserver observer;
};

template<rpp::constraint::observer TObserver>
struct on_error_resume_next_inner_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

std::shared_ptr<TObserver> observer;

template<typename T>
void on_next(T&& v) const
{
observer->on_next(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
observer->on_error(err);
}

void on_completed() const
{
observer->on_completed();
}

void set_upstream(const disposable_wrapper& d) { observer->set_upstream(d); }

bool is_disposed() const { return observer->is_disposed(); }
};


template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Selector>
struct on_error_resume_next_observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

RPP_NO_UNIQUE_ADDRESS mutable TObserver observer;
RPP_NO_UNIQUE_ADDRESS Selector selector;
// Manually control disposable to ensure observer is not used after move in on_error emission
mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make();
on_error_resume_next_observer_strategy(TObserver&& observer, const Selector& selector)
: state{init_state(std::move(observer))}
, selector{selector}
{
}

RPP_CALL_DURING_CONSTRUCTION(
observer.set_upstream(disposable););
std::shared_ptr<on_error_resume_next_disposable<TObserver>> state;
RPP_NO_UNIQUE_ADDRESS Selector selector;

template<typename T>
void on_next(T&& v) const
{
observer.on_next(std::forward<T>(v));
state->observer.on_next(std::forward<T>(v));
}

void on_error(const std::exception_ptr& err) const
{
std::optional<std::invoke_result_t<Selector, std::exception_ptr>> selector_obs;
try
{
selector_obs.emplace(selector(err));
selector(err).subscribe(on_error_resume_next_inner_observer_strategy<TObserver>{std::shared_ptr<TObserver>(state, &state->observer)});
}
catch (...)
{
observer.on_error(std::current_exception());
}
if (selector_obs.has_value())
{
std::move(selector_obs).value().subscribe(std::move(observer));
state->observer.on_error(std::current_exception());
}
disposable.dispose();
state->dispose();
}

void on_completed() const
{
observer.on_completed();
disposable.dispose();
state->observer.on_completed();
state->dispose();
}

void set_upstream(const disposable_wrapper& d) const
{
disposable.add(d);
state->add(d);
}

bool is_disposed() const { return disposable.is_disposed(); }
bool is_disposed() const { return state->is_disposed(); }

static std::shared_ptr<on_error_resume_next_disposable<TObserver>> init_state(TObserver&& observer)
{
const auto d = disposable_wrapper_impl<on_error_resume_next_disposable<TObserver>>::make(std::move(observer));
auto ptr = d.lock();
ptr->observer.set_upstream(d.as_weak());
return ptr;
}
};

template<rpp::constraint::decayed_type Selector>
Expand Down
7 changes: 1 addition & 6 deletions src/rpp/rpp/operators/retry_when.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,14 @@ namespace rpp::operators::details

void on_error(const std::exception_ptr& err) const
{
std::optional<std::invoke_result_t<TNotifier, std::exception_ptr>> notifier_obs;
try
{
notifier_obs.emplace(state->notifier(err));
state->notifier(err).subscribe(retry_when_impl_inner_strategy<TObserver, TObservable, TNotifier>{state});
}
catch (...)
{
state->observer.on_error(std::current_exception());
}
if (notifier_obs.has_value())
{
std::move(notifier_obs).value().subscribe(retry_when_impl_inner_strategy<TObserver, TObservable, TNotifier>{state});
}
}

void on_completed() const
Expand Down
11 changes: 11 additions & 0 deletions src/tests/rpp/test_debounce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/debounce.hpp>
#include <rpp/schedulers/test_scheduler.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/subjects/publish_subject.hpp>

#include "disposable_observable.hpp"
Expand Down Expand Up @@ -120,6 +121,16 @@ TEST_CASE("debounce emit only items where timeout reached")
}
}

TEST_CASE("debounce forwards error")
{
auto mock = mock_observer_strategy<int>{};

rpp::source::error<int>({}) | rpp::operators::debounce({}, rpp::schedulers::immediate{}) | rpp::ops::subscribe(mock);
CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}


TEST_CASE("debounce satisfies disposable contracts")
{
Expand Down
26 changes: 26 additions & 0 deletions src/tests/rpp/test_reduce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include <rpp/observers/mock_observer.hpp>
#include <rpp/operators/reduce.hpp>
#include <rpp/sources/empty.hpp>
#include <rpp/sources/error.hpp>
#include <rpp/sources/from.hpp>

#include "copy_count_tracker.hpp"
Expand Down Expand Up @@ -69,6 +71,30 @@ TEMPLATE_TEST_CASE("reduce reduces values and store state", "", rpp::memory_mode
}
}

TEST_CASE("reduce forwards callbacks")
{
auto mock = mock_observer_strategy<int>{};
SECTION("on_error")
{
rpp::source::error<int>({})
| rpp::ops::reduce(0, std::plus<int>{})
| rpp::ops::subscribe(mock);

CHECK(mock.get_received_values().empty());
CHECK(mock.get_on_error_count() == 1);
}

SECTION("on_completed")
{
rpp::source::empty<int>()
| rpp::ops::reduce(0, std::plus<int>{})
| rpp::ops::subscribe(mock);

CHECK(mock.get_received_values().empty());
CHECK(mock.get_on_completed_count() == 1);
}
}

TEST_CASE("reduce doesn't produce extra copies")
{
SECTION("reduce([](verifier&& seed, auto&& v){return forward(v); }")
Expand Down
38 changes: 38 additions & 0 deletions src/tests/rpp/test_retry_when.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,44 @@ TEST_CASE("retry_when resubscribes on notifier emission")

CHECK(subscribe_count == 4 + 1);
}
SECTION("callable throws exception")
{
REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq);

observable
| rpp::operators::retry_when([](const std::exception_ptr&) { throw 1; return rpp::source::just(rpp::schedulers::new_thread{}, 1); })
| rpp::operators::as_blocking()
| rpp::operators::subscribe(mock);

CHECK(subscribe_count == 1);
}
SECTION("callable return observable throwing exception")
{
REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq);

observable
| rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::create<int>([](const auto&) { throw 1; }); })
| rpp::operators::as_blocking()
| rpp::operators::subscribe(mock);

CHECK(subscribe_count == 1);
}
}
SECTION("observable throws exception")
{
size_t i = 0;
const auto observable = rpp::source::create<std::string>([&i](const auto& sub) {
if (i++)
throw 1;
sub.on_error({});
});

SECTION("retry()")
{
REQUIRE_CALL(*mock, on_error(trompeloeil::_)).IN_SEQUENCE(seq);

observable | rpp::operators::retry_when([](const std::exception_ptr&) { return rpp::source::just(rpp::schedulers::new_thread{}, 1); }) | rpp::operators::as_blocking() | rpp::operators::subscribe(mock);
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/tests/rpp/test_timeout.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,22 @@ TEST_CASE("timeout handles current_thread scheduling")
CHECK(mock.get_on_completed_count() == 1);
}

TEST_CASE("timeout handles exception from fallback")
{
auto mock = mock_observer_strategy<int>{};

rpp::source::never<int>()
| rpp::operators::timeout(std::chrono::seconds{1}, rpp::source::create<int>([](const auto&) {
throw 1;
}),
rpp::schedulers::current_thread{})
| rpp::operators::subscribe(mock);

CHECK(mock.get_received_values() == std::vector<int>{});
CHECK(mock.get_on_error_count() == 1);
CHECK(mock.get_on_completed_count() == 0);
}

TEST_CASE("timeout satisfies disposable contracts")
{
test_operator_with_disposable<int>(rpp::ops::timeout(std::chrono::seconds{10000000}, rpp::schedulers::test_scheduler{}));
Expand Down

0 comments on commit 1955f1f

Please sign in to comment.