Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor(bulk_load): rename update_partition_status_on_remote_storage #1031

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
handle_app_downloading(response, primary_addr);
break;
case bulk_load_status::BLS_DOWNLOADED:
update_partition_status_on_remote_storage(
update_partition_info_on_remote_storage(
response.app_name, response.pid, bulk_load_status::BLS_INGESTING);
// when app status is downloaded or ingesting, send request frequently
break;
Expand Down Expand Up @@ -616,7 +616,7 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons
if (total_progress >= bulk_load_constant::PROGRESS_FINISHED) {
ddebug_f(
"app({}) partirion({}) download all files from remote provider succeed", app_name, pid);
update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_DOWNLOADED);
update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_DOWNLOADED);
}
}

Expand Down Expand Up @@ -676,7 +676,7 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
if (response.is_group_ingestion_finished) {
ddebug_f("app({}) partition({}) ingestion files succeed", app_name, pid);
decrease_app_ingestion_count(pid);
update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED);
update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED);
}
}

Expand Down Expand Up @@ -806,7 +806,7 @@ void bulk_load_service::handle_app_pausing(const bulk_load_response &response,

if (is_group_paused) {
ddebug_f("app({}) partirion({}) pause bulk load succeed", response.app_name, pid);
update_partition_status_on_remote_storage(
update_partition_info_on_remote_storage(
response.app_name, pid, bulk_load_status::BLS_PAUSED);
}
}
Expand Down Expand Up @@ -897,10 +897,10 @@ void bulk_load_service::update_partition_metadata_on_remote_storage(
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::update_partition_status_on_remote_storage(const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
bool should_send_request)
void bulk_load_service::update_partition_info_on_remote_storage(const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
bool should_send_request)
{
zauto_write_lock l(_lock);
partition_bulk_load_info pinfo = _partition_bulk_load_info[pid];
Expand All @@ -922,17 +922,16 @@ void bulk_load_service::update_partition_status_on_remote_storage(const std::str
pid,
dsn::enum_to_string(pinfo.status),
dsn::enum_to_string(new_status));
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::update_partition_status_on_remote_storage,
this,
app_name,
pid,
new_status,
should_send_request),
0,
std::chrono::seconds(1));
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::update_partition_info_on_remote_storage,
this,
app_name,
pid,
new_status,
should_send_request),
0,
std::chrono::seconds(1));
} else {
ddebug_f("app({}) partition({}) has already sync bulk load status, current_status = "
"{}, new_status = {}, wait for next round",
Expand All @@ -951,7 +950,7 @@ void bulk_load_service::update_partition_status_on_remote_storage(const std::str
_meta_svc->get_meta_storage()->set_data(
get_partition_bulk_load_path(pid),
std::move(value),
std::bind(&bulk_load_service::update_partition_status_on_remote_storage_reply,
std::bind(&bulk_load_service::update_partition_info_on_remote_storage_reply,
this,
app_name,
pid,
Expand All @@ -960,7 +959,7 @@ void bulk_load_service::update_partition_status_on_remote_storage(const std::str
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::update_partition_status_on_remote_storage_reply(
void bulk_load_service::update_partition_info_on_remote_storage_reply(
const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
Expand Down Expand Up @@ -1109,7 +1108,7 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
new_status == bulk_load_status::BLS_CANCELED ||
new_status == bulk_load_status::BLS_FAILED) {
for (int i = 0; i < ainfo.partition_count; ++i) {
update_partition_status_on_remote_storage(
update_partition_info_on_remote_storage(
ainfo.app_name, gpid(app_id, i), new_status, should_send_request);
}
}
Expand Down Expand Up @@ -1842,7 +1841,7 @@ void bulk_load_service::do_continue_app_bulk_load(
// _apps_in_progress_count is used for updating app bulk load, when _apps_in_progress_count = 0
// means app bulk load status can transfer to next stage, for example, when app status is
// downloaded, and _apps_in_progress_count = 0, app status can turn to ingesting
// see more in function `update_partition_status_on_remote_storage_reply`
// see more in function `update_partition_info_on_remote_storage_reply`
int32_t in_progress_partition_count = partition_count;
if (app_status == bulk_load_status::BLS_DOWNLOADING) {
if (invalid_count > 0) {
Expand Down Expand Up @@ -1893,8 +1892,7 @@ void bulk_load_service::do_continue_app_bulk_load(
app_status == bulk_load_status::BLS_DOWNLOADING) &&
different_count > 0) {
for (auto pidx : different_status_pidx_set) {
update_partition_status_on_remote_storage(
ainfo.app_name, gpid(app_id, pidx), app_status);
update_partition_info_on_remote_storage(ainfo.app_name, gpid(app_id, pidx), app_status);
}
}

Expand Down
27 changes: 15 additions & 12 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ struct bulk_load_info
/// start bulk load
/// |
/// v
/// remove previous bulk load info on remote storage
/// |
/// v
/// is_bulk_loading = true
/// |
/// v
Expand All @@ -98,8 +101,8 @@ struct bulk_load_info
/// v v
/// Failed Succeed
/// | |
/// v v
/// remove bulk load info on remote storage
/// | v
/// |---> is_bulk_loading = false
/// |
/// v
/// is_bulk_loading = false
Expand Down Expand Up @@ -226,19 +229,19 @@ class bulk_load_service
const gpid &pid,
const bulk_load_metadata &metadata);

// update partition bulk load status on remote storage
// update partition bulk load info on remote storage
// if should_send_request = true, will send bulk load request after update local partition
// status, this parameter will be true when restarting bulk load, status will turn from paused
// to downloading
void update_partition_status_on_remote_storage(const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
bool should_send_request = false);

void update_partition_status_on_remote_storage_reply(const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
bool should_send_request);
void update_partition_info_on_remote_storage(const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
bool should_send_request = false);

void update_partition_info_on_remote_storage_reply(const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
bool should_send_request);

// update app bulk load status on remote storage
void update_app_status_on_remote_storage_unlocked(int32_t app_id,
Expand Down