-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadPool.hpp
114 lines (89 loc) · 2.93 KB
/
ThreadPool.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
* vim:ts=4:shiftwidth=4:et:cindent:fileencoding=utf-8:
*/
#pragma once
#include <thread>
#include <vector>
#include <optional>
#include <mutex>
#include <condition_variable>
#include <cstdint>
#include <sched.h>
#include "debug.hpp"
#include "ThreadStruct.hpp"
#include "MyQueue.hpp"
#include "ExecBarrier.hpp"
template <typename QueElem>
class ThreadPool
{
private:
const size_t mThrCnt;
ExecBarrier mBarrier;
std::vector<ThreadStruct> mThrList;
protected:
MyQueue<QueElem> mQueue;
std::mutex mMutex;
std::condition_variable mCond;
bool mRun = true; // it is ok not to be atomic.
public:
ThreadPool(const size_t aThrCnt, const bool aAffinity) : mThrCnt(aThrCnt), mBarrier(aThrCnt + 1)
{
uint32_t sNumCpus = std::thread::hardware_concurrency();
mThrList.reserve(aThrCnt);
for (size_t i = 0; i < aThrCnt; i++)
{
mThrList.emplace_back(&ThreadPool::threadMain, this);
if (aAffinity == true)
{
cpu_set_t sCpuSet;
CPU_ZERO(&sCpuSet);
CPU_SET((i % sNumCpus), &sCpuSet);
if (int32_t sRc = pthread_setaffinity_np(mThrList.back().mThread.native_handle(),
sizeof(cpu_set_t), &sCpuSet);
sRc != 0)
{
std::cerr << "Error calling pthread_setaffinity_np: " << sRc << "\n";
throw std::pair<int32_t, int32_t>(errno, sRc);
}
}
}
mBarrier.waitOnArrive();
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
void threadMain(ThreadStruct* aStatus)
{
mBarrier.waitOnArrive();
while (true)
{
if (auto sElem = mQueue.pop(); sElem.has_value() == true)
{
executeTask(std::move(sElem));
}
else
{
if (mRun == false)
{
break;
}
else
{
using namespace std::chrono_literals;
std::unique_lock sLock(mMutex);
(void)mCond.wait_for(sLock, 100ms); // no need to prevent spurious wakeups, let it happen
}
}
}
}
void stop(void)
{
mRun = false;
for (auto& sThr : mThrList)
{
sThr.mThread.join();
}
}
virtual void executeTask(std::optional<QueElem>&& aElem) const = 0;
};