Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino committed Dec 11, 2022
1 parent 7eea102 commit 5371122
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
16 changes: 15 additions & 1 deletion src/tests/rpp/test_debounce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ SCENARIO("debounce emit only items where timeout reached", "[operators][debounce
GIVEN("subject of items and subscriber subscribed on it via debounce")
{
auto mock = mock_observer<int>{};
auto subj = rpp::subjects::publish_subject<int>{};
std::optional<rpp::subjects::publish_subject<int>> optional_subj{rpp::subjects::publish_subject<int>{}};
auto& subj = optional_subj.value();
subj.get_observable().debounce(debounce_delay, scheduler).subscribe(mock);
WHEN("emit value")
{
Expand Down Expand Up @@ -96,6 +97,19 @@ SCENARIO("debounce emit only items where timeout reached", "[operators][debounce
}
}
}
AND_WHEN("subject destoryed and then schedulable reaches schedulable")
{
optional_subj.reset();
scheduler.time_advance(debounce_delay);
THEN("emission reached mock")
{
CHECK(scheduler.get_schedulings() == std::vector{start+debounce_delay});
CHECK(scheduler.get_executions() == std::vector{start+debounce_delay});
CHECK(mock.get_received_values() == std::vector{1});
CHECK(mock.get_on_error_count() == 0);
CHECK(mock.get_on_completed_count() == 0);
}
}
}
}
}
14 changes: 14 additions & 0 deletions src/tests/rpp/test_delay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#include "mock_observer.hpp"
#include <test_scheduler.hpp>

#include <catch2/catch_test_macros.hpp>
#include <rpp/operators/delay.hpp>
Expand Down Expand Up @@ -148,5 +149,18 @@ SCENARIO("delay mirrors both source observable and trigger observable", "[delay]
}
}
}
WHEN("subscribe on subject via delay via test_scheduler, sent value")
{
subj.get_observable()
.delay(std::chrono::seconds{30000}, test_scheduler{})
.subscribe(mock);

subj.get_subscriber().on_next(1);

AND_THEN("no memory leak")
{
// checked via sanitizer
}
}
}
}
13 changes: 8 additions & 5 deletions src/tests/utils/test_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,26 @@ class test_scheduler final : public rpp::schedulers::details::scheduler_tag
class worker_strategy
{
public:
worker_strategy(std::shared_ptr<state> state)
worker_strategy(std::weak_ptr<state> state)
: m_state{state} { }

void defer_at(rpp::schedulers::time_point time_point,
rpp::schedulers::constraint::schedulable_fn auto&& fn) const
{
if (m_state->sub.is_subscribed())
if (auto locked = m_state.lock())
{
m_state->schedule(time_point, std::forward<decltype(fn)>(fn));
m_state->drain();
if (locked->sub.is_subscribed())
{
locked->schedule(time_point, std::forward<decltype(fn)>(fn));
locked->drain();
}
}
}

static rpp::schedulers::time_point now() { return s_current_time; }

private:
std::shared_ptr<state> m_state;
std::weak_ptr<state> m_state;
};

test_scheduler() {}
Expand Down

0 comments on commit 5371122

Please sign in to comment.