diff --git a/include/rxqt_run_loop.hpp b/include/rxqt_run_loop.hpp index cfc24fc..29eda25 100644 --- a/include/rxqt_run_loop.hpp +++ b/include/rxqt_run_loop.hpp @@ -5,20 +5,24 @@ #include #include #include +#include namespace rxqt { -class run_loop +class run_loop: public QObject { public: - run_loop(QObject *parent = Q_NULLPTR) : timer(parent), threadId(QThread::currentThreadId()) + run_loop(QObject *parent = Q_NULLPTR) : QObject(parent), threadId(QThread::currentThreadId()) { + timer = new QTimer(this); + mapper = new QSignalMapper(this); // Give the RxCpp run loop a a function to let us schedule a wakeup in order to dispatch run loop events - rxcpp_run_loop.set_notify_earlier_wakeup([this](auto const& when) { on_earlier_wakeup(when); }); - timer.setSingleShot(true); - timer.setTimerType(Qt::PreciseTimer); + rxcpp_run_loop.set_notify_earlier_wakeup([this](auto const& when) { this->on_earlier_wakeup(this->ms_until(when).count()); }); + timer->setSingleShot(true); + timer->setTimerType(Qt::PreciseTimer); // When the timer expires, we'll flush the run loop - timer.connect(&timer, &QTimer::timeout, [this]() { on_event_scheduled(); }); + timer->connect(timer, &QTimer::timeout, this, &run_loop::on_event_scheduled); + mapper->connect(mapper, static_cast(&QSignalMapper::mapped), this, &run_loop::on_earlier_wakeup); } rxcpp::schedulers::scheduler get_scheduler() const @@ -37,21 +41,23 @@ class run_loop } private: - void on_earlier_wakeup(std::chrono::steady_clock::time_point when) + void on_earlier_wakeup(int msec) { // Tell the timer to wake-up at `when` if its not already waking up earlier - const auto ms_till_task = ms_until(when); - if (!timer.isActive() || ms_till_task.count() < timer.remainingTime()) + if (threadId == QThread::currentThreadId()) { - if (threadId == QThread::currentThreadId()) + const int remainingTime = timer->remainingTime(); + if (remainingTime < 0 || msec < remainingTime) { - timer.start(ms_till_task.count()); - } - else - { - QMetaObject::invokeMethod(&timer, "start", Qt::QueuedConnection, Q_ARG(int, ms_till_task.count())); + timer->start(msec); } } + else + { + mapper->setMapping(this, msec); + mapper->map(this); + mapper->removeMappings(this); + } } // Flush the RxCpp run loop @@ -66,14 +72,14 @@ class run_loop if (!rxcpp_run_loop.empty()) { const auto time_till_next_event = ms_until(rxcpp_run_loop.peek().when); - timer.start(static_cast(time_till_next_event.count())); + timer->start(static_cast(time_till_next_event.count())); } } // Calculate milliseconds from now until `when` std::chrono::milliseconds ms_until(rxcpp::schedulers::run_loop::clock_type::time_point const& when) const { - return ceil(when - rxcpp_run_loop.now()); + return (std::max)(ceil(when - rxcpp_run_loop.now()), std::chrono::milliseconds::zero()); } // Round the specified duration to the smallest number of `To` ticks that's not less than `duration` @@ -85,8 +91,9 @@ class run_loop } rxcpp::schedulers::run_loop rxcpp_run_loop; - QTimer timer; + QTimer* timer; Qt::HANDLE threadId; + QSignalMapper* mapper; }; } // namespace rxqt diff --git a/test/run_loop/run_loop_test.cpp b/test/run_loop/run_loop_test.cpp index ec30535..784ae58 100644 --- a/test/run_loop/run_loop_test.cpp +++ b/test/run_loop/run_loop_test.cpp @@ -3,29 +3,46 @@ namespace rx { using namespace rxcpp; + using namespace rxcpp::subjects; using namespace rxcpp::sources; using namespace rxcpp::operators; + using namespace rxcpp::schedulers; using namespace rxcpp::util; } +using namespace std::chrono; + class RunLoopTest : public QObject { Q_OBJECT private: - void flush(const rxqt::run_loop& rl) const + void flush(const rxqt::run_loop& runLoop) const { - while(!rl.empty()) + while(!runLoop.empty()) { qApp->processEvents(); } } + template + void wait_until(F f, milliseconds timeout = milliseconds(1000)) + { + QTimer timer; + timer.setSingleShot(true); + timer.setTimerType(Qt::PreciseTimer); + timer.start(timeout.count()); + while(!f() && timer.remainingTime() >= 0) + { + qApp->processEvents(); + } + QVERIFY(timer.remainingTime() >= 0); + } + private slots: void empty() { - using namespace std::chrono; rxqt::run_loop runLoop; QVERIFY(runLoop.empty()); @@ -35,20 +52,126 @@ private slots: QVERIFY(true); } + void timer() + { + rxqt::run_loop runLoop; + bool called = false; + + rx::observable<>::timer(milliseconds(5)).subscribe([&](auto){ called = true; }); + QVERIFY(!called); + + flush(runLoop); + QVERIFY(called); + } + void thread() { rxqt::run_loop runLoop; auto mainThread = runLoop.observe_on_run_loop(); auto mainThreadId = QThread::currentThreadId(); + bool called = false; rx::observable<>::range(1) .subscribe_on(rx::observe_on_event_loop()) .take(1) .tap([=](auto){ QVERIFY(mainThreadId != QThread::currentThreadId()); }) .observe_on(mainThread) - .subscribe([=](auto){ QVERIFY(mainThreadId == QThread::currentThreadId()); }); - + .subscribe([&](auto){ called = true; QVERIFY(mainThreadId == QThread::currentThreadId()); }); + + wait_until([&](){ return called; }); + flush(runLoop); + } + + void subscribe_on_synchronize_new_thread() + { + rxqt::run_loop runLoop; + bool called = false; + + auto mainthread = runLoop.observe_on_run_loop(); + auto workthread = rx::synchronize_new_thread(); + + rx::observable<>::timer(milliseconds(5)) + .subscribe_on(workthread) + .observe_on(mainthread) + .subscribe([&](auto){ called = true; }); + + QVERIFY(runLoop.empty()); + QVERIFY(!called); + + wait_until([&](){ return called; }); + + flush(runLoop); + } + + void subscribe_on_observe_on_event_loop() + { + rxqt::run_loop runLoop; + bool called = false; + + auto mainthread = runLoop.observe_on_run_loop(); + auto workthread = rx::observe_on_event_loop(); + + rx::observable<>::timer(milliseconds(5)) + .subscribe_on(workthread) + .observe_on(mainthread) + .subscribe([&](auto){ called = true; }); + + QVERIFY(runLoop.empty()); + QVERIFY(!called); + + wait_until([&](){ return called; }); + + flush(runLoop); + } + + void observe_on_synchronize_new_thread() + { + rxqt::run_loop runLoop; + bool called = false; + + auto workthread = rx::synchronize_new_thread(); + + rx::observable<>::timer(milliseconds(5)) + .observe_on(workthread) + .subscribe([&](auto){ called = true; }); + + QVERIFY(!runLoop.empty()); + QVERIFY(!called); + + wait_until([&](){ return called; }); + + flush(runLoop); + } + + void observe_on_observe_on_event_loop() + { + rxqt::run_loop runLoop; + bool called = false; + + auto workthread = rx::observe_on_event_loop(); + + rx::observable<>::timer(milliseconds(5)) + .observe_on(workthread) + .subscribe([&](auto){ called = true; }); + + QVERIFY(!runLoop.empty()); + QVERIFY(!called); + + wait_until([&](){ return called; }); + + flush(runLoop); + } + + void sample_with_time() + { + rxqt::run_loop runLoop; + + rx::observable<>::range(1, 10000) + .subscribe_on(rx::synchronize_new_thread()) + .sample_with_time(milliseconds(5), runLoop.observe_on_run_loop()) + .subscribe([](int) {}); + flush(runLoop); }