Skip to content

Commit

Permalink
[fix](pipeline) Fix mem control in local exchanger (#38885) (#38910)
Browse files Browse the repository at this point in the history
If a block (>128M) is dequeue by local exchange source operator and it
is the last block, both of source operators and sink operators will be
hang. This PR fixed it.

pick #38885
  • Loading branch information
Gabriel39 authored Aug 6, 2024
1 parent ba5c6fb commit 28c0510
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -788,13 +788,13 @@ struct LocalExchangeSharedState : public BasicSharedState {
}

void add_total_mem_usage(size_t delta) {
if (mem_usage.fetch_add(delta) > config::local_exchange_buffer_mem_limit) {
if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
sink_deps.front()->block();
}
}

void sub_total_mem_usage(size_t delta) {
if (mem_usage.fetch_sub(delta) <= config::local_exchange_buffer_mem_limit) {
if (mem_usage.fetch_sub(delta) - delta <= config::local_exchange_buffer_mem_limit) {
sink_deps.front()->set_ready();
}
}
Expand Down

0 comments on commit 28c0510

Please sign in to comment.