Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify condition variable use #862

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 20 additions & 36 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,19 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
};

// The queue storing read-only transactions to be executed by read-only threads
// Thread-safe
class ro_trx_queue_t {
public:
void push_back(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_back(std::move(trx));
notify_waiting(false);
cond.notify_one();
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
}

void push_front(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_front(std::move(trx));
notify_waiting(false);
cond.notify_one();
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
}

bool empty() const {
Expand All @@ -441,29 +442,19 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// may wait if the queue is empty, and not all other threads are already waiting.
// returns true if a transaction was dequeued and should be executed, or false
// if conditions are met to stop processing transactions.
bool pop_front(ro_trx_t &trx) {
bool pop_front(ro_trx_t& trx) {
std::unique_lock<std::mutex> g( mtx );

if (should_exit()) {
notify_waiting(true);

++num_waiting;
cond.wait(g, [this]() {
return !queue.empty() || should_exit();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this. Earlier I was worried when switching from write window, we might end up waiting forever in some corner cases.

});
if (queue.empty() || should_exit()) {
cond.notify_all();
return false;
}
--num_waiting;

if (queue.empty()) {
if (num_waiting + 1 == max_waiting) {
exiting_read_window = true;
notify_waiting(true);
return false;
}

++num_waiting;
cond.wait(g, [this]() { return !queue.empty() || should_exit(); });
--num_waiting;

if (queue.empty() || should_exit()) // were we woken up by another thread?
return false;
}

trx = std::move(queue.front());
queue.pop_front();
return true;
Expand All @@ -473,34 +464,27 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// - all threads would be idle
// - or the net_plugin received a block.
// - or we have reached the read_window_deadline
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, fc::time_point deadline) {
assert(num_tasks > 0 && num_waiting == 0);
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, const fc::time_point& deadline) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to pass deadline by value https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#Rf-in

std::lock_guard<std::mutex> g( mtx );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::lock_guard unnecessary as this is called from a single thread

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but safer to have here in case this code is called from somewhere else. A uncontested lock is not very expensive.

assert(num_tasks > 0);
assert(received_block && *received_block == false);
max_waiting = num_tasks;
num_waiting = 0;
received_block_ptr = received_block;
read_window_deadline = deadline;
}

private:
void notify_waiting(bool all) {
if (num_waiting) {
all ? cond.notify_all() : cond.notify_one();
}
}

bool should_exit() {
return *received_block_ptr || exiting_read_window ||
fc::time_point::now() >= read_window_deadline;
return num_waiting == max_waiting || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
}

mutable std::mutex mtx;
std::condition_variable cond;
deque<ro_trx_t> queue;
uint32_t num_waiting {0};
uint32_t max_waiting {0};
bool exiting_read_window {false};
std::atomic<bool>* received_block_ptr;
fc::time_point start_time;
uint32_t num_waiting{0};
uint32_t max_waiting{0};
std::atomic<bool>* received_block_ptr = nullptr;
fc::time_point read_window_deadline;
};

Expand Down