Skip to content

Commit

Permalink
[fix](group commit) fix group commit insert rpc that may get stuck (#39391)
Browse files Browse the repository at this point in the history
1. When exec_plan_fragment fails and the PipelineFragmentContext is not
constructed, the group_commit_insert RPC gets stuck.
2. The `LoadBlockQueue.add_block.back_pressure_time_out` debug point does
not work.
  • Loading branch information
mymeiyi authored Aug 15, 2024
1 parent ffaca6c commit 90d92e9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 7 deletions.
2 changes: 2 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,8 @@ std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
template <typename Params>
Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
std::shared_ptr<QueryContext>& query_ctx) {
DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed",
{ return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
Expand Down
6 changes: 2 additions & 4 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ namespace doris {
Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block, bool write_wal,
UniqueId& load_id) {
DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
{ return Status::InternalError("LoadBlockQueue.add_block.failed"); });
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
auto start = std::chrono::steady_clock::now();
DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", {
start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000);
});
if (UNLIKELY(runtime_state->is_cancelled())) {
return runtime_state->cancel_reason();
}
Expand Down
23 changes: 20 additions & 3 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2030,7 +2030,10 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont
TUniqueId load_id;
load_id.__set_hi(request->load_id().hi());
load_id.__set_lo(request->load_id().lo());
bool ret = _light_work_pool.try_offer([this, request, response, done, load_id]() {
std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
bool ret = _light_work_pool.try_offer([this, request, response, done, load_id, lock,
is_done]() {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
auto pipe = std::make_shared<io::StreamLoadPipe>(
Expand All @@ -2044,7 +2047,13 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont
request->exec_plan_fragment_request().request(),
request->exec_plan_fragment_request().version(),
request->exec_plan_fragment_request().compact(),
[&, response, done, load_id](RuntimeState* state, Status* status) {
[&, response, done, load_id, lock, is_done](RuntimeState* state,
Status* status) {
std::lock_guard<std::mutex> lock1(*lock);
if (*is_done) {
return;
}
*is_done = true;
brpc::ClosureGuard cb_closure_guard(done);
response->set_label(state->import_label());
response->set_txn_id(state->wal_id());
Expand All @@ -2063,11 +2072,19 @@ void PInternalService::group_commit_insert(google::protobuf::RpcController* cont
st = Status::Error(ErrorCode::INTERNAL_ERROR,
"_exec_plan_fragment_impl meet unknown error");
}
closure_guard.release();
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, load_id=" << print_id(load_id)
<< ", errmsg=" << st;
std::lock_guard<std::mutex> lock1(*lock);
if (*is_done) {
closure_guard.release();
} else {
*is_done = true;
st.to_protobuf(response->mutable_status());
_exec_env->new_load_stream_mgr()->remove(load_id);
}
} else {
closure_guard.release();
for (int i = 0; i < request->data().size(); ++i) {
std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,28 @@ suite("test_group_commit_error", "nonConcurrent") {
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}

try {
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr._get_query_ctx.failed")
sql """ set group_commit = async_mode """
sql """ set enable_nereids_planner = false """
sql """ insert into ${tableName} values (3, 3) """
assertTrue(false)
} catch (Exception e) {
logger.info("failed: " + e.getMessage())
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}

try {
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.failed")
sql """ set group_commit = async_mode """
sql """ set enable_nereids_planner = false """
sql """ insert into ${tableName} values (4, 4) """
assertTrue(false)
} catch (Exception e) {
logger.info("failed: " + e.getMessage())
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}
}

0 comments on commit 90d92e9

Please sign in to comment.