Skip to content

Commit

Permalink
[fix](pipeline) Fix mem control in local exchanger (#38885)
Browse files Browse the repository at this point in the history
## Proposed changes

If a block (>128M) is dequeue by local exchange source operator and it
is the last block, both of source operators and sink operators will be
hang. This PR fixed it.

<!--Describe your changes.-->
  • Loading branch information
Gabriel39 authored Aug 6, 2024
1 parent a7e586f commit 0ff1ca5
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -859,13 +859,13 @@ struct LocalExchangeSharedState : public BasicSharedState {
}

void add_total_mem_usage(size_t delta) {
if (mem_usage.fetch_add(delta) > config::local_exchange_buffer_mem_limit) {
if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
sink_deps.front()->block();
}
}

void sub_total_mem_usage(size_t delta) {
if (mem_usage.fetch_sub(delta) <= config::local_exchange_buffer_mem_limit) {
if (mem_usage.fetch_sub(delta) - delta <= config::local_exchange_buffer_mem_limit) {
sink_deps.front()->set_ready();
}
}
Expand Down

0 comments on commit 0ff1ca5

Please sign in to comment.