-
Notifications
You must be signed in to change notification settings - Fork 1
/
thread_pool.cpp
116 lines (103 loc) · 2.2 KB
/
thread_pool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include "thread_pool.hpp"
#include <exception>
#include <optional>
// Starts the worker threads. Each worker executes ThreadPool::run on this
// pool; the number of workers follows std::thread::hardware_concurrency().
ThreadPool::ThreadPool()
{
  // hardware_concurrency() is allowed to return 0 when the value cannot be
  // determined. Always start at least one worker, otherwise queued jobs would
  // never be executed and waitForOne() would block forever.
  const auto reported = std::thread::hardware_concurrency();
  const auto workerCount = reported != 0u ? reported : 1u;
  try
  {
    for (auto i = 0u; i < workerCount; ++i)
      pool.emplace_back(&ThreadPool::run, this);
  }
  catch (...)
  {
    // Starting a worker failed part-way through. The destructor does not run
    // for a partially constructed object, so stop and join the workers that
    // were already started before propagating the error.
    join();
    throw;
  }
}
// Shuts the pool down by delegating to join(): workers are stopped and joined
// and the after-job callbacks still queued are executed on the destroying
// thread.
// NOTE(review): those callbacks can throw (run() queues callbacks that rethrow
// job failures); an exception leaving a destructor normally terminates the
// program - confirm callers drain results via waitForOne()/join() beforehand.
ThreadPool::~ThreadPool()
{
  this->join();
}
void ThreadPool::join()
{
{
std::lock_guard<std::mutex> guard(mutex);
done = true;
}
newJob.notify_all();
auto tmpPool = std::move(pool);
pool.clear();
for (auto &&p : tmpPool)
p.join();
auto tmpAfterJobs = std::move(afterJobs);
afterJobs.clear();
for (auto &&job : tmpAfterJobs)
job();
}
// Queues a job for execution on a worker thread.
//
// job      - callable executed by a worker (see ThreadPool::run).
// afterJob - callable queued once `job` has returned normally; it is executed
//            later by waitForOne() or join() on the thread calling those.
//
// Both callables are moved into the queue. Workers take jobs from the back of
// the queue, so the most recently added job is picked up first.
void ThreadPool::addJob(std::function<void()> &&job, std::function<void()> &&afterJob)
{
  {
    std::lock_guard<std::mutex> guard(mutex);
    // The parameters are named, hence lvalues: without std::move both
    // std::function objects would be copied instead of moved.
    jobs.push_back(std::make_pair(std::move(job), std::move(afterJob)));
  }
  // Wake one idle worker after releasing the lock so it can acquire the mutex
  // immediately.
  newJob.notify_one();
}
void ThreadPool::waitForOne()
{
std::unique_lock<std::mutex> lock(mutex);
if (!afterJobs.empty())
{
for (auto &&job : afterJobs)
job();
afterJobs.clear();
return;
}
if (jobs.empty())
return;
jobDone.wait(lock);
for (auto &&job : afterJobs)
job();
afterJobs.clear();
}
bool ThreadPool::empty()
{
return jobs.empty();
}
void ThreadPool::run()
{
for (;;)
{
try
{
auto job = [this]() -> std::optional<std::pair<std::function<void()>, std::function<void()>>> {
std::unique_lock<std::mutex> lock(mutex);
while (jobs.empty())
{
if (done)
return std::nullopt;
newJob.wait(lock);
if (done)
return std::nullopt;
}
auto res = jobs.back();
jobs.pop_back();
return res;
}();
if (!job)
return;
job->first();
{
std::lock_guard<std::mutex> guard(mutex);
afterJobs.push_back(job->second);
}
jobDone.notify_one();
}
catch (std::exception &e)
{
{
std::lock_guard<std::mutex> guard(mutex);
std::string err = e.what();
afterJobs.push_back([err]() { throw std::runtime_error(err); });
}
jobDone.notify_one();
}
catch (int e)
{
{
std::lock_guard<std::mutex> guard(mutex);
afterJobs.push_back([e]() { throw e; });
}
jobDone.notify_one();
}
}
}