Skip to content

Commit

Permalink
[fix](compaction) fixing the inaccurate statistics of concurrent comp…
Browse files Browse the repository at this point in the history
…action tasks (#37318)

Track the stage of each compaction task (not scheduled, pending, executing) and
count a task as concurrent only while it is actually executing
  • Loading branch information
luwei16 authored and dataroaring committed Jul 17, 2024
1 parent b4ba2d0 commit 4434b09
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 30 deletions.
4 changes: 4 additions & 0 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "common/status.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
Expand All @@ -43,6 +44,8 @@ struct TabletWithVersion {
int64_t version;
};

// Stage of the compaction task associated with a tablet.
enum class CompactionStage {
    // Default stage; also restored once a submitted task is removed again.
    NOT_SCHEDULED,
    // The task has been registered as submitted but has not started running yet.
    PENDING,
    // The task body is running on a compaction thread; only this stage is counted
    // as an executing task.
    EXECUTING,
};

// Base class for all tablet classes
class BaseTablet {
public:
Expand Down Expand Up @@ -301,6 +304,7 @@ class BaseTablet {
std::atomic<int64_t> write_count = 0;
std::atomic<int64_t> compaction_count = 0;

CompactionStage compaction_stage = CompactionStage::NOT_SCHEDULED;
std::mutex sample_info_lock;
std::vector<CompactionSampleInfo> sample_infos;
Status last_compaction_status = Status::OK();
Expand Down
51 changes: 34 additions & 17 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "common/sync_point.h"
#include "gen_cpp/BackendService.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/Types_constants.h"
#include "gen_cpp/internal_service.pb.h"
#include "gutil/ref_counted.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "io/fs/path.h"
#include "olap/base_tablet.h"
#include "olap/cold_data_compaction.h"
#include "olap/compaction_permit_limiter.h"
#include "olap/cumulative_compaction_policy.h"
Expand Down Expand Up @@ -603,8 +605,8 @@ void StorageEngine::_adjust_compaction_thread_num() {
void StorageEngine::_compaction_tasks_producer_callback() {
LOG(INFO) << "try to start compaction producer process!";

std::unordered_set<TTabletId> tablet_submitted_cumu;
std::unordered_set<TTabletId> tablet_submitted_base;
std::unordered_set<TabletSharedPtr> tablet_submitted_cumu;
std::unordered_set<TabletSharedPtr> tablet_submitted_base;
std::vector<DataDir*> data_dirs = get_stores();
for (auto& data_dir : data_dirs) {
_tablet_submitted_cumu_compaction[data_dir] = tablet_submitted_cumu;
Expand Down Expand Up @@ -884,6 +886,17 @@ int get_concurrent_per_disk(int max_score, int thread_per_disk) {
return thread_per_disk;
}

int StorageEngine::_get_executing_compaction_num(
std::unordered_set<TabletSharedPtr>& compaction_tasks) {
int num = 0;
for (const auto& task : compaction_tasks) {
if (task->compaction_stage == CompactionStage::EXECUTING) {
num++;
}
}
return num;
}

std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
_update_cumulative_compaction_policy();
Expand All @@ -896,8 +909,8 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(

// Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex
// when traversing the data dir
std::map<DataDir*, std::unordered_set<TTabletId>> copied_cumu_map;
std::map<DataDir*, std::unordered_set<TTabletId>> copied_base_map;
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map;
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map;
{
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
copied_cumu_map = _tablet_submitted_cumu_compaction;
Expand All @@ -910,7 +923,8 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
// in the current submitted tasks.
// If so, the last Slot can be assigned to Base compaction,
// otherwise, this Slot needs to be reserved for cumulative compaction.
int count = copied_cumu_map[data_dir].size() + copied_base_map[data_dir].size();
int count = _get_executing_compaction_num(copied_cumu_map[data_dir]) +
_get_executing_compaction_num(copied_base_map[data_dir]);
int thread_per_disk = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk
: config::compaction_task_num_per_disk;

Expand Down Expand Up @@ -973,19 +987,16 @@ bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table
bool already_existed = false;
switch (compaction_type) {
case CompactionType::CUMULATIVE_COMPACTION:
already_existed = !(_tablet_submitted_cumu_compaction[tablet->data_dir()]
.insert(tablet->tablet_id())
.second);
already_existed =
!(_tablet_submitted_cumu_compaction[tablet->data_dir()].insert(tablet).second);
break;
case CompactionType::BASE_COMPACTION:
already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()]
.insert(tablet->tablet_id())
.second);
already_existed =
!(_tablet_submitted_base_compaction[tablet->data_dir()].insert(tablet).second);
break;
case CompactionType::FULL_COMPACTION:
already_existed = !(_tablet_submitted_full_compaction[tablet->data_dir()]
.insert(tablet->tablet_id())
.second);
already_existed =
!(_tablet_submitted_full_compaction[tablet->data_dir()].insert(tablet).second);
break;
}
return already_existed;
Expand All @@ -997,13 +1008,13 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
int removed = 0;
switch (compaction_type) {
case CompactionType::CUMULATIVE_COMPACTION:
removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id());
removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet);
break;
case CompactionType::BASE_COMPACTION:
removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id());
removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet);
break;
case CompactionType::FULL_COMPACTION:
removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id());
removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet);
break;
}

