From bf115800abbed85d8126e67d15d146e72dbf81bf Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Wed, 27 Nov 2024 20:36:20 +0800 Subject: [PATCH 1/3] fix(bulkload) Sometime ingest will hang when encountering write throttling Resolve https://github.com/apache/incubator-pegasus/issues/2156 Due to the flag _is_bulk_load_ingestion allowing only one RPC_RRDB_RRDB_BULK_LOAD RPC call, the throttling_controller::DELAY recall of on_client_write cannot execute RPC_RRDB_RRDB_BULK_LOAD again. --- src/replica/replica_2pc.cpp | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 884c5c4dc3..9d88594326 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -188,6 +188,16 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } + if (static_cast(_primary_states.pc.hp_secondaries.size()) + 1 < + _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) { + response_client_write(request, ERR_NOT_ENOUGH_MEMBER); + return; + } + + if (!ignore_throttling && throttle_write_request(request)) { + return; + } + if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) { auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status(); if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED && @@ -209,15 +219,6 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) _bulk_load_ingestion_start_time_ms = dsn_now_ms(); } - if (static_cast(_primary_states.pc.hp_secondaries.size()) + 1 < - _options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) { - response_client_write(request, ERR_NOT_ENOUGH_MEMBER); - return; - } - - if (!ignore_throttling && throttle_write_request(request)) { - return; - } LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address); auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this); From 5243ff3ba6f414044d9fbc3c7c5c91752767b102 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Wed, 27 Nov 2024 20:56:18 +0800 Subject: [PATCH 2/3] follow clang-format --- src/replica/replica_2pc.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 9d88594326..0f9256885d 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -219,7 +219,6 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) _bulk_load_ingestion_start_time_ms = dsn_now_ms(); } - LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address); auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this); if (mu) { From aed4bf938691ec680d3d35173b6a84b6436c6cc9 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Thu, 28 Nov 2024 09:38:38 +0800 Subject: [PATCH 3/3] follow clang-tidy --- src/replica/replica_2pc.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 0f9256885d..f0c108b7fe 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -221,7 +221,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) LOG_DEBUG_PREFIX("got write request from {}", request->header->from_address); auto mu = _primary_states.write_queue.add_work(request->rpc_code(), request, this); - if (mu) { + if (mu != nullptr) { init_prepare(mu, false); } }