Skip to content

Commit

Permalink
Execute on_earlier_wakeup only on main thread. ref #19
Browse files Browse the repository at this point in the history
  • Loading branch information
tetsurom committed Mar 16, 2019
1 parent f5373b6 commit bb2138c
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 23 deletions.
43 changes: 25 additions & 18 deletions include/rxqt_run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@
#include <rxcpp/rx.hpp>
#include <QTimer>
#include <QThread>
#include <QSignalMapper>

namespace rxqt {

class run_loop
class run_loop: public QObject
{
public:
run_loop(QObject *parent = Q_NULLPTR) : timer(parent), threadId(QThread::currentThreadId())
run_loop(QObject *parent = Q_NULLPTR) : QObject(parent), threadId(QThread::currentThreadId())
{
timer = new QTimer(this);
mapper = new QSignalMapper(this);
// Give the RxCpp run loop a a function to let us schedule a wakeup in order to dispatch run loop events
rxcpp_run_loop.set_notify_earlier_wakeup([this](auto const& when) { on_earlier_wakeup(when); });
timer.setSingleShot(true);
timer.setTimerType(Qt::PreciseTimer);
rxcpp_run_loop.set_notify_earlier_wakeup([this](auto const& when) { this->on_earlier_wakeup(this->ms_until(when).count()); });
timer->setSingleShot(true);
timer->setTimerType(Qt::PreciseTimer);
// When the timer expires, we'll flush the run loop
timer.connect(&timer, &QTimer::timeout, [this]() { on_event_scheduled(); });
timer->connect(timer, &QTimer::timeout, this, &run_loop::on_event_scheduled);
mapper->connect(mapper, static_cast<void (QSignalMapper::*)(int)>(&QSignalMapper::mapped), this, &run_loop::on_earlier_wakeup);
}

rxcpp::schedulers::scheduler get_scheduler() const
Expand All @@ -37,21 +41,23 @@ class run_loop
}

private:
void on_earlier_wakeup(std::chrono::steady_clock::time_point when)
void on_earlier_wakeup(int msec)
{
// Tell the timer to wake-up at `when` if its not already waking up earlier
const auto ms_till_task = ms_until(when);
if (!timer.isActive() || ms_till_task.count() < timer.remainingTime())
if (threadId == QThread::currentThreadId())
{
if (threadId == QThread::currentThreadId())
const int remainingTime = timer->remainingTime();
if (remainingTime < 0 || msec < remainingTime)
{
timer.start(ms_till_task.count());
}
else
{
QMetaObject::invokeMethod(&timer, "start", Qt::QueuedConnection, Q_ARG(int, ms_till_task.count()));
timer->start(msec);
}
}
else
{
mapper->setMapping(this, msec);
mapper->map(this);
mapper->removeMappings(this);
}
}

// Flush the RxCpp run loop
Expand All @@ -66,14 +72,14 @@ class run_loop
if (!rxcpp_run_loop.empty())
{
const auto time_till_next_event = ms_until(rxcpp_run_loop.peek().when);
timer.start(static_cast<int>(time_till_next_event.count()));
timer->start(static_cast<int>(time_till_next_event.count()));
}
}

// Calculate milliseconds from now until `when`
std::chrono::milliseconds ms_until(rxcpp::schedulers::run_loop::clock_type::time_point const& when) const
{
return ceil<std::chrono::milliseconds>(when - rxcpp_run_loop.now());
return (std::max)(ceil<std::chrono::milliseconds>(when - rxcpp_run_loop.now()), std::chrono::milliseconds::zero());
}

// Round the specified duration to the smallest number of `To` ticks that's not less than `duration`
Expand All @@ -85,8 +91,9 @@ class run_loop
}

rxcpp::schedulers::run_loop rxcpp_run_loop;
QTimer timer;
QTimer* timer;
Qt::HANDLE threadId;
QSignalMapper* mapper;
};

} // namespace rxqt
Expand Down
133 changes: 128 additions & 5 deletions test/run_loop/run_loop_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,46 @@

namespace rx {
using namespace rxcpp;
using namespace rxcpp::subjects;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::schedulers;
using namespace rxcpp::util;
}

