Skip to content

Commit

Permalink
RCQ wakeup actively when resource group is in trickle mode (#8388) (#8423
Browse files Browse the repository at this point in the history
)

close #8391
  • Loading branch information
ti-chi-bot authored Dec 7, 2023
1 parent 91ec283 commit 7ce9ad7
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 73 deletions.
8 changes: 7 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,13 @@ namespace DB
F(type_total_consumption, {"type", "total_consumption"}), \
F(type_bucket_fill_rate, {"type", "bucket_fill_rate"}), \
F(type_bucket_capacity, {"type", "bucket_capacity"}), \
F(type_fetch_tokens_from_gac_count, {"type", "fetch_tokens_from_gac_count"}))
F(type_compute_ru_consumption, {"type", "compute_ru_consumption"}), \
F(type_storage_ru_consumption, {"type", "storage_ru_consumption"}), \
F(type_compute_ru_exhausted, {"type", "compute_ru_exhausted"}), \
F(type_gac_req_acquire_tokens, {"type", "gac_req_acquire_tokens"}), \
F(type_gac_req_ru_consumption_delta, {"type", "gac_req_ru_consumption_delta"}), \
F(type_gac_resp_tokens, {"type", "gac_resp_tokens"}), \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ bool ResourceControlQueue<NestedTaskQueueType>::take(TaskPtr & task)
if unlikely (updateResourceGroupInfosWithoutLock())
continue;

UInt64 wait_dura = LocalAdmissionController::DEFAULT_FETCH_GAC_INTERVAL_MS;
if (!resource_group_infos.empty())
{
const ResourceGroupInfo & group_info = resource_group_infos.top();
Expand All @@ -136,13 +137,15 @@ bool ResourceControlQueue<NestedTaskQueueType>::take(TaskPtr & task)
mustTakeTask(group_info.task_queue, task);
return true;
}
wait_dura = LocalAdmissionController::global_instance->estWaitDuraMS(group_info.name);
}

assert(!task);
// Wakeup when:
// 1. finish() is called.
// 2. refill_token_callback is called by LAC.
cv.wait(lock);
// 3. token refilled in trickle mode.
cv.wait_for(lock, std::chrono::milliseconds(wait_dura));
}
}

Expand All @@ -153,7 +156,7 @@ void ResourceControlQueue<NestedTaskQueueType>::updateStatistics(const TaskPtr &
auto ru = cpuTimeToRU(inc_value);
const String & name = task->getResourceGroupName();
LOG_TRACE(logger, "resource group {} will consume {} RU(or {} cpu time in ns)", name, ru, inc_value);
LocalAdmissionController::global_instance->consumeResource(name, ru, inc_value);
LocalAdmissionController::global_instance->consumeCPUResource(name, ru, inc_value);
}

template <typename NestedTaskQueueType>
Expand Down
56 changes: 30 additions & 26 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,13 @@ void LocalAdmissionController::startBackgroudJob()
}
LOG_INFO(log, "get unique_client_id succeed: {}", unique_client_id);

