Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Commit

Permalink
[ENGINE] Remove additional state, more invariant check.
Browse files Browse the repository at this point in the history
[BUGFIX] Conditional variable modification need to be mutex locked
  • Loading branch information
tqchen committed Sep 23, 2015
1 parent b6e8eb9 commit 539a1af
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 40 deletions.
4 changes: 4 additions & 0 deletions src/common/object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ObjectPool {
* Currently defined to be 4KB.
*/
constexpr static std::size_t kPageSize = 1 << 12;
/*! \brief internal mutex */
std::mutex m_;
/*!
* \brief Head of free list.
Expand Down Expand Up @@ -147,6 +148,9 @@ ObjectPool<T>::ObjectPool() {
template <typename T>
void ObjectPool<T>::AllocateChunk() {
static_assert(sizeof(LinkedList) <= kPageSize, "Object too big.");
static_assert(sizeof(LinkedList) % alignof(LinkedList) == 0, "ObjectPooll Invariant");
static_assert(alignof(LinkedList) % alignof(T) == 0, "ObjectPooll Invariant");
static_assert(kPageSize % alignof(LinkedList) == 0, "ObjectPooll Invariant");
void* new_chunk_ptr;
int ret = posix_memalign(&new_chunk_ptr, kPageSize, kPageSize);
CHECK_EQ(ret, 0) << "Allocation failed";
Expand Down
94 changes: 64 additions & 30 deletions src/engine/threaded_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,49 @@ ThreadedVar::ThreadedVar(VersionedVarBlock* head) : head_{head} {

void ThreadedVar::AppendReadDependency(OprBlock* opr_block) {
std::lock_guard<std::mutex> lock{m_};
if (ready_to_read_) {
assert(pending_write_ == nullptr);
if (pending_write_ == nullptr) {
// invariant: is_ready_to_read()
CHECK_GE(num_pending_reads_, 0);
// STATE CHANGE
++num_pending_reads_;
--opr_block->wait;
// decrease wait counter
opr_block->decr_wait();
} else {
auto&& new_var_block = VersionedVarBlock::New();
assert(head_->next == nullptr);
assert(head_->trigger == nullptr);
assert(head_->write == false);
// append things to next.
head_->next = new_var_block;
head_->trigger = opr_block;
head_ = new_var_block;
}
}

void ThreadedVar::AppendWriteDependency(OprBlock* opr_block) {
std::lock_guard<std::mutex> lock{m_};
auto&& new_var_block = VersionedVarBlock::New();
std::lock_guard<std::mutex> lock{m_};
// invariant.
assert(head_->next == nullptr);
assert(head_->trigger == nullptr);
assert(head_->write == false);
// attach to head.
head_->next = new_var_block;
head_->trigger = opr_block;
head_->write = true;
if (ready_to_read_) {
// Raise `num_pending_reads_` temporarily to avoid premature triggering.
++num_pending_reads_;

// check if it is ready to write
if (pending_write_ == nullptr) {
// invariant: is_ready_to_read()
pending_write_ = head_;
if (--num_pending_reads_ == 0) {
--opr_block->wait;
CHECK_GE(num_pending_reads_, 0);
if (num_pending_reads_ == 0) {
// STATE CHANGE
opr_block->decr_wait();
num_pending_reads_ = kWriteTriggered;
}
ready_to_read_ = false;
} else {
CHECK_NE(num_pending_reads_, 0);
}
head_ = new_var_block;
}
Expand All @@ -70,13 +84,17 @@ void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) {
{
// this is lock scope
std::lock_guard<std::mutex> lock{m_};
CHECK_GT(num_pending_reads_, 0);

if (--num_pending_reads_ == 0) {
if (pending_write_ != nullptr && --pending_write_->trigger->wait == 0) {
if (pending_write_ != nullptr) {
// STATE CHANGE
trigger = pending_write_->trigger;
num_pending_reads_ = kWriteTriggered;
}
}
}
if (trigger != nullptr) {
if (trigger != nullptr && trigger->decr_wait() == 0) {
dispatcher(trigger);
}
}
Expand All @@ -88,33 +106,39 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
OprBlock* trigger_write = nullptr;
{
std::lock_guard<std::mutex> lock{m_};
assert(ready_to_read_ == false);
// invariants
assert(head_->next == nullptr);
assert(pending_write_ != nullptr);
CHECK_EQ(num_pending_reads_, kWriteTriggered);

// really delete
if (to_delete_) {
VersionedVarBlock *head = pending_write_->next;
VersionedVarBlock::Delete(pending_write_);
assert(head->next == nullptr);
assert(head_ == head);
VersionedVarBlock::Delete(head);
return true;
}
// detach pending write
old_pending_write = pending_write_;
// search for chains to trigger
end_of_read_chain = old_pending_write->next;
assert(num_pending_reads_ == 0);
while (end_of_read_chain->next != nullptr &&
// reset to 0 pending reads
num_pending_reads_ = 0;
while (end_of_read_chain != head_ &&
end_of_read_chain->write == false) {
++num_pending_reads_;
end_of_read_chain = end_of_read_chain->next;
}
// check the states
if (end_of_read_chain->next == nullptr) {
ready_to_read_ = true;
if (end_of_read_chain == head_) {
pending_write_ = nullptr;
} else {
// check if there is pending reads, if not trigger write
assert(end_of_read_chain->write == true);
pending_write_ = end_of_read_chain;
if (num_pending_reads_ == 0 && --end_of_read_chain->trigger->wait == 0) {
if (num_pending_reads_ == 0) {
// mark write as already actived in this var
num_pending_reads_ = kWriteTriggered;
trigger_write = end_of_read_chain->trigger;
}
}
Expand All @@ -129,15 +153,15 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
VersionedVarBlock::Delete(old_pending_write);
// dispatch all the events
while (cur_head != end_of_read_chain) {
if (--cur_head->trigger->wait == 0) {
if (cur_head->trigger->decr_wait() == 0) {
dispatcher(cur_head->trigger);
}
auto prev = cur_head;
cur_head = cur_head->next;
assert(cur_head != nullptr);
VersionedVarBlock::Delete(prev);
}
if (trigger_write != nullptr) {
if (trigger_write != nullptr && trigger_write->decr_wait() == 0) {
dispatcher(trigger_write);
}
return false;
Expand All @@ -150,7 +174,7 @@ void ThreadedVar::SetToDelete() {

bool ThreadedVar::ready_to_read() {
std::lock_guard<std::mutex> lock{m_};
return ready_to_read_;
return this->is_ready_to_read();
}

// implementation of threaded engine
Expand Down Expand Up @@ -232,8 +256,10 @@ void ThreadedEngine::Push(OprHandle op, Context exec_ctx) {
ThreadedOpr* threaded_opr = ThreadedOpr::CastFromBase(op);
OprBlock* opr_block = OprBlock::New();
opr_block->opr = threaded_opr;
opr_block->wait.store(threaded_opr->const_vars.size() +
threaded_opr->mutable_vars.size() + 1);

opr_block->wait.store(static_cast<int>(
threaded_opr->const_vars.size() +
threaded_opr->mutable_vars.size() + 1));
opr_block->ctx = exec_ctx;
++pending_;
// Add read dependencies.
Expand All @@ -244,7 +270,7 @@ void ThreadedEngine::Push(OprHandle op, Context exec_ctx) {
for (auto&& i : threaded_opr->mutable_vars) {
i->AppendWriteDependency(opr_block);
}
if (--opr_block->wait == 0) {
if (opr_block->decr_wait() == 0) {
this->PushToExecute(opr_block, true);
}
}
Expand Down Expand Up @@ -275,7 +301,10 @@ void ThreadedEngine::WaitForVar(VarHandle var) {
if (threaded_var->ready_to_read()) return;
std::atomic<bool> done{false};
this->PushSync([this, &done](RunContext) {
done.store(true);
{
std::unique_lock<std::mutex> lock{finished_m_};
done.store(true);
}
finished_cv_.notify_all();
}, Context::CPU(), {var}, {}, FnProperty::kNormal);
{
Expand Down Expand Up @@ -310,12 +339,17 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) {
ThreadedVar::Delete(i);
}
}
int npending;
{
std::unique_lock<std::mutex> lock{finished_m_};
if (--pending_ == 0) {
finished_cv_.notify_all();
}
npending = --pending_;
}
CHECK_GE(npending, 0);
if (npending == 0) {
// no need to grab lock when notify.
finished_cv_.notify_all();
}

// delte operator if it is temperory
if (threaded_opr->temporary) {
ThreadedOpr::Delete(threaded_opr);
Expand Down
40 changes: 30 additions & 10 deletions src/engine/threaded_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,23 @@ struct OprBlock : public common::ObjectPoolAllocatable<OprBlock> {
/*!
* \brief wait number of pending tasks this OprBlock is waiting for.
*/
std::atomic<std::size_t> wait{0};
std::atomic<int> wait{0};
/*! \brief Pointer to information on performing real operation */
ThreadedOpr* opr{nullptr};
/*! \brief The context this operator */
Context ctx;
// define possible debug information
DEFINE_ENGINE_DEBUG_INFO(OprBlock);
/*!
* \brief call this function to decrease the wait counter.
* \return the wait counter after the decreasement.
*/
inline int decr_wait() {
// chack invariant, avoid over trigger
int ret = --wait;
CHECK_GE(ret, 0);
return ret;
}
}; // struct OprBlock

