Skip to content

Commit

Permalink
GH-822 Simplify condition variable use
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Mar 22, 2023
1 parent d3b227a commit 4b12e6e
Showing 1 changed file with 23 additions and 36 deletions.
59 changes: 23 additions & 36 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,21 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
};

// The queue storing read-only transactions to be executed by read-only threads
// Thread-safe
class ro_trx_queue_t {
public:
void push_back(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_back(std::move(trx));
notify_waiting(false);
g.unlock();
cond.notify_one();
}

void push_front(ro_trx_t&& trx) {
std::unique_lock<std::mutex> g( mtx );
queue.push_front(std::move(trx));
notify_waiting(false);
g.unlock();
cond.notify_one();
}

bool empty() const {
Expand All @@ -441,29 +444,20 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// may wait if the queue is empty, and not all other threads are already waiting.
// returns true if a transaction was dequeued and should be executed, or false
// if conditions are met to stop processing transactions.
bool pop_front(ro_trx_t &trx) {
bool pop_front(ro_trx_t& trx) {
std::unique_lock<std::mutex> g( mtx );

if (should_exit()) {
notify_waiting(true);

++num_waiting;
cond.wait(g, [this]() {
return !queue.empty() || should_exit();
});
if (queue.empty() || should_exit()) {
g.unlock();
cond.notify_all();
return false;
}
--num_waiting;

if (queue.empty()) {
if (num_waiting + 1 == max_waiting) {
exiting_read_window = true;
notify_waiting(true);
return false;
}

++num_waiting;
cond.wait(g, [this]() { return !queue.empty() || should_exit(); });
--num_waiting;

if (queue.empty() || should_exit()) // were we woken up by another thread?
return false;
}

trx = std::move(queue.front());
queue.pop_front();
return true;
Expand All @@ -473,34 +467,27 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// - all threads would be idle
// - or the net_plugin received a block.
// - or we have reached the read_window_deadline
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, fc::time_point deadline) {
assert(num_tasks > 0 && num_waiting == 0);
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, const fc::time_point& deadline) {
std::lock_guard<std::mutex> g( mtx );
assert(num_tasks > 0);
assert(received_block && *received_block == false);
max_waiting = num_tasks;
num_waiting = 0;
received_block_ptr = received_block;
read_window_deadline = deadline;
}

private:
void notify_waiting(bool all) {
if (num_waiting) {
all ? cond.notify_all() : cond.notify_one();
}
}

bool should_exit() {
return *received_block_ptr || exiting_read_window ||
fc::time_point::now() >= read_window_deadline;
return num_waiting == max_waiting || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
}

mutable std::mutex mtx;
std::condition_variable cond;
deque<ro_trx_t> queue;
uint32_t num_waiting {0};
uint32_t max_waiting {0};
bool exiting_read_window {false};
std::atomic<bool>* received_block_ptr;
fc::time_point start_time;
uint32_t num_waiting{0};
uint32_t max_waiting{0};
std::atomic<bool>* received_block_ptr = nullptr;
fc::time_point read_window_deadline;
};

Expand Down

0 comments on commit 4b12e6e

Please sign in to comment.