Skip to content

Commit

Permalink
Merge pull request #491 from sfiligoi/is2409-opt-cv
Browse files Browse the repository at this point in the history
Add back the condition_variable based LockedQueue for low-thread setups
  • Loading branch information
ch4rr0 authored Nov 15, 2024
2 parents 4cf8d52 + f5a6221 commit b7e97e9
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 14 deletions.
8 changes: 7 additions & 1 deletion bt2_search.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4716,8 +4716,14 @@ static void multiseedSearch(
EList<int> tids;
EList<std::thread*> threads(nthreads);
EList<thread_tracking_pair> tps;
// The condition_variable synchronization can be problematic
// in certain situations.
// Disabling it and using the polling-based lock-free mechanism can help there
// The (relative) polling cost is much higher for low thread count, so use that as a treshold
bool readahead_useCVLocks = nthreads<=16; // Note: We may want to consider other factors, too

// Important: Need at least nthreads+1 elements, more is OK
PatternSourceReadAheadFactory readahead_factory(patsrc,pp,4*nthreads+1);
PatternSourceReadAheadFactory readahead_factory(patsrc,pp,4*nthreads+1,readahead_useCVLocks);
multiseed_readahead_factory = &readahead_factory;

tps.resize(std::max(nthreads, thread_ceiling));
Expand Down
144 changes: 131 additions & 13 deletions pat.h
Original file line number Diff line number Diff line change
Expand Up @@ -1298,10 +1298,10 @@ class PatternSourceReadAheadFactory {

PatternSourceReadAheadFactory(
PatternComposer& composer,
const PatternParams& pp, size_t n) :
const PatternParams& pp, size_t n, bool useCV) :
psfact_(composer,pp),
psq_ready_(),
psq_idle_(psfact_,n),
psq_ready_(useCV),
psq_idle_(useCV,psfact_,n),
asynct_(readAsync, this) {}

~PatternSourceReadAheadFactory() {
Expand All @@ -1325,9 +1325,75 @@ class PatternSourceReadAheadFactory {
private:

template <typename T>
class LockedQueue {
class LockedQueueCV {
public:
virtual ~LockedQueue() {}
virtual ~LockedQueueCV() {}

bool empty() {
bool ret = false;
{
std::unique_lock<std::mutex> lk(m_);
ret = q_.empty();
}
return ret;
}

void push(T& ps) {
std::unique_lock<std::mutex> lk(m_);
q_.push(ps);
cv_.notify_all();
}

// wait for data, if none in the queue
T pop() {
T ret;
{
std::unique_lock<std::mutex> lk(m_);
cv_.wait(lk, [this] { return !q_.empty();});
ret = q_.front();
q_.pop();
}
return ret;
}

protected:
std::mutex m_;
std::condition_variable cv_;
std::queue<T> q_;
};

class LockedPSQueueCV : public LockedQueueCV<PatternSourcePerThread*> {
public:
LockedPSQueueCV(PatternSourcePerThreadFactory& psfact, size_t n) :
LockedQueueCV<PatternSourcePerThread*>() {
for (size_t i=0; i<n; i++) {
q_.push(psfact.create());
}
}

virtual ~LockedPSQueueCV() {
while (!q_.empty()) {
delete q_.front();
q_.pop();
}
}
};

class LockedREQueueCV : public LockedQueueCV<ReadElement> {
public:
virtual ~LockedREQueueCV() {
// we actually own ps while in the queue
while (!q_.empty()) {
delete q_.front().ps;
q_.pop();
}
}
};

template <typename T>
class LockedQueueMC {
public:
virtual ~LockedQueueMC() {}

bool empty() {
return q_.size_approx() == 0;
Expand All @@ -1346,21 +1412,19 @@ class PatternSourceReadAheadFactory {
}

protected:
// std::mutex m_;
// std::condition_variable cv_;
ConcurrentQueue<T> q_;
};

class LockedPSQueue: public LockedQueue<PatternSourcePerThread*> {
class LockedPSQueueMC: public LockedQueueMC<PatternSourcePerThread*> {
public:
LockedPSQueue(PatternSourcePerThreadFactory& psfact, size_t n) :
LockedQueue<PatternSourcePerThread*>() {
LockedPSQueueMC(PatternSourcePerThreadFactory& psfact, size_t n) :
LockedQueueMC<PatternSourcePerThread*>() {
for (size_t i=0; i<n; i++) {
q_.enqueue(psfact.create());
}
}

virtual ~LockedPSQueue() {
virtual ~LockedPSQueueMC() {
PatternSourcePerThread *item;

while (q_.size_approx() != 0) {
Expand All @@ -1371,9 +1435,9 @@ class PatternSourceReadAheadFactory {

};

class LockedREQueue : public LockedQueue<ReadElement> {
class LockedREQueueMC : public LockedQueueMC<ReadElement> {
public:
virtual ~LockedREQueue() {
virtual ~LockedREQueueMC() {
// we actually own ps while in the queue
ReadElement item;

Expand All @@ -1385,6 +1449,60 @@ class PatternSourceReadAheadFactory {
}
};

template <typename T, typename Q1, typename Q2>
class LockedQueueDuo {
public:
LockedQueueDuo(Q1 *pq1, Q2 *pq2)
: pq1_(pq1), pq2_(pq2)
{
assert( (pq1_!=NULL) || (pq2_!=NULL) );
}

virtual ~LockedQueueDuo() {
if (pq1_!=NULL) delete pq1_;
if (pq2_!=NULL) delete pq2_;
}

bool empty() {
return (pq1_!=NULL) ? pq1_->empty() : pq2_->empty();
}

void push(T& ps) {
if (pq1_!=NULL) {
pq1_->push(ps);
} else {
pq2_->push(ps);
}
}

T pop() {
return (pq1_!=NULL) ? pq1_->pop() : pq2_->pop();
}

protected:
// owned, and exactly one should be not NULL
Q1 *pq1_;
Q2 *pq2_;
};

class LockedPSQueue: public LockedQueueDuo<PatternSourcePerThread*, LockedPSQueueCV, LockedPSQueueMC> {
public:
LockedPSQueue(bool useCV, PatternSourcePerThreadFactory& psfact, size_t n) :
LockedQueueDuo<PatternSourcePerThread*, LockedPSQueueCV, LockedPSQueueMC>(
useCV ? new LockedPSQueueCV(psfact,n) : NULL,
useCV ? NULL : new LockedPSQueueMC(psfact,n)
) {}
};

class LockedREQueue: public LockedQueueDuo<ReadElement, LockedREQueueCV, LockedREQueueMC> {
public:
LockedREQueue(bool useCV) :
LockedQueueDuo<ReadElement, LockedREQueueCV, LockedREQueueMC>(
useCV ? new LockedREQueueCV() : NULL,
useCV ? NULL : new LockedREQueueMC()
) {}
};

static void readAsync(PatternSourceReadAheadFactory *obj) {
LockedREQueue &psq_ready = obj->psq_ready_;
LockedPSQueue &psq_idle = obj->psq_idle_;
Expand Down

0 comments on commit b7e97e9

Please sign in to comment.