Expand Down Expand Up @@ -1034,6 +1045,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
"compaction task has already been submitted, tablet_id={}, compaction_type={}.",
tablet->tablet_id(), compaction_type);
}
tablet->compaction_stage = CompactionStage::PENDING;
std::shared_ptr<CompactionMixin> compaction;
int64_t permits = 0;
Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
Expand All @@ -1048,17 +1060,21 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
: _base_compaction_thread_pool;
auto st = thread_pool->submit_func([tablet, compaction = std::move(compaction),
compaction_type, permits, force, this]() {
tablet->compaction_stage = CompactionStage::EXECUTING;
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
tablet->execute_compaction(*compaction);
if (!force) {
_permit_limiter.release(permits);
}
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
});
if (!st.ok()) {
if (!force) {
_permit_limiter.release(permits);
}
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
return Status::InternalError(
"failed to submit compaction task to thread pool, "
"tablet_id={}, compaction_type={}.",
Expand All @@ -1067,6 +1083,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
return Status::OK();
} else {
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
if (!st.ok()) {
return Status::InternalError(
"failed to prepare compaction task and calculate permits, "
Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1404,9 +1404,9 @@ Status StorageEngine::get_compaction_status_json(std::string* result) {
rapidjson::Document arr;
arr.SetArray();

for (auto& tablet_id : it.second) {
for (auto& tablet : it.second) {
rapidjson::Value key;
const std::string& key_str = std::to_string(tablet_id);
const std::string& key_str = std::to_string(tablet->tablet_id());
key.SetString(key_str.c_str(), key_str.length(), path_obj.GetAllocator());
arr.PushBack(key, root.GetAllocator());
}
Expand All @@ -1428,9 +1428,9 @@ Status StorageEngine::get_compaction_status_json(std::string* result) {
rapidjson::Document arr;
arr.SetArray();

for (auto& tablet_id : it.second) {
for (auto& tablet : it.second) {
rapidjson::Value key;
const std::string& key_str = std::to_string(tablet_id);
const std::string& key_str = std::to_string(tablet->tablet_id());
key.SetString(key_str.c_str(), key_str.length(), path_obj2.GetAllocator());
arr.PushBack(key, root.GetAllocator());
}
Expand All @@ -1452,9 +1452,9 @@ Status StorageEngine::get_compaction_status_json(std::string* result) {
rapidjson::Document arr;
arr.SetArray();

for (auto& tablet_id : it.second) {
for (auto& tablet : it.second) {
rapidjson::Value key;
const std::string& key_str = std::to_string(tablet_id);
const std::string& key_str = std::to_string(tablet->tablet_id());
key.SetString(key_str.c_str(), key_str.length(), path_obj3.GetAllocator());
arr.PushBack(key, root.GetAllocator());
}
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ class StorageEngine final : public BaseStorageEngine {

int32_t _auto_get_interval_by_disk_capacity(DataDir* data_dir);

int _get_executing_compaction_num(std::unordered_set<TabletSharedPtr>& compaction_tasks);

private:
EngineOptions _options;
std::mutex _store_lock;
Expand Down Expand Up @@ -451,9 +453,9 @@ class StorageEngine final : public BaseStorageEngine {

std::mutex _tablet_submitted_compaction_mutex;
// a tablet can do base and cumulative compaction at the same time
std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_cumu_compaction;
std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_base_compaction;
std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_full_compaction;
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> _tablet_submitted_cumu_compaction;
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> _tablet_submitted_base_compaction;
std::map<DataDir*, std::unordered_set<TabletSharedPtr>> _tablet_submitted_full_compaction;

std::mutex _low_priority_task_nums_mutex;
std::unordered_map<DataDir*, int32_t> _low_priority_task_nums;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {

std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score,
const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score,
const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>&
all_cumulative_compaction_policies) {
int64_t now_ms = UnixMillis();
Expand All @@ -749,7 +749,7 @@ std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction(
return;
}

auto search = tablet_submitted_compaction.find(tablet_ptr->tablet_id());
auto search = tablet_submitted_compaction.find(tablet_ptr);
if (search != tablet_submitted_compaction.end()) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class TabletManager {
// single compaction tasks for the tablet.
std::vector<TabletSharedPtr> find_best_tablets_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score,
const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score,
const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>&
all_cumulative_compaction_policies);

Expand Down
132 changes: 132 additions & 0 deletions be/test/olap/compaction_task_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock-actions.h>
#include <gmock/gmock-matchers.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>

#include <filesystem>
#include <memory>

#include "common/status.h"
#include "common/sync_point.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/local_file_system.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/data_dir.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/threadpool.h"

namespace doris {
using namespace config;

class CompactionTaskTest : public testing::Test {
public:
virtual void SetUp() {
_engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
auto st = io::global_local_filesystem()->delete_directory(_engine_data_path);
ASSERT_TRUE(st.ok()) << st;
st = io::global_local_filesystem()->create_directory(_engine_data_path);
ASSERT_TRUE(st.ok()) << st;
EXPECT_TRUE(
io::global_local_filesystem()->create_directory(_engine_data_path + "/meta").ok());

EngineOptions options;
options.backend_uid = UniqueId::gen_uid();
_storage_engine = std::make_unique<StorageEngine>(options);
_data_dir = std::make_unique<DataDir>(*_storage_engine, _engine_data_path, 100000000);
static_cast<void>(_data_dir->init());
}

virtual void TearDown() {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}

std::unique_ptr<StorageEngine> _storage_engine;
std::string _engine_data_path;
std::unique_ptr<DataDir> _data_dir;
};

// Builds a BETA rowset for tests from hand-written metadata.
//   version      - start/end version stored in the rowset meta
//   num_segments - segment count stored in the rowset meta
//   overlapping  - selects OVERLAPPING vs NONOVERLAPPING segments
//   data_size    - total disk size stored in the rowset meta
// Returns nullptr when RowsetFactory::create_rowset does not succeed.
static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
                                     int data_size) {
    auto meta = std::make_shared<RowsetMeta>();
    meta->set_rowset_type(BETA_ROWSET); // important
    meta->_rowset_meta_pb.set_start_version(version.first);
    meta->_rowset_meta_pb.set_end_version(version.second);
    meta->set_num_segments(num_segments);
    if (overlapping) {
        meta->set_segments_overlap(OVERLAPPING);
    } else {
        meta->set_segments_overlap(NONOVERLAPPING);
    }
    meta->set_total_disk_size(data_size);

    RowsetSharedPtr result;
    Status status = RowsetFactory::create_rowset(nullptr, "", std::move(meta), &result);
    if (status.ok()) {
        return result;
    }
    return nullptr;
}

// Submits 10 cumulative compaction tasks to a pool limited to 2 threads and checks
// that only the 2 tasks actually running are counted as executing; the other 8
// remain in the submitted set without being counted.
TEST_F(CompactionTaskTest, TestSubmitCompactionTask) {
    auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
                      .set_min_threads(2)
                      .set_max_threads(2)
                      .build(&_storage_engine->_base_compaction_thread_pool);
    // Status::OK() is a static factory, so `st.OK()` would ignore `st` entirely;
    // ok() is the check that inspects this status.
    EXPECT_TRUE(st.ok()) << st;
    st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
                 .set_min_threads(2)
                 .set_max_threads(2)
                 .build(&_storage_engine->_cumu_compaction_thread_pool);
    EXPECT_TRUE(st.ok()) << st;

    // Replace the real compaction with a 10s sleep so each running task stays in
    // the EXECUTING stage while the test inspects the counters. Setting the
    // predicate makes the sync point return before the real compaction runs.
    auto* sp = SyncPoint::get_instance();
    sp->enable_processing();
    sp->set_call_back("olap_server::execute_compaction", [](auto&& values) {
        std::this_thread::sleep_for(std::chrono::seconds(10));
        bool* pred = try_any_cast<bool*>(values.back());
        *pred = true;
    });

    for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) {
        TabletMetaSharedPtr tablet_meta;
        tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
                                         UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
                                         TCompressionType::LZ4F));
        TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta, _data_dir.get(),
                                          CUMULATIVE_SIZE_BASED_POLICY));
        st = tablet->init();
        EXPECT_TRUE(st.ok()) << st;

        for (int i = 2; i < 30; ++i) {
            RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
            // create_rowset() returns nullptr on failure; fail here instead of crashing.
            ASSERT_NE(rs, nullptr);
            tablet->_rs_version_map.emplace(rs->version(), rs);
        }
        tablet->_cumulative_point = 2;

        st = _storage_engine->_submit_compaction_task(tablet, CompactionType::CUMULATIVE_COMPACTION,
                                                      false);
        EXPECT_TRUE(st.ok()) << st;
    }

    // Tasks switch to EXECUTING asynchronously on the pool threads, so give them a
    // bounded amount of time to start. The deadline is shorter than the 10s sleep
    // in the callback, so the first two tasks are still running when we check.
    auto& submitted_cumu = _storage_engine->_tablet_submitted_cumu_compaction[_data_dir.get()];
    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
    int executing_task_num = _storage_engine->_get_executing_compaction_num(submitted_cumu);
    while (executing_task_num < 2 && std::chrono::steady_clock::now() < deadline) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        executing_task_num = _storage_engine->_get_executing_compaction_num(submitted_cumu);
    }
    EXPECT_EQ(executing_task_num, 2);
}

} // namespace doris
2 changes: 1 addition & 1 deletion be/test/olap/tablet_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
create_tablet(id, false, rowset_size++);
}

std::unordered_set<TTabletId> cumu_set;
std::unordered_set<TabletSharedPtr> cumu_set;
std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>
cumulative_compaction_policies;
cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] =
Expand Down

0 comments on commit 4434b09

Please sign in to comment.