Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParallelInputsProcessor should be able to parallelize additional_input_at_end #5263

Closed
gengliqi opened this issue Jun 30, 2022 · 0 comments · Fixed by #5274
Closed

ParallelInputsProcessor should be able to parallelize additional_input_at_end #5263

gengliqi opened this issue Jun 30, 2022 · 0 comments · Fixed by #5274
Assignees
Labels
type/enhancement The issue or PR belongs to an enhancement.

Comments

@gengliqi
Copy link
Contributor

gengliqi commented Jun 30, 2022

Enhancement

Consider the TPC-H sql 13:

select
        c_custkey,
        count(o_orderkey) as c_count
from
        customer left outer join orders on
                c_custkey = o_custkey
group by
        c_custkey;

Execution plan:

+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                         | estRows    | task         | access object  | operator info                                                                                                                                      |
+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| TableReader_61                             | 748800.00  | root         |                | data:ExchangeSender_60                                                                                                                             |
| └─ExchangeSender_60                        | 748800.00  | mpp[tiflash] |                | ExchangeType: PassThrough                                                                                                                          |
|   └─Projection_8                           | 748800.00  | mpp[tiflash] |                | sf10.customer.c_custkey, Column#18                                                                                                                 |
|     └─Projection_55                        | 748800.00  | mpp[tiflash] |                | Column#18, sf10.customer.c_custkey                                                                                                                 |
|       └─HashAgg_53                         | 748800.00  | mpp[tiflash] |                | group by:sf10.customer.c_custkey, funcs:count(sf10.orders.o_orderkey)->Column#18, funcs:firstrow(sf10.customer.c_custkey)->sf10.customer.c_custkey |
|         └─HashJoin_38                      | 7512019.23 | mpp[tiflash] |                | left outer join, equal:[eq(sf10.customer.c_custkey, sf10.orders.o_custkey)]                                                                        |
|           ├─ExchangeReceiver_24(Build)     | 750000.00  | mpp[tiflash] |                |                                                                                                                                                    |
|           │ └─ExchangeSender_23            | 750000.00  | mpp[tiflash] |                | ExchangeType: HashPartition, Hash Cols: [name: sf10.customer.c_custkey, collate: binary]                                                           |
|           │   └─TableFullScan_22           | 750000.00  | mpp[tiflash] | table:customer | keep order:false                                                                                                                                   |
|           └─ExchangeReceiver_27(Probe)     | 7500000.00 | mpp[tiflash] |                |                                                                                                                                                    |
|             └─ExchangeSender_26            | 7500000.00 | mpp[tiflash] |                | ExchangeType: HashPartition, Hash Cols: [name: sf10.orders.o_custkey, collate: binary]                                                             |
|               └─TableFullScan_25           | 7500000.00 | mpp[tiflash] | table:orders   | keep order:false                                                                                                                                   |
+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+

The customer table is considered as hash table and the orders table is considered as probe table.

After the probe phase is done, due to left join, some rows that are not used in customer hash table should also be outputted, which is handled by NonJoinedBlockInputStream. Also, there are several NonJoinedBlockInputStream used to speed up this work.

The aggregation(count(o_orderkey)) is handled by ParallelAggregatingBlockInputStream. It uses a ParallelInputsProcessor to parallelize the aggregation operation.

Then we can describe what's the problem.

/// The last thread on the output indicates that there is no more data.
if (0 == --active_threads)
{
/// And then it processes an additional source, if there is one.
if (additional_input_at_end)
{
try
{
additional_input_at_end->readPrefix();
while (Block block = additional_input_at_end->read())
publishPayload(additional_input_at_end, block, thread_num);
}
catch (...)
{
exception = std::current_exception();
}
if (exception)
{
handler.onException(exception, thread_num);
}
}
handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called.
}

The additional_input_at_end is only one input stream and it is processed by the last thread to finish the task. It's typically used for NonJoinedBlockInputStream because it must be processed after all probe tasks are done.

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
stream_with_non_joined_data,
params,
context.getFileProvider(),
true,
max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
log->identifier());
pipeline.streams.resize(1);
// should record for agg before restore concurrency. See #3804.
recordProfileStreams(pipeline, query_block.aggregation_name);
restorePipelineConcurrency(pipeline);
}

Several input streams(in the above case, is NonJoinedBlockInputStream) are combined into only one input stream.

So the data from several NonJoinedBlockInputStream can only be aggregated in one thread.
The larger the amount of data, the slower the speed.

