Skip to content

Commit

Permalink
Support more than one CScheduler thread for serial clients
Browse files Browse the repository at this point in the history
This will be used by CValidationInterface soon.

This requires a bit of work as we need to ensure that most of our
callbacks happen in-order (to avoid synchronization issues in
wallet) - we keep our own internal queue and push things onto it,
scheduling a queue-draining function immediately upon new
callbacks.
  • Loading branch information
TheBlueMatt committed Jul 7, 2017
1 parent 2fbf2db commit 08096bb
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 10 deletions.
52 changes: 52 additions & 0 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
}
return result;
}


void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
{
LOCK(m_cs_callbacks_pending);
// Try to avoid scheduling too many copies here, but if we
// accidentally have two ProcessQueue's scheduled at once its
// not a big deal.
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
}
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
}

void SingleThreadedSchedulerClient::ProcessQueue() {
std::function<void (void)> callback;
{
LOCK(m_cs_callbacks_pending);
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
m_are_callbacks_running = true;

callback = std::move(m_callbacks_pending.front());
m_callbacks_pending.pop_front();
}

// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
SingleThreadedSchedulerClient* instance;
RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
~RAIICallbacksRunning() {
{
LOCK(instance->m_cs_callbacks_pending);
instance->m_are_callbacks_running = false;
}
instance->MaybeScheduleProcessQueue();
}
} raiicallbacksrunning(this);

callback();
}

void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
assert(m_pscheduler);

{
LOCK(m_cs_callbacks_pending);
m_callbacks_pending.emplace_back(std::move(func));
}
MaybeScheduleProcessQueue();
}
24 changes: 24 additions & 0 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <boost/thread.hpp>
#include <map>

#include "sync.h"

//
// Simple class for background tasks that should be run
// periodically or once "after a while"
Expand Down Expand Up @@ -79,4 +81,26 @@ class CScheduler
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};

/**
* Class used by CScheduler clients which may schedule multiple jobs
* which are required to be run serially. Does not require such jobs
* to be executed on the same thread, but no two jobs will be executed
* at the same time.
*/
class SingleThreadedSchedulerClient {
private:
CScheduler *m_pscheduler;

CCriticalSection m_cs_callbacks_pending;
std::list<std::function<void (void)>> m_callbacks_pending;
bool m_are_callbacks_running = false;

void MaybeScheduleProcessQueue();
void ProcessQueue();

public:
SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
void AddToProcessQueue(std::function<void (void)> func);
};

#endif
22 changes: 14 additions & 8 deletions src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
#include "validationinterface.h"
#include "init.h"
#include "scheduler.h"
#include "sync.h"
#include "util.h"

#include <list>
#include <atomic>

#include <boost/signals2/signal.hpp>

Expand All @@ -20,22 +25,23 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;

CScheduler *m_scheduler = NULL;
// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;

MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
};

static CMainSignals g_signals;

CMainSignals::CMainSignals() {
m_internals.reset(new MainSignalsInstance());
}

void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) {
assert(!m_internals->m_scheduler);
m_internals->m_scheduler = &scheduler;
assert(!m_internals);
m_internals.reset(new MainSignalsInstance(&scheduler));
}

void CMainSignals::UnregisterBackgroundSignalScheduler() {
m_internals->m_scheduler = NULL;
m_internals.reset(nullptr);
}

CMainSignals& GetMainSignals()
Expand Down
2 changes: 0 additions & 2 deletions src/validationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ class CMainSignals {
friend void ::UnregisterAllValidationInterfaces();

public:
CMainSignals();

/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
Expand Down

0 comments on commit 08096bb

Please sign in to comment.