-
Notifications
You must be signed in to change notification settings - Fork 0
/
MutiBuffer.hpp
133 lines (113 loc) · 3.21 KB
/
MutiBuffer.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#pragma once
#include<atomic>
#include<chrono>
#include<vector>
#include<memory>
#include<cassert>
#include<thread>
#include<functional>
#include "ThreadPool.hpp"
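/*
 * Multiple buffers, filled by several writer threads and drained by a single
 * reader thread. The goal is to ease the "fast reader, slow writer" problem
 * by narrowing the gap between read and write speed as much as possible.
 */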
// States a buffer slot can be in (stored per slot in MutiBuffer::_bufferEmploy).
constexpr int BUFFER_CAN_READ  = 0;   // slot holds data that can be read
constexpr int BUFFER_CAN_WRITE = 1;   // slot can be written
constexpr int BUFFER_IDEL      = -1;  // slot is idle
// One buffer: a vector of shared pointers to T.
template <class T>
using Buffer = std::vector<std::shared_ptr<T>>;

// Callback that loads data into the buffer it is handed (by reference).
template <class T>
using LoadDataFunc = std::function<void(Buffer<T>&)>;
// A pool of buffers that are filled by tasks running on a thread pool and
// handed out, one buffer at a time and in slot order, by waitAndRead().
// Slot states are tracked in _bufferEmploy and guarded by _bufferStatusMutex.
template<typename T>
class MutiBuffer
{
public:
// Creates writeThreadNum * 2 buffers, each initially holding bufferSize
// entries, and a thread pool constructed with writeThreadNum.
explicit MutiBuffer(unsigned int writeThreadNum = 2, unsigned int bufferSize = 1024);
// Enqueues a writeBuffer task for loadDataFunc on the thread pool; sleeps in
// 20 ms steps while the pool reports at least as many tasks as there are buffers.
void write(LoadDataFunc<T> loadDataFunc);
// Polls until the slot at the current read index is readable, then swaps its
// contents into datas, marks the slot idle and advances the read index.
void waitAndRead(Buffer<T>& datas);
// Task body: waits for slot taskId to be idle, calls loadDataFunc to produce
// the data and, if any was produced, stores it in the slot and marks it readable.
void writeBuffer(LoadDataFunc<T> loadDataFunc, size_t taskId);
~MutiBuffer() = default;
private:
unsigned int _writeThreadNum; // number of writer threads
unsigned int _bufferSize; // size of each buffer
std::vector<Buffer<T>> _buffers; // the buffers
ThreadPool _threadPool; // thread pool that runs the write tasks
std::vector<int> _bufferEmploy; // per-buffer state (BUFFER_* constants)
std::mutex _bufferStatusMutex; // guards the buffer states
int _readBufferIndex; // index of the buffer currently being read
int _writeBufferIndex; // index of the buffer currently being written
};
/**
 * Builds the buffer slots and the writer thread pool.
 *
 * Twice as many buffers as writer threads are created. Every buffer starts
 * with bufferSize default-constructed (null) entries and every slot starts idle.
 *
 * @param writeThreadNum value passed to the thread pool; asserted to be >= 1.
 * @param bufferSize     initial number of entries in each buffer.
 */
template<typename T>
inline MutiBuffer<T>::MutiBuffer(unsigned int writeThreadNum, unsigned int bufferSize)
    : _writeThreadNum(writeThreadNum)
    , _bufferSize(bufferSize)
    , _threadPool(writeThreadNum)
    , _readBufferIndex(0)
    , _writeBufferIndex(0)
{
    assert(writeThreadNum >= 1);

    // Two buffers per writer thread.
    const size_t slotCount = writeThreadNum * 2;
    _buffers.assign(slotCount, Buffer<T>(bufferSize));
    _bufferEmploy.assign(slotCount, BUFFER_IDEL);
}
/**
 * Schedules one load task on the writer thread pool.
 *
 * Target slots are assigned round-robin over the buffers. The cursor lives in
 * the per-instance member _writeBufferIndex. A function-local static would be
 * shared by every MutiBuffer<T> of the same T, so an instance with fewer
 * buffers could be handed an out-of-range slot index.
 *
 * The cursor is not synchronized.
 * NOTE(review): this assumes write() is called from a single producer thread - confirm against callers.
 *
 * @param loadDataFunc callback that fills the buffer passed to it; it is
 *                     invoked later by writeBuffer() on a pool thread.
 */
template<typename T>
inline void MutiBuffer<T>::write(LoadDataFunc<T> loadDataFunc)
{
    // Back-pressure: wait while the pool reports at least as many tasks as
    // there are buffers.
    while (_threadPool.getTaskNums() >= _buffers.size())
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }

    size_t taskId = static_cast<size_t>(_writeBufferIndex);
    _threadPool.enqueue(&MutiBuffer::writeBuffer, this, loadDataFunc, taskId);
    _writeBufferIndex = static_cast<int>((taskId + 1) % _buffers.size());
}
/**
 * Blocks until the slot at the current read index is readable, then hands its
 * contents to the caller.
 *
 * The slot state is polled under _bufferStatusMutex every 10 microseconds.
 * Once the slot is readable, its contents are swapped with datas, so whatever
 * datas held before ends up in the slot. The slot is then marked idle and the
 * read index moves to the next slot, wrapping around.
 *
 * @param datas receives the contents of the slot that was read.
 */
template<typename T>
void MutiBuffer<T>::waitAndRead(Buffer<T>& datas)
{
    for (;;)
    {
        {
            std::lock_guard<std::mutex> guard(_bufferStatusMutex);
            int& slotState = _bufferEmploy[_readBufferIndex];
            if (slotState == BUFFER_CAN_READ)
            {
                datas.swap(_buffers[_readBufferIndex]);
                slotState = BUFFER_IDEL;
                _readBufferIndex = static_cast<int>((_readBufferIndex + 1) % _buffers.size());
                return;
            }
        }
        // Not ready yet: back off briefly with the mutex released.
        std::this_thread::sleep_for(std::chrono::microseconds(10));
    }
}
/**
 * Task body: loads one batch of data and publishes it in slot taskId.
 *
 * Steps:
 *  1. Poll (every 20 ms) until the slot is idle, then claim it by marking it
 *     BUFFER_CAN_WRITE. Without the claim the slot stays idle during the load,
 *     so another task with the same taskId could pass the idle check too and
 *     both would write the same slot.
 *  2. Call loadDataFunc outside the lock to fill a local buffer.
 *  3. If data was produced, swap it into the slot and mark it BUFFER_CAN_READ.
 *     If nothing was produced, or loadDataFunc throws, release the slot back
 *     to idle so later tasks for the same slot are not blocked forever.
 *
 * @param loadDataFunc callback that fills the buffer passed to it; it must
 *                     produce at most _bufferSize entries (asserted).
 * @param taskId       index of the slot to fill; must be < number of buffers.
 */
template<typename T>
inline void MutiBuffer<T>::writeBuffer(LoadDataFunc<T> loadDataFunc, size_t taskId)
{
    assert(taskId < _buffers.size());

    // Step 1: wait for the slot to become idle and claim it.
    while (true)
    {
        {
            std::lock_guard<std::mutex> lk(_bufferStatusMutex);
            if (_bufferEmploy[taskId] == BUFFER_IDEL)
            {
                _bufferEmploy[taskId] = BUFFER_CAN_WRITE;
#ifdef DEBUG
                std::cout << "任务" << taskId << ",开始写数据" << std::endl;
#endif // DEBUG
                break;
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }

    // Step 2: run the loader without holding the mutex.
    Buffer<T> datas;
    try
    {
        loadDataFunc(datas);
    }
    catch (...)
    {
        // Give the claimed slot back before propagating the failure.
        std::lock_guard<std::mutex> lk(_bufferStatusMutex);
        _bufferEmploy[taskId] = BUFFER_IDEL;
        throw;
    }
    assert(datas.size() <= _bufferSize);

    // Step 3: publish the data or release the slot. `lk` is declared after
    // `datas`, so the old slot contents are destroyed after the mutex is released.
    std::lock_guard<std::mutex> lk(_bufferStatusMutex);
    if (datas.empty())
    {
        _bufferEmploy[taskId] = BUFFER_IDEL;
        return;
    }
    _buffers[taskId].swap(datas);
    _bufferEmploy[taskId] = BUFFER_CAN_READ;
}