/*!
Expand Down Expand Up @@ -141,8 +151,11 @@ class ThreadedVar final : public Var,
// TODO(hotpxl) consider rename head
/*! \brief inetrnal mutex of the ThreadedVar */
std::mutex m_;
/*! \brief number of pending reads operation in the variable. */
std::size_t num_pending_reads_{0};
/*!
* \brief number of pending reads operation in the variable.
* will be marked as -1 when there is a already triggered pending write.
*/
int num_pending_reads_{0};
/*!
* \brief Points to the last VersionedVarBlock in the queue.
* head_ always points to a empty VersionedVarBlock.
Expand All @@ -158,14 +171,19 @@ class ThreadedVar final : public Var,
* This is actually the head(oldest operation) in the queue.
*/
VersionedVarBlock* pending_write_{nullptr};
/*!
* \brief If true, then there are no running or pending write on this variable.
*/
bool ready_to_read_{true};
/*!
* \brief If true, delete after operation completes.
*/
bool to_delete_{false};
/*! \brief special const on num_pending_reads_ to mark write being triggered */
static constexpr int kWriteTriggered = -1;
/*!
* \brief derived invariant of ready to ready, without lock.
* \return whether the current variable is ready to read.
*/
inline bool is_ready_to_read() const {
return pending_write_ == nullptr;
}
}; // struct ThreadedVar

/*!
Expand Down Expand Up @@ -230,8 +248,10 @@ class ThreadedEngine : public Engine {

ThreadedEngine() {}
~ThreadedEngine() {
// notify all pending waiters
kill_.store(true);
{
std::unique_lock<std::mutex> lock{finished_m_};
kill_.store(true);
}
finished_cv_.notify_all();
}

Expand Down Expand Up @@ -291,7 +311,7 @@ class ThreadedEngine : public Engine {
/*!
* \brief Number of pending operations.
*/
std::atomic<std::size_t> pending_{0};
std::atomic<int> pending_{0};
/*! \brief whether we want to kill the waiters */
std::atomic<bool> kill_{false};
/*! \brief whether it is during shutdown phase*/
Expand Down

0 comments on commit 539a1af

Please sign in to comment.