Skip to content

Commit

Permalink
fix error -230
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Apr 28, 2024
1 parent 57621c2 commit da9c446
Show file tree
Hide file tree
Showing 59 changed files with 1,340 additions and 252 deletions.
3 changes: 3 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
_workers[TTaskType::CLEAN_TRASH] = std::make_unique<TaskWorkerPool>(
"CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); });

_workers[TTaskType::UPDATE_VISIBLE_VERSION] = std::make_unique<TaskWorkerPool>(
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); }));

Expand Down
13 changes: 13 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand All @@ -452,6 +453,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down Expand Up @@ -1077,6 +1079,11 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
return;
}

std::map<int64_t, int64_t> partitions_version;
engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
request.__set_partitions_version(std::move(partitions_version));

int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
Expand Down Expand Up @@ -1927,6 +1934,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
engine.gc_binlogs(gc_tablet_infos);
}

// Agent task handler for UPDATE_VISIBLE_VERSION: forwards the partition -> version
// mapping carried by the request to the tablet manager.
void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& partition_versions = req.visible_version_req.partition_version;
    engine.tablet_manager()->update_partitions_visible_version(partition_versions);
}

void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
const TAgentTaskRequest& req) {
const auto& clone_req = req.clone_req;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk,
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");

// Compaction skips versions that are not yet visible, subject to these limits:
// if the visible version was updated within compaction_keep_invisible_version_timeout_sec,
// keep at most compaction_keep_invisible_version_max_count invisible versions uncompacted;
// otherwise (timed out), keep at most compaction_keep_invisible_version_min_count.
DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800");
DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50");
DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500");

// Threshold to logging compaction trace, in seconds.
DEFINE_mInt32(base_compaction_trace_threshold, "60");
DEFINE_mInt32(cumulative_compaction_trace_threshold, "10");
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk);
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round);

// Compaction skips versions that are not yet visible, subject to these limits:
// if the visible version was updated within compaction_keep_invisible_version_timeout_sec,
// keep at most compaction_keep_invisible_version_max_count invisible versions uncompacted;
// otherwise (timed out), keep at most compaction_keep_invisible_version_min_count.
DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec);
DECLARE_mInt32(compaction_keep_invisible_version_min_count);
DECLARE_mInt32(compaction_keep_invisible_version_max_count);

// Threshold to logging compaction trace, in seconds.
DECLARE_mInt32(base_compaction_trace_threshold);
DECLARE_mInt32(cumulative_compaction_trace_threshold);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& ro
"Full compaction rowsets' versions not equal to all exist rowsets' versions. "
"full compaction rowsets max version={}-{}"
", current rowsets max version={}-{}"
"full compaction rowsets min version={}-{}, current rowsets min version=0-1",
", full compaction rowsets min version={}-{}, current rowsets min version=0-1",
last_rowset->start_version(), last_rowset->end_version(), max_version.first,
max_version.second, first_rowset->start_version(), first_rowset->end_version());
}
Expand Down
21 changes: 21 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/Types_types.h>
#include <netinet/in.h>

#include <atomic>
#include <charconv>
#include <cstdint>
#include <functional>
Expand All @@ -36,6 +37,7 @@
#include "io/io_common.h"
#include "olap/olap_define.h"
#include "util/hash_util.hpp"
#include "util/time.h"
#include "util/uid_util.h"

namespace doris {
Expand Down Expand Up @@ -509,6 +511,25 @@ struct RidAndPos {

using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;

// Pairs a version number with the monotonic timestamp (milliseconds) of its last advance.
// Used to control compaction.
struct VersionWithTime {
    std::atomic<int64_t> version;
    int64_t update_ts;

    // Starts at version 0, stamped with the current monotonic clock.
    VersionWithTime() : version(0), update_ts(MonotonicMillis()) {}

    // Raises `version` to `new_version` when that is larger; never lowers it.
    // `update_ts` is refreshed only by the call whose compare-exchange advanced the version.
    // NOTE(review): `update_ts` is a plain int64_t written here without synchronization;
    // confirm callers serialize updates or that a stale read is acceptable.
    void update_version_monoto(int64_t new_version) {
        int64_t observed = version.load(std::memory_order_relaxed);
        while (observed < new_version) {
            // A failed compare-exchange reloads `observed` with the stored value, so the
            // loop condition is re-evaluated against the latest version.
            const bool advanced = version.compare_exchange_strong(
                    observed, new_version, std::memory_order_relaxed, std::memory_order_relaxed);
            if (!advanced) {
                continue;
            }
            update_ts = MonotonicMillis();
            return;
        }
    }
};

} // namespace doris

// This intended to be a "good" hash function. It may change from time to time.
Expand Down
74 changes: 54 additions & 20 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,25 +1074,41 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compac
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return candidate_rowsets;
}
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
if (version.first >= _cumulative_point && rs->is_local()) {
candidate_rowsets.push_back(rs);
}
}
}
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
return candidate_rowsets;
return _pick_visible_rowsets_to_compaction(_cumulative_point,
std::numeric_limits<int64_t>::max());
}

// Candidates for base compaction: rowsets whose start version is at most
// _cumulative_point - 1, as selected by _pick_visible_rowsets_to_compaction.
std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() {
    // No lower bound on the start version.
    constexpr int64_t lowest_start = std::numeric_limits<int64_t>::min();
    const int64_t highest_start = _cumulative_point - 1;
    return _pick_visible_rowsets_to_compaction(lowest_start, highest_start);
}

