diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index beb1bb5098..13177de619 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -419,18 +419,19 @@ class producer_plugin_impl : public std::enable_shared_from_this g( mtx ); queue.push_back(std::move(trx)); - notify_waiting(false); + cond.notify_one(); } void push_front(ro_trx_t&& trx) { std::unique_lock g( mtx ); queue.push_front(std::move(trx)); - notify_waiting(false); + cond.notify_one(); } bool empty() const { @@ -441,29 +442,19 @@ class producer_plugin_impl : public std::enable_shared_from_this g( mtx ); - - if (should_exit()) { - notify_waiting(true); + + ++num_waiting; + cond.wait(g, [this]() { + return !queue.empty() || should_exit(); + }); + if (queue.empty() || should_exit()) { + cond.notify_all(); return false; } + --num_waiting; - if (queue.empty()) { - if (num_waiting + 1 == max_waiting) { - exiting_read_window = true; - notify_waiting(true); - return false; - } - - ++num_waiting; - cond.wait(g, [this]() { return !queue.empty() || should_exit(); }); - --num_waiting; - - if (queue.empty() || should_exit()) // were we woken up by another thread? - return false; - } - trx = std::move(queue.front()); queue.pop_front(); return true; @@ -473,34 +464,27 @@ class producer_plugin_impl : public std::enable_shared_from_this* received_block, fc::time_point deadline) { - assert(num_tasks > 0 && num_waiting == 0); + void set_exit_criteria(uint32_t num_tasks, std::atomic* received_block, const fc::time_point& deadline) { + std::lock_guard g( mtx ); + assert(num_tasks > 0); assert(received_block && *received_block == false); max_waiting = num_tasks; + num_waiting = 0; received_block_ptr = received_block; read_window_deadline = deadline; } private: - void notify_waiting(bool all) { - if (num_waiting) { - all ? cond.notify_all() : cond.notify_one(); - } - } - bool should_exit() { - return *received_block_ptr || exiting_read_window || - fc::time_point::now() >= read_window_deadline; + return num_waiting == max_waiting || fc::time_point::now() >= read_window_deadline || *received_block_ptr; } mutable std::mutex mtx; std::condition_variable cond; deque queue; - uint32_t num_waiting {0}; - uint32_t max_waiting {0}; - bool exiting_read_window {false}; - std::atomic* received_block_ptr; - fc::time_point start_time; + uint32_t num_waiting{0}; + uint32_t max_waiting{0}; + std::atomic* received_block_ptr = nullptr; fc::time_point read_window_deadline; };