Skip to content

Commit

Permalink
[enhancement](compaction) Control the parallelism for urgent compacto…
Browse files Browse the repository at this point in the history
…n tasks (#37782) (#38189)

## Proposed changes

For some urgent compaction tasks, their submittion should take
parallelism into account.

Currently, we apply the control policy for data loading in specific.
Other source of urgent tasks are considered as eager.
  • Loading branch information
TangSiyang2001 authored Jul 22, 2024
1 parent ce1c268 commit 7319912
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <sstream>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -1586,7 +1587,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
if (tablet->exceed_version_limit(config::max_tablet_version_num * 2 / 3) &&
published_count % 20 == 0) {
auto st = _engine.submit_compaction_task(
tablet, CompactionType::CUMULATIVE_COMPACTION, true);
tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
if (!st.ok()) [[unlikely]] {
LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
<< ", published=" << published_count << " : " << st;
Expand Down
41 changes: 35 additions & 6 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,12 +854,12 @@ int StorageEngine::_get_executing_compaction_num(
return num;
}

bool need_generate_compaction_tasks(int count, int thread_per_disk, CompactionType compaction_type,
bool all_base) {
if (count >= thread_per_disk) {
bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
CompactionType compaction_type, bool all_base) {
if (task_cnt_per_disk >= thread_per_disk) {
// Return if no available slot
return false;
} else if (count >= thread_per_disk - 1) {
} else if (task_cnt_per_disk >= thread_per_disk - 1) {
// Only one slot left, check if it can be assigned to base compaction task.
if (compaction_type == CompactionType::BASE_COMPACTION) {
if (all_base) {
Expand Down Expand Up @@ -912,7 +912,7 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
copied_cumu_map = _tablet_submitted_cumu_compaction;
copied_base_map = _tablet_submitted_base_compaction;
}
for (auto data_dir : data_dirs) {
for (auto* data_dir : data_dirs) {
bool need_pick_tablet = true;
// We need to reserve at least one Slot for cumulative compaction.
// So when there is only one Slot, we have to judge whether there is a cumulative compaction
Expand Down Expand Up @@ -1091,7 +1091,36 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
}

Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
bool force) {
bool force, bool eager) {
if (!eager) {
DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::CUMULATIVE_COMPACTION);
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map;
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map;
{
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
copied_cumu_map = _tablet_submitted_cumu_compaction;
copied_base_map = _tablet_submitted_base_compaction;
}
auto stores = get_stores();

auto busy_pred = [&copied_cumu_map, &copied_base_map, compaction_type,
this](auto* data_dir) {
int count = _get_executing_compaction_num(copied_base_map[data_dir]) +
_get_executing_compaction_num(copied_cumu_map[data_dir]);
int paral = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk
: config::compaction_task_num_per_disk;
bool all_base = copied_cumu_map[data_dir].empty();
return need_generate_compaction_tasks(count, paral, compaction_type, all_base);
};

bool is_busy = std::none_of(stores.begin(), stores.end(), busy_pred);
if (is_busy) {
LOG_EVERY_N(WARNING, 100)
<< "Too busy to submit a compaction task, tablet=" << tablet->get_table_id();
return Status::OK();
}
}
_update_cumulative_compaction_policy();
// alter table tableName set ("compaction_policy"="time_series")
// if atler table's compaction policy, we need to modify tablet compaction policy shared ptr
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class StorageEngine {
void check_cumulative_compaction_config();

Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
bool force);
bool force, bool eager = true);
Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
SegCompactionCandidatesSharedPtr segments);

Expand Down

0 comments on commit 7319912

Please sign in to comment.