Skip to content

Commit

Permalink
fix: changed the calculating logic of epoll timeout provided by Timer…
Browse files Browse the repository at this point in the history
…TaskManager (#2794)

* changed the returned timeout of TimerTaskManager

---------

Co-authored-by: cheniujh <1271435567@qq.com>
  • Loading branch information
cheniujh and cheniujh authored Jul 19, 2024
1 parent f4cfbde commit 6c0e0d4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int DispatchThread::StartThread() {
// Adding timer tasks and run timertaskThread
timer_task_thread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });
timer_task_thread_.set_thread_name("TimerTaskThread");
timer_task_thread_.set_thread_name("DispacherTimerTaskThread");
timer_task_thread_.StartThread();
return ServerThread::StartThread();
}
Expand Down
55 changes: 23 additions & 32 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,39 @@ int Setnonblocking(int sockfd) {
return flags;
}

uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
TimerTaskID TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
const std::function<void()>& task) {
TimedTask new_task = {last_task_id_++, task_name, interval_ms, repeat_exec, task};
id_to_task_[new_task.task_id] = new_task;

int64_t next_expired_time = NowInMs() + interval_ms;
exec_queue_.insert({next_expired_time, new_task.task_id});

if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) {
min_interval_ms_ = interval_ms;
}
// return the id of this task
return new_task.task_id;
}

int64_t TimerTaskManager::NowInMs() {
auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
}
int TimerTaskManager::ExecTimerTask() {

int64_t TimerTaskManager::ExecTimerTask() {
std::vector<ExecTsWithId> fired_tasks_;
int64_t now_in_ms = NowInMs();
// traverse in ascending order
for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) {
if (pair->exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair->id);
// traverse in ascending order, and exec expired tasks
for (const auto& task : exec_queue_) {
if (task.exec_ts <= now_in_ms) {
auto it = id_to_task_.find(task.id);
assert(it != id_to_task_.end());
it->second.fun();
fired_tasks_.push_back({pair->exec_ts, pair->id});
fired_tasks_.push_back({task.exec_ts, task.id});
now_in_ms = NowInMs();
} else {
break;
}
}

for (auto task : fired_tasks_) {
exec_queue_.erase(task);
auto it = id_to_task_.find(task.id);
Expand All @@ -69,16 +69,21 @@ int TimerTaskManager::ExecTimerTask() {
exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id});
} else {
// this task only need to be exec once, completely remove this task
int interval_del = it->second.interval_ms;
id_to_task_.erase(task.id);
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}
}
}
return min_interval_ms_;

if (exec_queue_.empty()) {
//to avoid wasting of cpu resources, epoll use 5000ms as timeout value when no task to exec
return 5000;
}

int64_t gap_between_now_and_next_task = exec_queue_.begin()->exec_ts - NowInMs();
gap_between_now_and_next_task = gap_between_now_and_next_task < 0 ? 0 : gap_between_now_and_next_task;
return gap_between_now_and_next_task;
}
bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {

bool TimerTaskManager::DelTimerTaskByTaskId(TimerTaskID task_id) {
// remove the task
auto task_to_del = id_to_task_.find(task_id);
if (task_to_del == id_to_task_.end()) {
Expand All @@ -87,11 +92,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
int interval_del = task_to_del->second.interval_ms;
id_to_task_.erase(task_to_del);

// renew the min_interval_ms_
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}

// remove from exec queue
ExecTsWithId target_key = {-1, 0};
for (auto pair : exec_queue_) {
Expand All @@ -106,15 +106,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
return true;
}

void TimerTaskManager::RenewMinIntervalMs() {
min_interval_ms_ = -1;
for (auto pair : id_to_task_) {
if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) {
min_interval_ms_ = pair.second.interval_ms;
}
}
}

TimerTaskThread::~TimerTaskThread() {
if (!timer_task_manager_.Empty()) {
LOG(INFO) << "TimerTaskThread exit !!!";
Expand All @@ -140,9 +131,9 @@ int TimerTaskThread::StopThread() {
}

void* TimerTaskThread::ThreadMain() {
int timeout;
int32_t timeout;
while (!should_stop()) {
timeout = timer_task_manager_.ExecTimerTask();
timeout = static_cast<int32_t>(timer_task_manager_.ExecTimerTask());
net_multiplexer_->NetPoll(timeout);
}
return nullptr;
Expand Down
40 changes: 16 additions & 24 deletions src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
namespace net {

int Setnonblocking(int sockfd);

using TimerTaskID = int64_t;
struct TimedTask{
uint32_t task_id;
TimerTaskID task_id;
std::string task_name;
int interval_ms;
bool repeat_exec;
Expand All @@ -34,7 +34,7 @@ struct ExecTsWithId {
//the next exec time of the task, unit in ms
int64_t exec_ts;
//id of the task to be exec
uint32_t id;
TimerTaskID id;

bool operator<(const ExecTsWithId& other) const{
if(exec_ts == other.exec_ts){
Expand All @@ -47,36 +47,28 @@ struct ExecTsWithId {
}
};

/*
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect
* the timer_task_manager_ and also a pipe to wake up the maybe being endless-wait epoll(if all task consumed, epoll will sink into
* endless wait) to implement the feature.
*/
class TimerTaskManager {
public:
TimerTaskManager() = default;
~TimerTaskManager() = default;

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
//return the newest min_minterval_ms
int ExecTimerTask();
bool DelTimerTaskByTaskId(uint32_t task_id);
int GetMinIntervalMs() const { return min_interval_ms_; }
TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
//return the time gap between now and next task-expired time, which can be used as the timeout value of epoll
int64_t ExecTimerTask();
bool DelTimerTaskByTaskId(TimerTaskID task_id);
int64_t NowInMs();
void RenewMinIntervalMs();
bool Empty(){ return 0 == last_task_id_; }

bool Empty() const { return exec_queue_.empty(); }
private:
//items stored in std::set are ascending ordered, we regard it as an auto sorted queue
std::set<ExecTsWithId> exec_queue_;
std::unordered_map<uint32_t, TimedTask> id_to_task_;
uint32_t last_task_id_{0};
int min_interval_ms_{-1};
std::unordered_map<TimerTaskID, TimedTask> id_to_task_;
TimerTaskID last_task_id_{0};
};



/*
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect the timer_task_manager_
*/
class TimerTaskThread : public Thread {
public:
TimerTaskThread(){
Expand All @@ -88,11 +80,11 @@ class TimerTaskThread : public Thread {
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task);
};

bool DelTimerTaskByTaskId(uint32_t task_id){
bool DelTimerTaskByTaskId(TimerTaskID task_id){
return timer_task_manager_.DelTimerTaskByTaskId(task_id);
};

Expand Down

0 comments on commit 6c0e0d4

Please sign in to comment.