using namespace std::chrono;

class RunLoopTest : public QObject
{
Q_OBJECT

private:
void flush(const rxqt::run_loop& rl) const
void flush(const rxqt::run_loop& runLoop) const
{
while(!rl.empty())
while(!runLoop.empty())
{
qApp->processEvents();
}
}

template<class F>
void wait_until(F f, milliseconds timeout = milliseconds(1000))
{
QTimer timer;
timer.setSingleShot(true);
timer.setTimerType(Qt::PreciseTimer);
timer.start(timeout.count());
while(!f() && timer.remainingTime() >= 0)
{
qApp->processEvents();
}
QVERIFY(timer.remainingTime() >= 0);
}

private slots:

void empty()
{
using namespace std::chrono;
rxqt::run_loop runLoop;

QVERIFY(runLoop.empty());
Expand All @@ -35,20 +52,126 @@ private slots:
QVERIFY(true);
}

void timer()
{
rxqt::run_loop runLoop;
bool called = false;

rx::observable<>::timer(milliseconds(5)).subscribe([&](auto){ called = true; });
QVERIFY(!called);

flush(runLoop);
QVERIFY(called);
}

void thread()
{
rxqt::run_loop runLoop;

auto mainThread = runLoop.observe_on_run_loop();
auto mainThreadId = QThread::currentThreadId();
bool called = false;

rx::observable<>::range(1)
.subscribe_on(rx::observe_on_event_loop())
.take(1)
.tap([=](auto){ QVERIFY(mainThreadId != QThread::currentThreadId()); })
.observe_on(mainThread)
.subscribe([=](auto){ QVERIFY(mainThreadId == QThread::currentThreadId()); });

.subscribe([&](auto){ called = true; QVERIFY(mainThreadId == QThread::currentThreadId()); });

wait_until([&](){ return called; });
flush(runLoop);
}

void subscribe_on_synchronize_new_thread()
{
rxqt::run_loop runLoop;
bool called = false;

auto mainthread = runLoop.observe_on_run_loop();
auto workthread = rx::synchronize_new_thread();

rx::observable<>::timer(milliseconds(5))
.subscribe_on(workthread)
.observe_on(mainthread)
.subscribe([&](auto){ called = true; });

QVERIFY(runLoop.empty());
QVERIFY(!called);

wait_until([&](){ return called; });

flush(runLoop);
}

void subscribe_on_observe_on_event_loop()
{
rxqt::run_loop runLoop;
bool called = false;

auto mainthread = runLoop.observe_on_run_loop();
auto workthread = rx::observe_on_event_loop();

rx::observable<>::timer(milliseconds(5))
.subscribe_on(workthread)
.observe_on(mainthread)
.subscribe([&](auto){ called = true; });

QVERIFY(runLoop.empty());
QVERIFY(!called);

wait_until([&](){ return called; });

flush(runLoop);
}

void observe_on_synchronize_new_thread()
{
rxqt::run_loop runLoop;
bool called = false;

auto workthread = rx::synchronize_new_thread();

rx::observable<>::timer(milliseconds(5))
.observe_on(workthread)
.subscribe([&](auto){ called = true; });

QVERIFY(!runLoop.empty());
QVERIFY(!called);

wait_until([&](){ return called; });

flush(runLoop);
}

void observe_on_observe_on_event_loop()
{
rxqt::run_loop runLoop;
bool called = false;

auto workthread = rx::observe_on_event_loop();

rx::observable<>::timer(milliseconds(5))
.observe_on(workthread)
.subscribe([&](auto){ called = true; });

QVERIFY(!runLoop.empty());
QVERIFY(!called);

wait_until([&](){ return called; });

flush(runLoop);
}

void sample_with_time()
{
rxqt::run_loop runLoop;

rx::observable<>::range(1, 10000)
.subscribe_on(rx::synchronize_new_thread())
.sample_with_time(milliseconds(5), runLoop.observe_on_run_loop())
.subscribe([](int) {});

flush(runLoop);
}

Expand Down

0 comments on commit bb2138c

Please sign in to comment.