Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk_load): remove bulk load request short interval avoid unnecessary timeout #959

Merged
merged 2 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,6 @@ const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_re

const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5;
const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata");
const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR("bulk_load");
const int32_t bulk_load_constant::PROGRESS_FINISHED = 100;
Expand Down
1 change: 0 additions & 1 deletion src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ class bulk_load_constant
public:
static const std::string BULK_LOAD_INFO;
static const int32_t BULK_LOAD_REQUEST_INTERVAL;
static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
static const std::string BULK_LOAD_METADATA;
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
static const int32_t PROGRESS_FINISHED;
Expand Down
21 changes: 8 additions & 13 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,14 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
const rpc_address &primary_addr = request.primary_addr;
int32_t interval = bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL;

if (err != ERR_OK) {
derror_f("app({}), partition({}) failed to receive bulk load response, error = {}",
app_name,
pid,
err.to_string());
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid, interval);
try_resend_bulk_load_request(app_name, pid);
return;
}

Expand All @@ -375,7 +374,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
primary_addr.to_string(),
response.err.to_string());
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid, interval);
try_resend_bulk_load_request(app_name, pid);
return;
}

Expand All @@ -385,7 +384,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
primary_addr.to_string(),
app_name,
pid);
try_resend_bulk_load_request(app_name, pid, interval);
try_resend_bulk_load_request(app_name, pid);
return;
}

Expand All @@ -397,7 +396,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
response.err.to_string(),
dsn::enum_to_string(response.primary_bulk_load_status));
handle_bulk_load_failed(pid.get_app_id());
try_resend_bulk_load_request(app_name, pid, interval);
try_resend_bulk_load_request(app_name, pid);
return;
}

Expand All @@ -423,7 +422,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
request.ballot,
current_ballot);
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid, interval);
try_resend_bulk_load_request(app_name, pid);
return;
}

Expand All @@ -437,11 +436,9 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
update_partition_status_on_remote_storage(
response.app_name, response.pid, bulk_load_status::BLS_INGESTING);
// when app status is downloaded or ingesting, send request frequently
interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL;
break;
case bulk_load_status::BLS_INGESTING:
handle_app_ingestion(response, primary_addr);
interval = bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL;
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_FAILED:
Expand All @@ -459,13 +456,11 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
break;
}

try_resend_bulk_load_request(app_name, pid, interval);
try_resend_bulk_load_request(app_name, pid);
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name,
const gpid &pid,
const int32_t interval)
void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name, const gpid &pid)
{
FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](dsn::string_view) {});
zauto_read_lock l(_lock);
Expand All @@ -474,7 +469,7 @@ void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid),
0,
std::chrono::seconds(interval));
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ class bulk_load_service
const bulk_load_response &response);

// if app is still in bulk load, resend bulk_load_request to primary after interval seconds
void try_resend_bulk_load_request(const std::string &app_name,
const gpid &pid,
const int32_t interval);
void try_resend_bulk_load_request(const std::string &app_name, const gpid &pid);

void handle_app_downloading(const bulk_load_response &response,
const rpc_address &primary_addr);
Expand Down