@gengliqi gengliqi added the type/enhancement The issue or PR belongs to an enhancement. label Jun 30, 2022
@gengliqi gengliqi self-assigned this Jun 30, 2022
ti-chi-bot pushed a commit that referenced this issue Jul 8, 2022
SeaRise pushed a commit to SeaRise/tiflash that referenced this issue Jul 8, 2022
Lloyd-Pottiger pushed a commit to Lloyd-Pottiger/tiflash that referenced this issue Jul 12, 2022
…s in README (pingcap#5182)

close pingcap#5172, ref pingcap#5178

Enhancement: add a integrated test on DDL module (pingcap#5130)

ref pingcap#5129

Revert "Revise default background threads size" (pingcap#5176)

close pingcap#5177

chore: remove extra dyn cast (pingcap#5186)

close pingcap#5185

Add MPPReceiverSet, which includes ExchangeReceiver and CoprocessorReader (pingcap#5175)

ref pingcap#5095

DDL: Use Column Name Instead of Offset to Find the common handle cluster index (pingcap#5166)

close pingcap#5154

Add random failpoint in critical paths (pingcap#4876)

close pingcap#4807

Segment test framework (pingcap#5150)

close pingcap#5151

optimize ps v3 restore (pingcap#5163)

ref pingcap#4914

Fix build failed (pingcap#5196)

close pingcap#5195

feat: delta tree dispatching (pingcap#5199)

close pingcap#5200

feat: introduce specialized API to write fixed length data rapidly (pingcap#5181)

close pingcap#5183

Add gtest for Limit, TopN, Projection (pingcap#5187) (pingcap#5188)

close pingcap#5187

add `MPPTask::handleError()` (pingcap#5202)

ref pingcap#5095

Check result of starting grpc server (pingcap#5257)

close pingcap#5255

feat: add optimized routines for aarch64 (pingcap#5231)

close pingcap#5240

fix: aarch64-quick-fix (pingcap#5259)

close pingcap#5260

Update client-c to support ipv6 (pingcap#5270)

close pingcap#5247

upgrade prometheus-cpp to v1.0.1 (pingcap#5279)

ref pingcap#2103, close pingcap#5278

Fix README type error (pingcap#5273)

ref pingcap#5178

fix(cmake): make sure libc++ is utilized by tiflash-proxy (pingcap#5281)

close pingcap#5282

fix the wrong order of execution summary for list based executors (pingcap#5242)

close pingcap#5241

Schema: allow loading empty schema diff when the version grows up. (pingcap#5245)

close pingcap#5244

Optimize apply speed under heavy write pressure (pingcap#4883)

ref pingcap#4728

update proxy to raftstore-proxy-6.2 (pingcap#5287)

ref pingcap#4982

Flush segment cache when doing the compaction (pingcap#5284)

close pingcap#5179

metrics: Fix incorrect metrics for delta_merge tasks (pingcap#5061)

close pingcap#5055

dep: upgrade jemalloc (pingcap#5197)

close pingcap#5258

*: TiFlash pagectl/dttool use only-decryption mode (pingcap#5271)

close pingcap#5122

suppresion false positive report from tsan (pingcap#5303)

close pingcap#5088

Refine test framework code and tests (pingcap#5261)

close pingcap#5262

feat: add logical cpu cores and memory into grafana (pingcap#5124)

close pingcap#3821

Implement TimeToSec function push down (pingcap#5235)

close pingcap#5116

feat: implement shiftRight function push down (pingcap#5156)

close pingcap#5100

schema : make update to partition tables when 'set tiflash replica' (pingcap#5267)

close pingcap#5266

Replace initializer_list with vector for planner test framework (pingcap#5307)

close pingcap#5295

KVStore: decouple flush region and CompactLog with a new FFI fn_try_flush_data (pingcap#5283)

ref pingcap#5170

refine error message in mpptask (pingcap#5304)

ref pingcap#5095

Implement ReverseUTF8/Reverse function push down (pingcap#5233)

close pingcap#5111

Optimize comparision for collation `UTF8_BIN` and `UTF8MB4_BIN` (pingcap#5299)

ref pingcap#5294

feat : support set tiflash mode ddl action (pingcap#5256)

ref pingcap#5252

Add non-blocking functions for MPMCQueue (pingcap#5311)

close pingcap#5310

add random segment test for CI weekly (pingcap#5300)

close pingcap#5301

*: tidy FunctionString.cpp (pingcap#5312)

close pingcap#5313

ci: fix check-license github action (pingcap#5318)

close pingcap#5317

update proxy to raftstore-proxy-6.2 (pingcap#5316)

ref pingcap#4982

Change one `additional_input_at_end` to many streams in `ParallelInputsProcessor`  (pingcap#5274)

close pingcap#4856, close pingcap#5263

support fine grained shuffle for window function (pingcap#5048)

close pingcap#5142

feat: pushdown get_format into TiFlash (pingcap#5269)

close pingcap#5115

fix: format throw data truncated error (pingcap#5272)

close pingcap#4891

Print content of columns for gtest (pingcap#5243)

close pingcap#5203

*: also enable O3 for aarch64 (pingcap#5338)

close pingcap#5342

Add debug image build target for CentOS7 (pingcap#5344)

close pingcap#5343

*: mini refactor (pingcap#5326)

close pingcap#4739

Refactor initialize of background pool (pingcap#5190)

close pingcap#5189

delete copy/move ctor of MPMCQueue explicitly (pingcap#5328)

close pingcap#5329

Introduce proxy_server and new-mock-engine-store (pingcap#5319)

ref pingcap#5170

fix: incorrect uptime in grafana panel

Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
Lloyd-Pottiger pushed a commit to Lloyd-Pottiger/tiflash that referenced this issue Jul 19, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The issue or PR belongs to an enhancement.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant