-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadPool.h
162 lines (145 loc) · 3.46 KB
/
ThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// ThreadPool with C++ std::thread
// Author: Yuchuan Wang
// yuchuan.wang@gmail.com
//
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
// Unit of work executed by a pool worker: any callable taking no arguments
// and returning nothing.
using ThreadTask = std::function<void()>;

// Fixed-size pool of worker threads consuming tasks from a FIFO queue.
//
// Usage: construct, optionally queue tasks with AddTask(), call Start() to
// spawn the workers, and call WaitForStop() (or let the destructor do it) to
// drain the queue and join the workers.
//
// Thread-safety: AddTask() may be called from any thread, including from
// inside a running task. Start() and WaitForStop() manipulate the worker
// vector without further locking, so they must be driven by a single
// controlling thread and must not be called from inside a task (a worker
// cannot join itself).
//
// Tasks must not throw: an exception escaping a task leaves the worker's
// thread function and std::terminate is called.
class ThreadPool
{
public:
    // threads_num: number of worker threads. 0 (or any non-positive value)
    //              selects the number of hardware threads, falling back to 1
    //              when the platform cannot report it.
    // tasks_num:   maximum number of tasks waiting in the queue. -1 (or any
    //              negative value) means the queue is unbounded.
    ThreadPool(int threads_num = 0, int tasks_num = -1)
        : max_threads(ResolveThreadCount(threads_num)),
          max_tasks(tasks_num),
          is_running(false)
    {
    }

    // Stops the pool if it is still running; see WaitForStop().
    ~ThreadPool()
    {
        WaitForStop();
    }

    // The pool owns threads that hold a pointer to it, so it cannot be copied.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Appends a task to the queue and wakes one worker.
    // Returns false, without queuing anything, when the task is empty or when
    // a bounded queue is already full. Tasks may be queued before Start();
    // they are kept and executed once the workers exist.
    bool AddTask(ThreadTask task)
    {
        if(!task)
        {
            // Invoking an empty std::function would throw inside a worker.
            return false;
        }

        {
            std::lock_guard<std::mutex> lock(tasks_guard);
            const bool bounded = max_tasks >= 0;
            if(bounded && tasks.size() >= static_cast<std::size_t>(max_tasks))
            {
                return false;
            }
            tasks.push(std::move(task));
        }

        // Notify outside the lock so the woken worker can take it immediately.
        tasks_event.notify_one();
        return true;
    }

    // Marks the pool as running and spawns the workers.
    // Returns false if the pool is already running, true otherwise.
    bool Start()
    {
        {
            std::lock_guard<std::mutex> lock(tasks_guard);
            if(is_running)
            {
                return false;
            }
            is_running = true;
        }

        if(threads.empty())
        {
            CreateThreads();
        }
        return true;
    }

    // Requests shutdown, lets the workers finish every task that is already
    // queued, then joins and discards them. Does nothing when the pool is not
    // running. The pool can be started again afterwards.
    void WaitForStop()
    {
        {
            // The flag is flipped under the queue mutex so that a worker
            // cannot evaluate its wait condition, miss the change and then
            // block after the notification below has already been sent.
            std::lock_guard<std::mutex> lock(tasks_guard);
            if(!is_running)
            {
                return;
            }
            is_running = false;
        }

        tasks_event.notify_all();
        for(auto& worker : threads)
        {
            if(worker.joinable())
            {
                worker.join();
            }
        }
        threads.clear();
    }

private:
    // Maps the constructor argument to an actual worker count (always >= 1).
    static int ResolveThreadCount(int requested)
    {
        if(requested > 0)
        {
            return requested;
        }
        // hardware_concurrency() is only a hint and may return 0.
        const unsigned int detected = std::thread::hardware_concurrency();
        return detected > 0 ? static_cast<int>(detected) : 1;
    }

    // Spawns max_threads workers, each running ThreadRoutine on this pool.
    void CreateThreads()
    {
        threads.reserve(static_cast<std::size_t>(max_threads));
        for(int i = 0; i < max_threads; ++i)
        {
            threads.emplace_back(&ThreadPool::ThreadRoutine, this);
        }
    }

    // Worker loop: repeatedly takes the oldest task from the queue and runs
    // it. Returns once shutdown has been requested and the queue is empty.
    static void ThreadRoutine(ThreadPool* ptr)
    {
        if(ptr == nullptr)
        {
            return;
        }

        for(;;)
        {
            ThreadTask task;
            {
                std::unique_lock<std::mutex> lock(ptr->tasks_guard);
                // Sleep until there is work or the pool is stopping. Both
                // conditions are read under the mutex, and the predicate form
                // of wait() also absorbs spurious wakeups.
                ptr->tasks_event.wait(lock, [ptr] {
                    return !ptr->is_running || !ptr->tasks.empty();
                });

                if(ptr->tasks.empty())
                {
                    // Woken for shutdown with nothing left to drain.
                    return;
                }

                task = std::move(ptr->tasks.front());
                ptr->tasks.pop();
            }

            // Run the task without holding the lock so other workers and
            // AddTask() are not blocked while it executes.
            task();
        }
    }

private:
    // Number of worker threads to spawn (always >= 1)
    int max_threads;
    // Max tasks waiting inside the queue; negative means unlimited
    int max_tasks;
    // Worker threads; only touched by Start() / WaitForStop()
    std::vector<std::thread> threads;
    // Queue of pending tasks; protected by tasks_guard
    std::queue<ThreadTask> tasks;
    // Flag of running status; protected by tasks_guard
    bool is_running;
    // Mutex to protect the tasks queue and the running flag
    std::mutex tasks_guard;
    // Signalled when a task is queued or shutdown is requested
    std::condition_variable tasks_event;
};