Skip to content

Commit

Permalink
[Bug](sink) take turns getting data from backends to avoid dead lock (#…
Browse files Browse the repository at this point in the history
…40360)

take turns getting data from backends to avoid dead lock
  • Loading branch information
BiteTheDDDDt authored Sep 5, 2024
1 parent 1849425 commit 3c5d838
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 8 deletions.
5 changes: 0 additions & 5 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, int
: _fragment_id(id),
_is_close(false),
_is_cancelled(false),
_buffer_rows(0),
_buffer_limit(buffer_size),
_packet_num(0),
_batch_size(batch_size) {
Expand Down Expand Up @@ -135,7 +134,6 @@ Status BufferControlBlock::add_batch(RuntimeState* state,
_instance_rows_in_queue.emplace_back();
_fe_result_batch_queue.push_back(std::move(result));
}
_buffer_rows += num_rows;
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
} else {
Expand All @@ -162,7 +160,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState* state,
// TODO: merge RocordBatch, ToStructArray -> Make again

_arrow_flight_batch_queue.push_back(std::move(result));
_buffer_rows += num_rows;
_instance_rows_in_queue.emplace_back();
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
Expand All @@ -187,7 +184,6 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
// get result
std::unique_ptr<TFetchDataResult> result = std::move(_fe_result_batch_queue.front());
_fe_result_batch_queue.pop_front();
_buffer_rows -= result->result_batch.rows.size();
for (auto it : _instance_rows_in_queue.front()) {
_instance_rows[it.first] -= it.second;
}
Expand Down Expand Up @@ -228,7 +224,6 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
if (!_arrow_flight_batch_queue.empty()) {
*result = std::move(_arrow_flight_batch_queue.front());
_arrow_flight_batch_queue.pop_front();
_buffer_rows -= (*result)->num_rows();
for (auto it : _instance_rows_in_queue.front()) {
_instance_rows[it.first] -= it.second;
}
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ class BufferControlBlock {
bool _is_close;
std::atomic_bool _is_cancelled;
Status _status;
std::atomic_int _buffer_rows;
const int _buffer_limit;
int64_t _packet_num;

Expand Down
11 changes: 9 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ public class Coordinator implements CoordInterface {

private StatsErrorEstimator statsErrorEstimator;

private int receiverOffset = 0;

// A countdown latch to mark the completion of each instance.
// use for old pipeline
// instance id -> dummy value
Expand Down Expand Up @@ -1162,7 +1164,8 @@ public RowBatch getNext() throws Exception {

RowBatch resultBatch;
Status status = new Status();
resultBatch = receivers.get(receivers.size() - 1).getNext(status);
ResultReceiver receiver = receivers.get(receiverOffset);
resultBatch = receiver.getNext(status);
if (!status.ok()) {
LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
DebugUtil.printId(queryId), status.getErrorMsg());
Expand Down Expand Up @@ -1209,7 +1212,7 @@ public RowBatch getNext() throws Exception {
}

if (resultBatch.isEos()) {
receivers.remove(receivers.size() - 1);
receivers.remove(receiver);
if (receivers.isEmpty()) {
returnedAllResults = true;
} else {
Expand All @@ -1227,6 +1230,10 @@ public RowBatch getNext() throws Exception {
}
}

if (!returnedAllResults) {
receiverOffset += 1;
receiverOffset %= receivers.size();
}
return resultBatch;
}

Expand Down

0 comments on commit 3c5d838

Please sign in to comment.