Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

[ENGINE] Remove additional state, more invariant check, bugfix #138

Merged
merged 1 commit into from
Sep 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ObjectPool {
* Currently defined to be 4KB.
*/
constexpr static std::size_t kPageSize = 1 << 12;
/*! \brief internal mutex */
std::mutex m_;
/*!
* \brief Head of free list.
Expand Down Expand Up @@ -147,6 +148,9 @@ ObjectPool<T>::ObjectPool() {
template <typename T>
void ObjectPool<T>::AllocateChunk() {
static_assert(sizeof(LinkedList) <= kPageSize, "Object too big.");
static_assert(sizeof(LinkedList) % alignof(LinkedList) == 0, "ObjectPool Invariant");
static_assert(alignof(LinkedList) % alignof(T) == 0, "ObjectPool Invariant");
static_assert(kPageSize % alignof(LinkedList) == 0, "ObjectPool Invariant");
void* new_chunk_ptr;
int ret = posix_memalign(&new_chunk_ptr, kPageSize, kPageSize);
CHECK_EQ(ret, 0) << "Allocation failed";
Expand Down
94 changes: 64 additions & 30 deletions src/engine/threaded_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,49 @@ ThreadedVar::ThreadedVar(VersionedVarBlock* head) : head_{head} {

void ThreadedVar::AppendReadDependency(OprBlock* opr_block) {
std::lock_guard<std::mutex> lock{m_};
if (ready_to_read_) {
assert(pending_write_ == nullptr);
if (pending_write_ == nullptr) {
// invariant: is_ready_to_read()
CHECK_GE(num_pending_reads_, 0);
// STATE CHANGE
++num_pending_reads_;
--opr_block->wait;
// decrease wait counter
opr_block->decr_wait();
} else {
auto&& new_var_block = VersionedVarBlock::New();
assert(head_->next == nullptr);
assert(head_->trigger == nullptr);
assert(head_->write == false);
// append things to next.
head_->next = new_var_block;
head_->trigger = opr_block;
head_ = new_var_block;
}
}

void ThreadedVar::AppendWriteDependency(OprBlock* opr_block) {
std::lock_guard<std::mutex> lock{m_};
auto&& new_var_block = VersionedVarBlock::New();
std::lock_guard<std::mutex> lock{m_};
// invariant.
assert(head_->next == nullptr);
assert(head_->trigger == nullptr);
assert(head_->write == false);
// attach to head.
head_->next = new_var_block;
head_->trigger = opr_block;
head_->write = true;
if (ready_to_read_) {
// Raise `num_pending_reads_` temporarily to avoid premature triggering.
++num_pending_reads_;

// check if it is ready to write
if (pending_write_ == nullptr) {
// invariant: is_ready_to_read()
pending_write_ = head_;
if (--num_pending_reads_ == 0) {
--opr_block->wait;
CHECK_GE(num_pending_reads_, 0);
if (num_pending_reads_ == 0) {
// STATE CHANGE
opr_block->decr_wait();
num_pending_reads_ = kWriteTriggered;
}
ready_to_read_ = false;
} else {
CHECK_NE(num_pending_reads_, 0);
}
head_ = new_var_block;
}
Expand All @@ -70,13 +84,17 @@ void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) {
{
// this is lock scope
std::lock_guard<std::mutex> lock{m_};
CHECK_GT(num_pending_reads_, 0);

if (--num_pending_reads_ == 0) {
if (pending_write_ != nullptr && --pending_write_->trigger->wait == 0) {
if (pending_write_ != nullptr) {
// STATE CHANGE
trigger = pending_write_->trigger;
num_pending_reads_ = kWriteTriggered;
}
}
}
if (trigger != nullptr) {
if (trigger != nullptr && trigger->decr_wait() == 0) {
dispatcher(trigger);
}
}
Expand All @@ -88,33 +106,39 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
OprBlock* trigger_write = nullptr;
{
std::lock_guard<std::mutex> lock{m_};
assert(ready_to_read_ == false);
// invariants
assert(head_->next == nullptr);
assert(pending_write_ != nullptr);
CHECK_EQ(num_pending_reads_, kWriteTriggered);

// really delete
if (to_delete_) {
VersionedVarBlock *head = pending_write_->next;
VersionedVarBlock::Delete(pending_write_);
assert(head->next == nullptr);
assert(head_ == head);
VersionedVarBlock::Delete(head);
return true;
}
// detach pending write
old_pending_write = pending_write_;
// search for chains to trigger
end_of_read_chain = old_pending_write->next;
assert(num_pending_reads_ == 0);
while (end_of_read_chain->next != nullptr &&
// reset to 0 pending reads
num_pending_reads_ = 0;
while (end_of_read_chain != head_ &&
end_of_read_chain->write == false) {
++num_pending_reads_;
end_of_read_chain = end_of_read_chain->next;
}
// check the states
if (end_of_read_chain->next == nullptr) {
ready_to_read_ = true;
if (end_of_read_chain == head_) {
pending_write_ = nullptr;
} else {
// check if there is pending reads, if not trigger write
assert(end_of_read_chain->write == true);
pending_write_ = end_of_read_chain;
if (num_pending_reads_ == 0 && --end_of_read_chain->trigger->wait == 0) {
if (num_pending_reads_ == 0) {
// mark write as already activated in this var
num_pending_reads_ = kWriteTriggered;
trigger_write = end_of_read_chain->trigger;
}
}
Expand All @@ -129,15 +153,15 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
VersionedVarBlock::Delete(old_pending_write);
// dispatch all the events
while (cur_head != end_of_read_chain) {
if (--cur_head->trigger->wait == 0) {
if (cur_head->trigger->decr_wait() == 0) {
dispatcher(cur_head->trigger);
}
auto prev = cur_head;
cur_head = cur_head->next;
assert(cur_head != nullptr);
VersionedVarBlock::Delete(prev);
}
if (trigger_write != nullptr) {
if (trigger_write != nullptr && trigger_write->decr_wait() == 0) {
dispatcher(trigger_write);
}
return false;
Expand All @@ -150,7 +174,7 @@ void ThreadedVar::SetToDelete() {

bool ThreadedVar::ready_to_read() {
std::lock_guard<std::mutex> lock{m_};
return ready_to_read_;
return this->is_ready_to_read();
}

// implementation of threaded engine
Expand Down Expand Up @@ -232,8 +256,10 @@ void ThreadedEngine::Push(OprHandle op, Context exec_ctx) {
ThreadedOpr* threaded_opr = ThreadedOpr::CastFromBase(op);
OprBlock* opr_block = OprBlock::New();
opr_block->opr = threaded_opr;
opr_block->wait.store(threaded_opr->const_vars.size() +
threaded_opr->mutable_vars.size() + 1);

opr_block->wait.store(static_cast<int>(
threaded_opr->const_vars.size() +
threaded_opr->mutable_vars.size() + 1));
opr_block->ctx = exec_ctx;
++pending_;
// Add read dependencies.
Expand All @@ -244,7 +270,7 @@ void ThreadedEngine::Push(OprHandle op, Context exec_ctx) {
for (auto&& i : threaded_opr->mutable_vars) {
i->AppendWriteDependency(opr_block);
}
if (--opr_block->wait == 0) {
if (opr_block->decr_wait() == 0) {
this->PushToExecute(opr_block, true);
}
}
Expand Down Expand Up @@ -275,7 +301,10 @@ void ThreadedEngine::WaitForVar(VarHandle var) {
if (threaded_var->ready_to_read()) return;
std::atomic<bool> done{false};
this->PushSync([this, &done](RunContext) {
done.store(true);
{
std::unique_lock<std::mutex> lock{finished_m_};
done.store(true);
}
finished_cv_.notify_all();
}, Context::CPU(), {var}, {}, FnProperty::kNormal);
{
Expand Down Expand Up @@ -310,12 +339,17 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) {
ThreadedVar::Delete(i);
}
}
int npending;
{
std::unique_lock<std::mutex> lock{finished_m_};
if (--pending_ == 0) {
finished_cv_.notify_all();
}
npending = --pending_;
}
CHECK_GE(npending, 0);
if (npending == 0) {
// no need to grab lock when notify.
finished_cv_.notify_all();
}

// delete operator if it is temporary
if (threaded_opr->temporary) {
ThreadedOpr::Delete(threaded_opr);
Expand Down
40 changes: 30 additions & 10 deletions src/engine/threaded_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,23 @@ struct OprBlock : public common::ObjectPoolAllocatable<OprBlock> {
/*!
* \brief wait number of pending tasks this OprBlock is waiting for.
*/
std::atomic<std::size_t> wait{0};
std::atomic<int> wait{0};
/*! \brief Pointer to information on performing real operation */
ThreadedOpr* opr{nullptr};
/*! \brief The context this operator */
Context ctx;
// define possible debug information
DEFINE_ENGINE_DEBUG_INFO(OprBlock);
/*!
* \brief call this function to decrease the wait counter.
* \return the wait counter after the decrement.
*/
inline int decr_wait() {
// check invariant, avoid over-triggering
int ret = --wait;
CHECK_GE(ret, 0);
return ret;
}
}; // struct OprBlock

