Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Move write trigger out correctly #69

Merged
merged 2 commits into from
Sep 14, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions src/engine/threaded_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,40 +83,40 @@ void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) {

template <typename Dispatcher>
bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
VersionedVarBlock *old_pending_write, *end_of_dispatch_chain;
VersionedVarBlock *old_pending_write, *end_of_read_chain;
bool trigger_write = false;
{
// this is lock scope
std::lock_guard<std::mutex> lock{m_};
assert(ready_to_read_ == false);
// detach pending write
old_pending_write = pending_write_;
// search for chains to trigger
VersionedVarBlock *p = old_pending_write->next;
end_of_read_chain = old_pending_write->next;
assert(num_pending_reads_ == 0);
while (p->next != nullptr && p->write == false) {
while (end_of_read_chain->next != nullptr &&
end_of_read_chain->write == false) {
++num_pending_reads_;
p = p->next;
end_of_read_chain = end_of_read_chain->next;
}
// mark end of dispatch chain
end_of_dispatch_chain = p;

if (p->next == nullptr) {
// check the states
if (end_of_read_chain->next == nullptr) {
ready_to_read_ = true;
pending_write_ = nullptr;
assert(p->trigger == nullptr);
assert(p->write ==false);
} else {
assert(p->write == true);
pending_write_ = p;
assert(end_of_read_chain->write == true);
pending_write_ = end_of_read_chain;
if (num_pending_reads_ == 0) {
if (--pending_write_->trigger->wait == 0) {
dispatcher(pending_write_->trigger);
}
trigger_write = true;
}
}
}
// this is outside of lock scope
// the linked list is detached from variable
// This is outside of lock scope
// Be very carful, pending_write_ and num_pending_reads_
// can change now, do not reply ont the two variables.
// The linked list \in [old_pending_write, end_of_read_chain)
// is already detached from this Var.
// So it is safe to modify these
VersionedVarBlock *cur_head = old_pending_write->next;
VersionedVarBlock::Delete(old_pending_write);
if (to_delete_) {
Expand All @@ -125,7 +125,7 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
return true;
}
// dispatch all the events
while (cur_head != end_of_dispatch_chain) {
while (cur_head != end_of_read_chain) {
if (--cur_head->trigger->wait == 0) {
dispatcher(cur_head->trigger);
}
Expand All @@ -134,6 +134,13 @@ bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
assert(cur_head != nullptr);
VersionedVarBlock::Delete(prev);
}
// Be careful, do not use pending_write_ or num_pending_reads_ here.
// As they can change, use end_of_read_chain
if (trigger_write) {
if (--end_of_read_chain->trigger->wait == 0) {
dispatcher(end_of_read_chain->trigger);
}
}
return false;
}

Expand Down