// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <Common/CurrentMetrics.h>
#include <Common/Logger.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadFactory.h>
#include <Common/ThreadManager.h>
#include <Common/setThreadName.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <common/logger_useful.h>
#include <atomic>
#include <list>
#include <mutex>
#include <queue>
#include <thread>
/** Allows processing multiple block input streams (sources) in parallel, using a specified number of threads.
* Reads (pulls) blocks from any available source and passes them to the specified handler.
*
* Before any reading, calls the "readPrefix" method of the sources in parallel.
*
* (As an example, "readPrefix" can prepare connections to remote servers,
* and we want this work to be executed in parallel for the different sources.)
*
* Implemented in the following way:
* - there are multiple input sources to read blocks from;
* - there are multiple threads that can simultaneously read blocks from different sources;
* - "available" sources (those not being read by any thread right now) are kept in a queue of sources;
* - when a thread takes a source to read from, it removes the source from the queue of sources,
* reads a block from it, and then puts the source back into the queue of available sources.
*/
namespace DB
{
/** Union mode.
*/
enum class StreamUnionMode
{
Basic = 0, /// take out blocks
ExtraInfo /// take out blocks + additional information
};
/// Example of the handler.
struct ParallelInputsHandler
{
/// Processing the data block.
void onBlock(Block & /*block*/, size_t /*thread_num*/) {}
/// Processing the data block + additional information.
void onBlock(Block & /*block*/, BlockExtraInfo & /*extra_info*/, size_t /*thread_num*/) {}
/// Called for each thread when that thread has nothing else to do:
/// some of the sources have run out, and there are now fewer sources left than threads.
/// Called if the `onException` method does not throw an exception; called before the `onFinish` method.
void onFinishThread(size_t /*thread_num*/) {}
/// The blocks are over, either because all sources ran out or because the work was cancelled.
/// This method is always called exactly once, at the end of the work, if the `onException` method does not throw an exception.
void onFinish() {}
/// Exception handling. It is reasonable to call the ParallelInputsProcessor::cancel method in this method, and also pass the exception to the main thread.
void onException(std::exception_ptr & /*exception*/, size_t /*thread_num*/) {}
};
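/// A minimal usage sketch (an illustration, not part of this header): the names
/// ExampleHandler, makeInputStreams and example_logger are hypothetical placeholders.
/// Note that process() passes handler.getName() to the thread manager, so a real
/// handler must also provide getName() in addition to the callbacks shown above.
///
///     struct ExampleHandler
///     {
///         void onBlock(Block & block, size_t /*thread_num*/) { /* consume the block */ }
///         void onBlock(Block & block, BlockExtraInfo & /*extra_info*/, size_t thread_num) { onBlock(block, thread_num); }
///         void onFinishThread(size_t /*thread_num*/) {}
///         void onFinish() {}
///         /// Remember the first exception; a real handler should synchronize this and may call cancel().
///         void onException(std::exception_ptr & exception, size_t /*thread_num*/) { first_exception = exception; }
///         String getName() const { return "ExampleHandler"; }
///         std::exception_ptr first_exception;
///     };
///
///     BlockInputStreams inputs = makeInputStreams();
///     ExampleHandler handler;
///     ParallelInputsProcessor<ExampleHandler> processor(inputs, nullptr, /*max_threads_=*/4, handler, example_logger);
///     processor.process(); /// start the worker threads
///     processor.wait();    /// block until all sources are exhausted or cancelled
///     if (handler.first_exception)
///         std::rethrow_exception(handler.first_exception);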
template <typename Handler, StreamUnionMode mode = StreamUnionMode::Basic>
class ParallelInputsProcessor
{
public:
/** additional_input_at_end - if not nullptr,
* then the blocks from this source start to be processed only after all other sources have been processed.
* This is done by the last worker thread to finish.
*
* Intended for the implementation of FULL and RIGHT JOIN:
* first the JOIN is performed in parallel, while noting which keys were not found,
* and only after this work completes, blocks for the keys that were not found are created.
*/
ParallelInputsProcessor(
const BlockInputStreams & inputs_,
const BlockInputStreamPtr & additional_input_at_end_,
size_t max_threads_,
Handler & handler_,
const LoggerPtr & log_)
: inputs(inputs_)
, additional_input_at_end(additional_input_at_end_)
, max_threads(std::min(inputs_.size(), max_threads_))
, handler(handler_)
, log(log_)
{
for (size_t i = 0; i < inputs_.size(); ++i)
unprepared_inputs.emplace(inputs_[i], i);
}
~ParallelInputsProcessor()
{
try
{
wait();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
/// Start background threads, start work.
void process()
{
if (!thread_manager)
thread_manager = newThreadManager();
active_threads = max_threads;
for (size_t i = 0; i < max_threads; ++i)
thread_manager->schedule(true, handler.getName(), [this, i] { this->thread(i); });
}
/// Ask all sources to stop before they run out.
void cancel(bool kill)
{
finish = true;
for (auto & input : inputs)
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*input))
{
try
{
child->cancel(kill);
}
catch (...)
{
/** If one or more sources cannot be asked to stop
* (for example, the connection is broken during distributed query processing),
* then just ignore it.
*/
LOG_FMT_ERROR(log, "Exception while cancelling {}", child->getName());
}
}
}
}
/// Wait until all threads are finished, before the destructor.
void wait()
{
if (joined_threads)
return;
if (thread_manager)
thread_manager->wait();
joined_threads = true;
}
size_t getNumActiveThreads() const
{
return active_threads;
}
size_t getMaxThreads() const
{
return max_threads;
}
private:
/// Single source data
struct InputData
{
BlockInputStreamPtr in;
size_t i; /// The source number (for debugging).
InputData() {}
InputData(const BlockInputStreamPtr & in_, size_t i_)
: in(in_)
, i(i_)
{}
};
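/// Pass a block read from `stream` to the handler (together with the block's extra info in ExtraInfo mode).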
void publishPayload(BlockInputStreamPtr & stream, Block & block, size_t thread_num)
{
if constexpr (mode == StreamUnionMode::Basic)
handler.onBlock(block, thread_num);
else
{
BlockExtraInfo extra_info = stream->getBlockExtraInfo();
handler.onBlock(block, extra_info, thread_num);
}
}
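/// Body of one worker thread: first prepare (readPrefix) sources taken from `unprepared_inputs`,
/// then pull blocks in `loop`. The last thread to finish also drains `additional_input_at_end`
/// (if any) and calls `handler.onFinish()`.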
void thread(size_t thread_num)
{
std::exception_ptr exception;
try
{
while (!finish)
{
InputData unprepared_input;
{
std::lock_guard lock(unprepared_inputs_mutex);
if (unprepared_inputs.empty())
break;
unprepared_input = unprepared_inputs.front();
unprepared_inputs.pop();
}
unprepared_input.in->readPrefix();
{
std::lock_guard lock(available_inputs_mutex);
available_inputs.push(unprepared_input);
}
}
loop(thread_num);
}
catch (...)
{
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
handler.onFinishThread(thread_num);
/// The last thread to exit reports that there is no more data.
if (0 == --active_threads)
{
/// And then it processes an additional source, if there is one.
if (additional_input_at_end)
{
try
{
additional_input_at_end->readPrefix();
while (Block block = additional_input_at_end->read())
publishPayload(additional_input_at_end, block, thread_num);
}
catch (...)
{
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
}
handler.onFinish(); /// TODO If `onFinish` or `onFinishThread` throws an exception, std::terminate is called.
}
}
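/// Pull loop of a worker thread: repeatedly take an available source, read one block from it,
/// put the source back (if it has not run out) and pass the block to the handler.
/// Returns when there are no sources left or when `finish` is set by cancel().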
void loop(size_t thread_num)
{
while (!finish) /// Work may need to stop earlier than all sources run out.
{
InputData input;
/// Select the next source.
{
std::lock_guard lock(available_inputs_mutex);
/// If there are no free sources, then this thread is no longer needed. (But other threads can work with their sources.)
if (available_inputs.empty())
break;
input = available_inputs.front();
/// We remove the source from the queue of available sources.
available_inputs.pop();
}
/// The main work.
Block block = input.in->read();
{
if (finish)
break;
/// If this source has not run out yet, put it back into the set of available sources; the block itself is passed to the handler below.
{
std::lock_guard lock(available_inputs_mutex);
if (block)
{
available_inputs.push(input);
}
else
{
if (available_inputs.empty())
break;
}
}
if (finish)
break;
if (block)
publishPayload(input.in, block, thread_num);
}
}
}
BlockInputStreams inputs;
BlockInputStreamPtr additional_input_at_end;
unsigned max_threads;
Handler & handler;
std::shared_ptr<ThreadManager> thread_manager;
/** A set of available sources that are not currently being processed by any thread.
* Each thread takes one source from this set, reads a block from it (at this moment the source does its calculations)
* and, if the source has not run out, puts it back into the set of available sources.
*
* The question arises what is better to use:
* - a queue (a just-processed source will be processed again later than the rest);
* - a stack (a just-processed source will be processed again as soon as possible).
*
* The stack is better than the queue when one source should be read as consecutively as possible,
* which theoretically allows more sequential reads from disk.
*
* But when using the stack, there is a problem with distributed query processing:
* data is read from only a part of the servers, while on the other servers
* a timeout occurs during send, and the request processing ends with an exception.
*
* Therefore, a queue is used. This can be improved in the future.
*/
using AvailableInputs = std::queue<InputData>;
AvailableInputs available_inputs;
/** For preparing (readPrefix) child streams in parallel.
* Streams are placed here first.
* After a stream has been prepared, it is moved to "available_inputs" for reading.
*/
using UnpreparedInputs = std::queue<InputData>;
UnpreparedInputs unprepared_inputs;
/// For operations with available_inputs.
std::mutex available_inputs_mutex;
/// For operations with unprepared_inputs.
std::mutex unprepared_inputs_mutex;
/// Number of worker threads that are still running.
std::atomic<size_t> active_threads{0};
/// Ask the threads to finish their work (before the sources run out).
std::atomic<bool> finish{false};
/// Whether all threads have already been joined (set by wait()).
std::atomic<bool> joined_threads{false};
const LoggerPtr log;
};
} // namespace DB