/*!
Expand Down Expand Up @@ -141,8 +151,11 @@ class ThreadedVar final : public Var,
// TODO(hotpxl) consider rename head
/*! \brief internal mutex of the ThreadedVar */
std::mutex m_;
/*! \brief number of pending reads operation in the variable. */
std::size_t num_pending_reads_{0};
/*!
* \brief number of pending read operations on the variable.
* will be marked as -1 when there is an already triggered pending write.
*/
int num_pending_reads_{0};
/*!
* \brief Points to the last VersionedVarBlock in the queue.
* head_ always points to an empty VersionedVarBlock.
Expand All @@ -158,14 +171,19 @@ class ThreadedVar final : public Var,
* This is actually the head(oldest operation) in the queue.
*/
VersionedVarBlock* pending_write_{nullptr};
/*!
* \brief If true, then there are no running or pending write on this variable.
*/
bool ready_to_read_{true};
/*!
* \brief If true, delete after operation completes.
*/
bool to_delete_{false};
/*! \brief special const on num_pending_reads_ to mark write being triggered */
static constexpr int kWriteTriggered = -1;
/*!
* \brief derived invariant of ready to read, without lock.
* \return whether the current variable is ready to read.
*/
inline bool is_ready_to_read() const {
return pending_write_ == nullptr;
}
}; // struct ThreadedVar

/*!
Expand Down Expand Up @@ -230,8 +248,10 @@ class ThreadedEngine : public Engine {

ThreadedEngine() {}
~ThreadedEngine() {
// notify all pending waiters
kill_.store(true);
{
std::unique_lock<std::mutex> lock{finished_m_};
kill_.store(true);
}
finished_cv_.notify_all();
}

Expand Down Expand Up @@ -291,7 +311,7 @@ class ThreadedEngine : public Engine {
/*!
* \brief Number of pending operations.
*/
std::atomic<std::size_t> pending_{0};
std::atomic<int> pending_{0};
/*! \brief whether we want to kill the waiters */
std::atomic<bool> kill_{false};
/*! \brief whether it is during shutdown phase*/
Expand Down