auto last_metric_time_point = SteadyClock::now();
while (!stopped.load())
{
bool fetch_token_periodically = false;

{
std::unique_lock<std::mutex> lock(mu);

auto now = SteadyClock::now();
if (now - last_metric_time_point >= COLLECT_METRIC_INTERVAL)
{
last_metric_time_point = now;
for (const auto & resource_group : resource_groups)
resource_group.second->collectMetrics();
}

if (low_token_resource_groups.empty())
{
fetch_token_periodically = true;
Expand Down Expand Up @@ -181,25 +172,24 @@ std::optional<LocalAdmissionController::AcquireTokenInfo> LocalAdmissionControll
return;

// To avoid a periodically_token_fetch immediately after a low_token_fetch
if (is_periodically_fetch && !resource_group->needFetchTokenPeridically(now, DEFAULT_FETCH_GAC_INTERVAL))
if (is_periodically_fetch && !resource_group->needFetchToken(now, DEFAULT_FETCH_GAC_INTERVAL))
return;

// During trickle mode, no need to fetch tokens from GAC.
if (resource_group->inTrickleModeLease(now))
return;

acquire_tokens = resource_group->getAcquireRUNum(
consumption_update_info.speed,
DEFAULT_FETCH_GAC_INTERVAL.count(),
ACQUIRE_RU_AMPLIFICATION);

if (acquire_tokens == 0.0 && token_consumption == 0.0 && resource_group->trickleModeLeaseExpire(now))
if (resource_group->trickleModeLeaseExpire(now))
{
// If acquire_tokens and token_consumption are both zero, will ignore send RPC to GAC.
// But we need to make sure trickle mode should exit timely, which needs to talk with GAC.
// So we force acquire 1RU.
LOG_DEBUG(log, "force acquire 1RU because of try to exit trickle mode");
acquire_tokens = 1.0;
acquire_tokens
= consumption_update_info.speed * DEFAULT_FETCH_GAC_INTERVAL.count() * ACQUIRE_RU_AMPLIFICATION;
}
else
{
acquire_tokens = resource_group->getAcquireRUNum(
consumption_update_info.speed,
DEFAULT_FETCH_GAC_INTERVAL.count(),
ACQUIRE_RU_AMPLIFICATION);
}

assert(acquire_tokens >= 0.0);
Expand All @@ -218,7 +208,8 @@ std::optional<LocalAdmissionController::AcquireTokenInfo> LocalAdmissionControll

void LocalAdmissionController::fetchTokensFromGAC(
const std::vector<AcquireTokenInfo> & acquire_infos,
const std::string & desc_str)
const std::string & desc_str,
bool is_final_report)
{
if (acquire_infos.empty())
{
Expand All @@ -239,19 +230,26 @@ void LocalAdmissionController::fetchTokensFromGAC(

auto * single_group_req = gac_req.add_requests();
single_group_req->set_resource_group_name(info.resource_group_name);
assert(info.acquire_tokens > 0.0 || info.ru_consumption_delta > 0.0);
if (info.acquire_tokens > 0.0)
assert(info.acquire_tokens > 0.0 || info.ru_consumption_delta > 0.0 || is_final_report);
if (info.acquire_tokens > 0.0 || is_final_report)
{
auto * ru_items = single_group_req->mutable_ru_items();
auto * req_ru = ru_items->add_request_r_u();
req_ru->set_type(resource_manager::RequestUnitType::RU);
req_ru->set_value(info.acquire_tokens);
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_gac_req_acquire_tokens, info.resource_group_name)
.Set(info.acquire_tokens);
}
if (info.ru_consumption_delta > 0.0)
if (info.ru_consumption_delta > 0.0 || is_final_report)
{
single_group_req->set_is_tiflash(true);
auto * tiflash_consumption = single_group_req->mutable_consumption_since_last_request();
tiflash_consumption->set_r_r_u(info.ru_consumption_delta);
GET_RESOURCE_GROUP_METRIC(
tiflash_resource_group,
type_gac_req_ru_consumption_delta,
info.resource_group_name)
.Set(info.ru_consumption_delta);
}
}

Expand Down Expand Up @@ -351,7 +349,8 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(
continue;
}

auto resource_group = findResourceGroup(one_resp.resource_group_name());
const auto & name = one_resp.resource_group_name();
auto resource_group = findResourceGroup(name);
if (resource_group == nullptr)
continue;

Expand All @@ -370,6 +369,11 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(

int64_t capacity = granted_token_bucket.granted_tokens().settings().burst_limit();

if (added_tokens > 0)
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_gac_resp_tokens, name).Set(added_tokens);
if (capacity > 0)
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_gac_resp_capacity, name).Set(capacity);

// fill_rate should never be set.
RUNTIME_CHECK(granted_token_bucket.granted_tokens().settings().fill_rate() == 0);

Expand Down
Loading

0 comments on commit 7ce9ad7

Please sign in to comment.