Skip to content

Commit

Permalink
[Fix](load) Fix the channel leak when close wait has been cancelled (a…
Browse files Browse the repository at this point in the history
…pache#38031)

When the close_wait is called, the NodeChannel has already been marked
as cancelled, but close_wait will set _is_closed to true. When it
actually sends a cancel request to the downstream LoadChannel, it finds
that _is_closed has already been set to true, so it will not send an RPC
request, causing a LoadChannel leak.
  • Loading branch information
liaoxin01 committed Jul 19, 2024
1 parent 53ca312 commit 9389b11
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -878,11 +878,6 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
Status VNodeChannel::close_wait(RuntimeState* state) {
DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { MemoryReclamation::process_full_gc(); });
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// set _is_closed to true finally
Defer set_closed {[&]() {
std::lock_guard<std::mutex> l(_closed_lock);
_is_closed = true;
}};

auto st = none_of({_cancelled, !_eos_is_produced});
if (!st.ok()) {
Expand All @@ -906,8 +901,8 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
_close_time_ms = UnixMillis() - _close_time_ms;

if (_cancelled || state->is_cancelled()) {
cancel(state->cancel_reason());
if (state->is_cancelled()) {
_cancel_with_msg(state->cancel_reason().to_string());
}

if (_add_batches_finished) {
Expand All @@ -919,6 +914,11 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
_index_channel->set_error_tablet_in_state(state);
_index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
_index_channel->set_tablets_filtered_rows(_tablets_filtered_rows, _node_id);

std::lock_guard<std::mutex> l(_closed_lock);
// only when normal close, we set _is_closed to true.
// otherwise, we will set it to true in cancel().
_is_closed = true;
return Status::OK();
}

Expand Down

0 comments on commit 9389b11

Please sign in to comment.