From c72834c8f62861eb1b8dee716c0f64d55b6c8fe9 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 20 Jan 2022 15:03:03 +0800 Subject: [PATCH] refactor(bulk_load): rename update_partition_status_on_remote_storage --- src/meta/meta_bulk_load_service.cpp | 48 ++++++++++++++--------------- src/meta/meta_bulk_load_service.h | 27 ++++++++-------- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index d07cfb2f64..b47bfaeb5f 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -498,7 +498,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err, handle_app_downloading(response, primary_addr); break; case bulk_load_status::BLS_DOWNLOADED: - update_partition_status_on_remote_storage( + update_partition_info_on_remote_storage( response.app_name, response.pid, bulk_load_status::BLS_INGESTING); // when app status is downloaded or ingesting, send request frequently break; @@ -616,7 +616,7 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons if (total_progress >= bulk_load_constant::PROGRESS_FINISHED) { ddebug_f( "app({}) partirion({}) download all files from remote provider succeed", app_name, pid); - update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_DOWNLOADED); + update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_DOWNLOADED); } } @@ -676,7 +676,7 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response, if (response.is_group_ingestion_finished) { ddebug_f("app({}) partition({}) ingestion files succeed", app_name, pid); decrease_app_ingestion_count(pid); - update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED); + update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED); } } @@ -806,7 +806,7 @@ void bulk_load_service::handle_app_pausing(const bulk_load_response &response, if (is_group_paused) { ddebug_f("app({}) partirion({}) pause bulk load succeed", response.app_name, pid); - update_partition_status_on_remote_storage( + update_partition_info_on_remote_storage( response.app_name, pid, bulk_load_status::BLS_PAUSED); } } @@ -897,10 +897,10 @@ void bulk_load_service::update_partition_metadata_on_remote_storage( } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::update_partition_status_on_remote_storage(const std::string &app_name, - const gpid &pid, - bulk_load_status::type new_status, - bool should_send_request) +void bulk_load_service::update_partition_info_on_remote_storage(const std::string &app_name, + const gpid &pid, + bulk_load_status::type new_status, + bool should_send_request) { zauto_write_lock l(_lock); partition_bulk_load_info pinfo = _partition_bulk_load_info[pid]; @@ -922,17 +922,16 @@ void bulk_load_service::update_partition_status_on_remote_storage(const std::str pid, dsn::enum_to_string(pinfo.status), dsn::enum_to_string(new_status)); - tasking::enqueue( - LPC_META_STATE_NORMAL, - _meta_svc->tracker(), - std::bind(&bulk_load_service::update_partition_status_on_remote_storage, - this, - app_name, - pid, - new_status, - should_send_request), - 0, - std::chrono::seconds(1)); + tasking::enqueue(LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + std::bind(&bulk_load_service::update_partition_info_on_remote_storage, + this, + app_name, + pid, + new_status, + should_send_request), + 0, + std::chrono::seconds(1)); } else { ddebug_f("app({}) partition({}) has already sync bulk load status, current_status = " "{}, new_status = {}, wait for next round", @@ -951,7 +950,7 @@ void bulk_load_service::update_partition_status_on_remote_storage(const std::str _meta_svc->get_meta_storage()->set_data( get_partition_bulk_load_path(pid), std::move(value), - std::bind(&bulk_load_service::update_partition_status_on_remote_storage_reply, + std::bind(&bulk_load_service::update_partition_info_on_remote_storage_reply, this, app_name, pid, @@ -960,7 +959,7 @@ void bulk_load_service::update_partition_status_on_remote_storage(const std::str } // ThreadPool: THREAD_POOL_META_STATE -void bulk_load_service::update_partition_status_on_remote_storage_reply( +void bulk_load_service::update_partition_info_on_remote_storage_reply( const std::string &app_name, const gpid &pid, bulk_load_status::type new_status, @@ -1109,7 +1108,7 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk new_status == bulk_load_status::BLS_CANCELED || new_status == bulk_load_status::BLS_FAILED) { for (int i = 0; i < ainfo.partition_count; ++i) { - update_partition_status_on_remote_storage( + update_partition_info_on_remote_storage( ainfo.app_name, gpid(app_id, i), new_status, should_send_request); } } @@ -1842,7 +1841,7 @@ void bulk_load_service::do_continue_app_bulk_load( // _apps_in_progress_count is used for updating app bulk load, when _apps_in_progress_count = 0 // means app bulk load status can transfer to next stage, for example, when app status is // downloaded, and _apps_in_progress_count = 0, app status can turn to ingesting - // see more in function `update_partition_status_on_remote_storage_reply` + // see more in function `update_partition_info_on_remote_storage_reply` int32_t in_progress_partition_count = partition_count; if (app_status == bulk_load_status::BLS_DOWNLOADING) { if (invalid_count > 0) { @@ -1893,8 +1892,7 @@ void bulk_load_service::do_continue_app_bulk_load( app_status == bulk_load_status::BLS_DOWNLOADING) && different_count > 0) { for (auto pidx : different_status_pidx_set) { - update_partition_status_on_remote_storage( - ainfo.app_name, gpid(app_id, pidx), app_status); + update_partition_info_on_remote_storage(ainfo.app_name, gpid(app_id, pidx), app_status); } } diff --git a/src/meta/meta_bulk_load_service.h b/src/meta/meta_bulk_load_service.h index b759127a3c..010fe2b7b4 100644 --- a/src/meta/meta_bulk_load_service.h +++ b/src/meta/meta_bulk_load_service.h @@ -80,6 +80,9 @@ struct bulk_load_info /// start bulk load /// | /// v +/// remove previous bulk load info on remote storage +/// | +/// v /// is_bulk_loading = true /// | /// v @@ -98,8 +101,8 @@ struct bulk_load_info /// v v /// Failed Succeed /// | | -/// v v -/// remove bulk load info on remote storage +/// | v +/// |---> is_bulk_loading = false /// | /// v /// is_bulk_loading = false @@ -226,19 +229,19 @@ class bulk_load_service const gpid &pid, const bulk_load_metadata &metadata); - // update partition bulk load status on remote storage + // update partition bulk load info on remote storage // if should_send_request = true, will send bulk load request after update local partition // status, this parameter will be true when restarting bulk load, status will turn from paused // to downloading - void update_partition_status_on_remote_storage(const std::string &app_name, - const gpid &pid, - bulk_load_status::type new_status, - bool should_send_request = false); - - void update_partition_status_on_remote_storage_reply(const std::string &app_name, - const gpid &pid, - bulk_load_status::type new_status, - bool should_send_request); + void update_partition_info_on_remote_storage(const std::string &app_name, + const gpid &pid, + bulk_load_status::type new_status, + bool should_send_request = false); + + void update_partition_info_on_remote_storage_reply(const std::string &app_name, + const gpid &pid, + bulk_load_status::type new_status, + bool should_send_request); // update app bulk load status on remote storage void update_app_status_on_remote_storage_unlocked(int32_t app_id,