Skip to content

Commit

Permalink
[fix](BE) Fix inefficient problem in PriorTaskWorkerPool (#37169)
Browse files Browse the repository at this point in the history
## Proposed changes

In the original implementation of `PriorTaskWorkerPool`, although
multiple threads were launched in the normal pool and high prior pool,
only one thread was actually working in each pool (running `normal_loop`
and `high_prior_loop`, respectively). This PR fixes this issue.
  • Loading branch information
platoneko authored Jul 5, 2024
1 parent d86a633 commit dbf963c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 26 deletions.
38 changes: 15 additions & 23 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,26 +540,20 @@ Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
}

PriorTaskWorkerPool::PriorTaskWorkerPool(
std::string_view name, int normal_worker_count, int high_prior_worker_count,
const std::string& name, int normal_worker_count, int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback)
: _callback(std::move(callback)) {
auto st = ThreadPoolBuilder(fmt::format("TaskWP_.{}", name))
.set_min_threads(normal_worker_count)
.set_max_threads(normal_worker_count)
.build(&_normal_pool);
CHECK(st.ok()) << name << ": " << st;

st = _normal_pool->submit_func([this] { normal_loop(); });
CHECK(st.ok()) << name << ": " << st;

st = ThreadPoolBuilder(fmt::format("HighPriorPool.{}", name))
.set_min_threads(high_prior_worker_count)
.set_max_threads(high_prior_worker_count)
.build(&_high_prior_pool);
CHECK(st.ok()) << name << ": " << st;
for (int i = 0; i < normal_worker_count; ++i) {
auto st = Thread::create(
"Normal", name, [this] { normal_loop(); }, &_workers.emplace_back());
CHECK(st.ok()) << name << ": " << st;
}

st = _high_prior_pool->submit_func([this] { high_prior_loop(); });
CHECK(st.ok()) << name << ": " << st;
for (int i = 0; i < high_prior_worker_count; ++i) {
auto st = Thread::create(
"HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back());
CHECK(st.ok()) << name << ": " << st;
}
}

PriorTaskWorkerPool::~PriorTaskWorkerPool() {
Expand All @@ -578,12 +572,10 @@ void PriorTaskWorkerPool::stop() {
_normal_condv.notify_all();
_high_prior_condv.notify_all();

if (_normal_pool) {
_normal_pool->shutdown();
}

if (_high_prior_pool) {
_high_prior_pool->shutdown();
for (auto&& w : _workers) {
if (w) {
w->join();
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class PublishVersionWorkerPool final : public TaskWorkerPool {

class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
public:
PriorTaskWorkerPool(std::string_view name, int normal_worker_count, int high_prior_worker_count,
PriorTaskWorkerPool(const std::string& name, int normal_worker_count,
int high_prior_worker_count,
std::function<void(const TAgentTaskRequest& task)> callback);

~PriorTaskWorkerPool() override;
Expand All @@ -101,8 +102,7 @@ class PriorTaskWorkerPool final : public TaskWorkerPoolIf {
std::condition_variable _high_prior_condv;
std::deque<std::unique_ptr<TAgentTaskRequest>> _high_prior_queue;

std::unique_ptr<ThreadPool> _normal_pool;
std::unique_ptr<ThreadPool> _high_prior_pool;
std::vector<scoped_refptr<Thread>> _workers;

std::function<void(const TAgentTaskRequest&)> _callback;
};
Expand Down

0 comments on commit dbf963c

Please sign in to comment.