From 6c0e0d42d153c98f7b27641d6f891815a382efaf Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Fri, 19 Jul 2024 10:48:12 +0800 Subject: [PATCH] fix: changed the calculating logic of epoll timeout provided by TimerTaskManager (#2794) * changed the returned timeout of TimerTaskManager --------- Co-authored-by: cheniujh <1271435567@qq.com> --- src/net/src/dispatch_thread.cc | 2 +- src/net/src/net_util.cc | 55 ++++++++++++++-------------------- src/net/src/net_util.h | 40 ++++++++++--------------- 3 files changed, 40 insertions(+), 57 deletions(-) diff --git a/src/net/src/dispatch_thread.cc b/src/net/src/dispatch_thread.cc index 647d254d1..6fbe97373 100644 --- a/src/net/src/dispatch_thread.cc +++ b/src/net/src/dispatch_thread.cc @@ -66,7 +66,7 @@ int DispatchThread::StartThread() { // Adding timer tasks and run timertaskThread timer_task_thread_.AddTimerTask("blrpop_blocking_info_scan", 250, true, [this] { this->ScanExpiredBlockedConnsOfBlrpop(); }); - timer_task_thread_.set_thread_name("TimerTaskThread"); + timer_task_thread_.set_thread_name("DispacherTimerTaskThread"); timer_task_thread_.StartThread(); return ServerThread::StartThread(); } diff --git a/src/net/src/net_util.cc b/src/net/src/net_util.cc index 7efbb0f6c..c52c07f80 100644 --- a/src/net/src/net_util.cc +++ b/src/net/src/net_util.cc @@ -27,7 +27,7 @@ int Setnonblocking(int sockfd) { return flags; } -uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, +TimerTaskID TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function& task) { TimedTask new_task = {last_task_id_++, task_name, interval_ms, repeat_exec, task}; id_to_task_[new_task.task_id] = new_task; @@ -35,31 +35,31 @@ uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interv int64_t next_expired_time = NowInMs() + interval_ms; exec_queue_.insert({next_expired_time, new_task.task_id}); - if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) { - min_interval_ms_ = interval_ms; - } // return the id of this task return new_task.task_id; } + int64_t TimerTaskManager::NowInMs() { auto now = std::chrono::system_clock::now(); return std::chrono::time_point_cast(now).time_since_epoch().count(); } -int TimerTaskManager::ExecTimerTask() { + +int64_t TimerTaskManager::ExecTimerTask() { std::vector fired_tasks_; int64_t now_in_ms = NowInMs(); - // traverse in ascending order - for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) { - if (pair->exec_ts <= now_in_ms) { - auto it = id_to_task_.find(pair->id); + // traverse in ascending order, and exec expired tasks + for (const auto& task : exec_queue_) { + if (task.exec_ts <= now_in_ms) { + auto it = id_to_task_.find(task.id); assert(it != id_to_task_.end()); it->second.fun(); - fired_tasks_.push_back({pair->exec_ts, pair->id}); + fired_tasks_.push_back({task.exec_ts, task.id}); now_in_ms = NowInMs(); } else { break; } } + for (auto task : fired_tasks_) { exec_queue_.erase(task); auto it = id_to_task_.find(task.id); @@ -69,16 +69,21 @@ int TimerTaskManager::ExecTimerTask() { exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id}); } else { // this task only need to be exec once, completely remove this task - int interval_del = it->second.interval_ms; id_to_task_.erase(task.id); - if (interval_del == min_interval_ms_) { - RenewMinIntervalMs(); - } } } - return min_interval_ms_; + + if (exec_queue_.empty()) { + //to avoid wasting of cpu resources, epoll use 5000ms as timeout value when no task to exec + return 5000; + } + + int64_t gap_between_now_and_next_task = exec_queue_.begin()->exec_ts - NowInMs(); + gap_between_now_and_next_task = gap_between_now_and_next_task < 0 ? 0 : gap_between_now_and_next_task; + return gap_between_now_and_next_task; } -bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) { + +bool TimerTaskManager::DelTimerTaskByTaskId(TimerTaskID task_id) { // remove the task auto task_to_del = id_to_task_.find(task_id); if (task_to_del == id_to_task_.end()) { @@ -87,11 +92,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) { int interval_del = task_to_del->second.interval_ms; id_to_task_.erase(task_to_del); - // renew the min_interval_ms_ - if (interval_del == min_interval_ms_) { - RenewMinIntervalMs(); - } - // remove from exec queue ExecTsWithId target_key = {-1, 0}; for (auto pair : exec_queue_) { @@ -106,15 +106,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) { return true; } -void TimerTaskManager::RenewMinIntervalMs() { - min_interval_ms_ = -1; - for (auto pair : id_to_task_) { - if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) { - min_interval_ms_ = pair.second.interval_ms; - } - } -} - TimerTaskThread::~TimerTaskThread() { if (!timer_task_manager_.Empty()) { LOG(INFO) << "TimerTaskThread exit !!!"; @@ -140,9 +131,9 @@ int TimerTaskThread::StopThread() { } void* TimerTaskThread::ThreadMain() { - int timeout; + int32_t timeout; while (!should_stop()) { - timeout = timer_task_manager_.ExecTimerTask(); + timeout = static_cast(timer_task_manager_.ExecTimerTask()); net_multiplexer_->NetPoll(timeout); } return nullptr; diff --git a/src/net/src/net_util.h b/src/net/src/net_util.h index 7b8cd364f..b30806c3b 100644 --- a/src/net/src/net_util.h +++ b/src/net/src/net_util.h @@ -21,9 +21,9 @@ namespace net { int Setnonblocking(int sockfd); - +using TimerTaskID = int64_t; struct TimedTask{ - uint32_t task_id; + TimerTaskID task_id; std::string task_name; int interval_ms; bool repeat_exec; @@ -34,7 +34,7 @@ struct ExecTsWithId { //the next exec time of the task, unit in ms int64_t exec_ts; //id of the task to be exec - uint32_t id; + TimerTaskID id; bool operator<(const ExecTsWithId& other) const{ if(exec_ts == other.exec_ts){ @@ -47,36 +47,28 @@ struct ExecTsWithId { } }; -/* - * For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started, - * but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect - * the timer_task_manager_ and also a pipe to wake up the maybe being endless-wait epoll(if all task consumed, epoll will sink into - * endless wait) to implement the feature. - */ class TimerTaskManager { public: TimerTaskManager() = default; ~TimerTaskManager() = default; - - uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function &task); - //return the newest min_minterval_ms - int ExecTimerTask(); - bool DelTimerTaskByTaskId(uint32_t task_id); - int GetMinIntervalMs() const { return min_interval_ms_; } + TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function &task); + //return the time gap between now and next task-expired time, which can be used as the timeout value of epoll + int64_t ExecTimerTask(); + bool DelTimerTaskByTaskId(TimerTaskID task_id); int64_t NowInMs(); - void RenewMinIntervalMs(); - bool Empty(){ return 0 == last_task_id_; } - + bool Empty() const { return exec_queue_.empty(); } private: //items stored in std::set are ascending ordered, we regard it as an auto sorted queue std::set exec_queue_; - std::unordered_map id_to_task_; - uint32_t last_task_id_{0}; - int min_interval_ms_{-1}; + std::unordered_map id_to_task_; + TimerTaskID last_task_id_{0}; }; - +/* + * For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started, + * but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect the timer_task_manager_ + */ class TimerTaskThread : public Thread { public: TimerTaskThread(){ @@ -88,11 +80,11 @@ class TimerTaskThread : public Thread { int StopThread() override; void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); } - uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function &task){ + TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function &task){ return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task); }; - bool DelTimerTaskByTaskId(uint32_t task_id){ + bool DelTimerTaskByTaskId(TimerTaskID task_id){ return timer_task_manager_.DelTimerTaskByTaskId(task_id); };