std::vector<RowsetSharedPtr> Tablet::_pick_visible_rowsets_to_compaction(
int64_t min_start_version, int64_t max_start_version) {
auto [visible_version, update_ts] = get_visible_version_and_time();
bool update_time_long = MonotonicMillis() - update_ts >
config::compaction_keep_invisible_version_timeout_sec * 1000L;
int32_t keep_invisible_version_limit =
update_time_long ? config::compaction_keep_invisible_version_min_count
: config::compaction_keep_invisible_version_max_count;

std::vector<RowsetSharedPtr> candidate_rowsets;
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
// Do compaction on local rowsets only.
if (version.first < _cumulative_point && rs->is_local()) {
int64_t version_start = version.first;
// rowset is remote or rowset is not in given range
if (!rs->is_local() || version_start < min_start_version ||
version_start > max_start_version) {
continue;
}

// can compact, met one of the conditions:
// 1. had been visible;
// 2. exceeds the limit of keep invisible versions.
int64_t version_end = version.second;
if (version_end <= visible_version ||
version_end > visible_version + keep_invisible_version_limit) {
candidate_rowsets.push_back(rs);
}
}
Expand All @@ -1115,13 +1131,8 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_full_compaction()

std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int limit) {
std::vector<RowsetSharedPtr> consecutive_empty_rowsets;
std::vector<RowsetSharedPtr> candidate_rowsets;
traverse_rowsets([&candidate_rowsets, this](const auto& rs) {
if (rs->is_local() && rs->start_version() >= _cumulative_point) {
candidate_rowsets.emplace_back(rs);
}
});
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
std::vector<RowsetSharedPtr> candidate_rowsets =
pick_candidate_rowsets_to_cumulative_compaction();
int len = candidate_rowsets.size();
for (int i = 0; i < len - 1; ++i) {
auto rowset = candidate_rowsets[i];
Expand Down Expand Up @@ -1190,6 +1201,19 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_build_inverted_in
return candidate_rowsets;
}

// Returns {visible version, monotonic millisecond timestamp of its last update}.
//
// Falls back to {INT64_MAX, INT64_MAX} when no visible-version record is attached, or when
// partition_id() is 0: some old tablets have a buggy partition id of 0, so FE cannot update
// their visible version. Using int64 max makes every version of such a tablet count as visible.
std::tuple<int64_t, int64_t> Tablet::get_visible_version_and_time() const {
    constexpr int64_t kUnbounded = std::numeric_limits<int64_t>::max();

    // Acquire (not relaxed): the pointee's fields are read right below, so those reads must be
    // ordered after the pointer load. A relaxed load gives no such ordering even when the
    // pointer was published with release semantics.
    const auto version_info =
            std::atomic_load_explicit(&_visible_version, std::memory_order_acquire);
    if (version_info == nullptr || partition_id() == 0) {
        return std::make_tuple(kUnbounded, kUnbounded);
    }

    // NOTE(review): update_ts is a plain int64_t that update_version_monoto() may write
    // concurrently; confirm a stale timestamp is acceptable here.
    return std::make_tuple(version_info->version.load(std::memory_order_relaxed),
                           version_info->update_ts);
}

// For http compaction action
void Tablet::get_compaction_status(std::string* json_result) {
rapidjson::Document root;
Expand Down Expand Up @@ -1526,13 +1550,23 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
}
});

int64_t total_version_count = _tablet_meta->version_count();

// For compatibility.
// An old FE does not send the visible version request to BE, so the BE's visible version stays 0.
// In that case, report visible_version_count as total_version_count.
int64_t visible_version_count = total_version_count;
if (auto [visible_version, _] = get_visible_version_and_time(); visible_version > 0) {
visible_version_count = _tablet_meta->version_count_cross_with_range({0, visible_version});
}
// the report version is the largest continuous version, same logic as in FE side
tablet_info->__set_version(cversion.second);
// Useless, but it is a required field in TTabletInfo
tablet_info->__set_version_hash(0);
tablet_info->__set_partition_id(_tablet_meta->partition_id());
tablet_info->__set_storage_medium(_data_dir->storage_medium());
tablet_info->__set_version_count(_tablet_meta->version_count());
tablet_info->__set_total_version_count(total_version_count);
tablet_info->__set_visible_version_count(visible_version_count);
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
tablet_info->__set_replica_id(replica_id());
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ class Tablet final : public BaseTablet {

// Returns a copy of the stored status string of the last base compaction.
std::string get_last_base_compaction_status() { return _last_base_compaction_status; }

std::tuple<int64_t, int64_t> get_visible_version_and_time() const;

// Atomically replaces the visible-version record this tablet points at.
//
// The store uses release ordering (not relaxed): this publishes a pointer that other threads
// dereference, so the pointee's construction must be ordered before the pointer becomes
// observable to a reader that loads it with acquire ordering.
void set_visible_version(const std::shared_ptr<const VersionWithTime>& visible_version) {
    std::atomic_store_explicit(&_visible_version, visible_version, std::memory_order_release);
}

inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
Expand Down Expand Up @@ -484,6 +490,9 @@ class Tablet final : public BaseTablet {
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
uint32_t _calc_base_compaction_score() const;

std::vector<RowsetSharedPtr> _pick_visible_rowsets_to_compaction(int64_t min_start_version,
int64_t max_start_version);

void _init_context_common_fields(RowsetWriterContext& context);

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -578,6 +587,9 @@ class Tablet final : public BaseTablet {

int64_t _io_error_times = 0;

// The partition's visible version. It is synced from FE, but not in real time.
std::shared_ptr<const VersionWithTime> _visible_version;

std::atomic_bool _is_full_compaction_running = false;
};

Expand Down
Loading

0 comments on commit da9c446

Please sign in to comment.