From 3c5d838ef505fd8af9cab7d209051a6fa47842ff Mon Sep 17 00:00:00 2001 From: Pxl Date: Thu, 5 Sep 2024 11:31:56 +0800 Subject: [PATCH] [Bug](sink) take turns getting data from backends to avoid dead lock (#40360) take turns getting data from backends to avoid dead lock --- be/src/runtime/buffer_control_block.cpp | 5 ----- be/src/runtime/buffer_control_block.h | 1 - .../main/java/org/apache/doris/qe/Coordinator.java | 11 +++++++++-- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index be03db04d67f0c..61ea5ef080de5f 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -97,7 +97,6 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int : _fragment_id(id), _is_close(false), _is_cancelled(false), - _buffer_rows(0), _buffer_limit(buffer_size), _packet_num(0), _batch_size(batch_size) { @@ -135,7 +134,6 @@ Status BufferControlBlock::add_batch(RuntimeState* state, _instance_rows_in_queue.emplace_back(); _fe_result_batch_queue.push_back(std::move(result)); } - _buffer_rows += num_rows; _instance_rows[state->fragment_instance_id()] += num_rows; _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; } else { @@ -162,7 +160,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state, // TODO: merge RocordBatch, ToStructArray -> Make again _arrow_flight_batch_queue.push_back(std::move(result)); - _buffer_rows += num_rows; _instance_rows_in_queue.emplace_back(); _instance_rows[state->fragment_instance_id()] += num_rows; _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; @@ -187,7 +184,6 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { // get result std::unique_ptr result = std::move(_fe_result_batch_queue.front()); _fe_result_batch_queue.pop_front(); - _buffer_rows -= result->result_batch.rows.size(); for (auto it : _instance_rows_in_queue.front()) { _instance_rows[it.first] -= it.second; } @@ -228,7 +224,6 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr* if (!_arrow_flight_batch_queue.empty()) { *result = std::move(_arrow_flight_batch_queue.front()); _arrow_flight_batch_queue.pop_front(); - _buffer_rows -= (*result)->num_rows(); for (auto it : _instance_rows_in_queue.front()) { _instance_rows[it.first] -= it.second; } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index d8bb6e0f5061f7..8b45552b2fadb1 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -113,7 +113,6 @@ class BufferControlBlock { bool _is_close; std::atomic_bool _is_cancelled; Status _status; - std::atomic_int _buffer_rows; const int _buffer_limit; int64_t _packet_num; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 0cbe3eebbab9f1..6be430373687cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -285,6 +285,8 @@ public class Coordinator implements CoordInterface { private StatsErrorEstimator statsErrorEstimator; + private int receiverOffset = 0; + // A countdown latch to mark the completion of each instance. // use for old pipeline // instance id -> dummy value @@ -1162,7 +1164,8 @@ public RowBatch getNext() throws Exception { RowBatch resultBatch; Status status = new Status(); - resultBatch = receivers.get(receivers.size() - 1).getNext(status); + ResultReceiver receiver = receivers.get(receiverOffset); + resultBatch = receiver.getNext(status); if (!status.ok()) { LOG.warn("Query {} coordinator get next fail, {}, need cancel.", DebugUtil.printId(queryId), status.getErrorMsg()); @@ -1209,7 +1212,7 @@ public RowBatch getNext() throws Exception { } if (resultBatch.isEos()) { - receivers.remove(receivers.size() - 1); + receivers.remove(receiver); if (receivers.isEmpty()) { returnedAllResults = true; } else { @@ -1227,6 +1230,10 @@ public RowBatch getNext() throws Exception { } } + if (!returnedAllResults) { + receiverOffset += 1; + receiverOffset %= receivers.size(); + } return resultBatch; }