Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](compaction) be do not compact invisible version to avoid query error -230 #28082 #36222

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ void AgentServer::start_workers(ExecEnv* exec_env) {

_clean_trash_workers = std::make_unique<TaskWorkerPool>(
"CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); });

_update_visible_version_workers = std::make_unique<TaskWorkerPool>(
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

// clang-format on
}

Expand Down Expand Up @@ -278,6 +282,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
"task(signature={}) has wrong request member = clean_trash_req", signature);
}
break;
case TTaskType::UPDATE_VISIBLE_VERSION:
if (task.__isset.visible_version_req) {
_update_visible_version_workers->submit_task(task);
} else {
ret_st = Status::InvalidArgument(
"task(signature={}) has wrong request member = visible_version_req",
signature);
}
break;
default:
ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type",
signature, task_type);
Expand Down
1 change: 1 addition & 0 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class AgentServer {
std::unique_ptr<TopicSubscriber> _topic_subscriber;
std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
std::unique_ptr<TaskWorkerPool> _clean_trash_workers;
std::unique_ptr<TaskWorkerPool> _update_visible_version_workers;
};

} // end namespace doris
13 changes: 13 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");

void add_task_count(const TAgentTaskRequest& task, int n) {
// clang-format off
Expand Down Expand Up @@ -382,6 +383,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) {
ADD_TASK_COUNT(CLONE)
ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
ADD_TASK_COUNT(GC_BINLOG)
ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
#undef ADD_TASK_COUNT
case TTaskType::REALTIME_PUSH:
case TTaskType::PUSH:
Expand Down Expand Up @@ -970,6 +972,11 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
return;
}

std::map<int64_t, int64_t> partitions_version;
engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
request.__set_partitions_version(std::move(partitions_version));

int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
Expand Down Expand Up @@ -1699,6 +1706,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
engine.gc_binlogs(gc_tablet_infos);
}

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const TVisibleVersionReq& visible_version_req = req.visible_version_req;
engine.tablet_manager()->update_partitions_visible_version(
visible_version_req.partition_version);
}

void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
const TAgentTaskRequest& req) {
const auto& clone_req = req.clone_req;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req);

void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk,
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");

// Not compact the invisible versions, but with some limitations:
// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions;
// if timeout, keep no more than compaction_keep_invisible_version_min_count versions.
DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800");
DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50");
DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500");

// Threshold to logging compaction trace, in seconds.
DEFINE_mInt32(base_compaction_trace_threshold, "60");
DEFINE_mInt32(cumulative_compaction_trace_threshold, "10");
Expand Down
7 changes: 7 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk);
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round);

// Not compact the invisible versions, but with some limitations:
// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions;
// if timeout, keep no more than compaction_keep_invisible_version_min_count versions.
DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec);
DECLARE_mInt32(compaction_keep_invisible_version_min_count);
DECLARE_mInt32(compaction_keep_invisible_version_max_count);

// Threshold to logging compaction trace, in seconds.
DECLARE_mInt32(base_compaction_trace_threshold);
DECLARE_mInt32(cumulative_compaction_trace_threshold);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& ro
"Full compaction rowsets' versions not equal to all exist rowsets' versions. "
"full compaction rowsets max version={}-{}"
", current rowsets max version={}-{}"
"full compaction rowsets min version={}-{}, current rowsets min version=0-1",
", full compaction rowsets min version={}-{}, current rowsets min version=0-1",
last_rowset->start_version(), last_rowset->end_version(),
_tablet->max_version().first, _tablet->max_version().second,
first_rowset->start_version(), first_rowset->end_version());
Expand Down
21 changes: 21 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/Types_types.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'gen_cpp/Types_types.h' file not found [clang-diagnostic-error]

#include <gen_cpp/Types_types.h>
         ^

#include <netinet/in.h>

#include <atomic>
#include <charconv>
#include <cstdint>
#include <functional>
Expand All @@ -37,6 +38,7 @@
#include "olap/olap_define.h"
#include "olap/rowset/rowset_fwd.h"
#include "util/hash_util.hpp"
#include "util/time.h"
#include "util/uid_util.h"

namespace doris {
Expand Down Expand Up @@ -517,6 +519,25 @@ struct RidAndPos {

using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;

// used for controll compaction
struct VersionWithTime {
std::atomic<int64_t> version;
int64_t update_ts;

VersionWithTime() : version(0), update_ts(MonotonicMillis()) {}

void update_version_monoto(int64_t new_version) {
int64_t cur_version = version.load(std::memory_order_relaxed);
while (cur_version < new_version) {
if (version.compare_exchange_strong(cur_version, new_version, std::memory_order_relaxed,
std::memory_order_relaxed)) {
update_ts = MonotonicMillis();
break;
}
}
}
};

} // namespace doris

// This intended to be a "good" hash function. It may change from time to time.
Expand Down
74 changes: 54 additions & 20 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1349,25 +1349,41 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compac
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
return candidate_rowsets;
}
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
if (version.first >= _cumulative_point && rs->is_local()) {
candidate_rowsets.push_back(rs);
}
}
}
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
return candidate_rowsets;
return _pick_visible_rowsets_to_compaction(_cumulative_point,
std::numeric_limits<int64_t>::max());
}

std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() {
return _pick_visible_rowsets_to_compaction(std::numeric_limits<int64_t>::min(),
_cumulative_point - 1);
}

std::vector<RowsetSharedPtr> Tablet::_pick_visible_rowsets_to_compaction(
int64_t min_start_version, int64_t max_start_version) {
auto [visible_version, update_ts] = get_visible_version_and_time();
bool update_time_long = MonotonicMillis() - update_ts >
config::compaction_keep_invisible_version_timeout_sec * 1000L;
int32_t keep_invisible_version_limit =
update_time_long ? config::compaction_keep_invisible_version_min_count
: config::compaction_keep_invisible_version_max_count;

std::vector<RowsetSharedPtr> candidate_rowsets;
{
std::shared_lock rlock(_meta_lock);
for (const auto& [version, rs] : _rs_version_map) {
// Do compaction on local rowsets only.
if (version.first < _cumulative_point && rs->is_local()) {
int64_t version_start = version.first;
// rowset is remote or rowset is not in given range
if (!rs->is_local() || version_start < min_start_version ||
version_start > max_start_version) {
continue;
}

// can compact, met one of the conditions:
// 1. had been visible;
// 2. exceeds the limit of keep invisible versions.
int64_t version_end = version.second;
if (version_end <= visible_version ||
version_end > visible_version + keep_invisible_version_limit) {
candidate_rowsets.push_back(rs);
}
}
Expand All @@ -1390,13 +1406,8 @@ std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_full_compaction()

std::vector<RowsetSharedPtr> Tablet::pick_first_consecutive_empty_rowsets(int limit) {
std::vector<RowsetSharedPtr> consecutive_empty_rowsets;
std::vector<RowsetSharedPtr> candidate_rowsets;
traverse_rowsets([&candidate_rowsets, this](const auto& rs) {
if (rs->is_local() && rs->start_version() >= _cumulative_point) {
candidate_rowsets.emplace_back(rs);
}
});
std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
std::vector<RowsetSharedPtr> candidate_rowsets =
pick_candidate_rowsets_to_cumulative_compaction();
int len = candidate_rowsets.size();
for (int i = 0; i < len - 1; ++i) {
auto rowset = candidate_rowsets[i];
Expand Down Expand Up @@ -1475,6 +1486,19 @@ std::string Tablet::_get_rowset_info_str(RowsetSharedPtr rowset, bool delete_fla
rowset->rowset_id().to_string(), disk_size);
}

std::tuple<int64_t, int64_t> Tablet::get_visible_version_and_time() const {
// some old tablet has bug, its partition_id is 0, fe couldn't update its visible version.
// so let this tablet's visible version become int64 max.
auto version_info = std::atomic_load_explicit(&_visible_version, std::memory_order_relaxed);
if (version_info != nullptr && partition_id() != 0) {
return std::make_tuple(version_info->version.load(std::memory_order_relaxed),
version_info->update_ts);
} else {
return std::make_tuple(std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max());
}
}

// For http compaction action
void Tablet::get_compaction_status(std::string* json_result) {
rapidjson::Document root;
Expand Down Expand Up @@ -1837,13 +1861,23 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
}
});

int64_t total_version_count = _tablet_meta->version_count();

// For compatibility.
// For old fe, it wouldn't send visible version request to be, then be's visible version is always 0.
// Let visible_version_count set to total_version_count in be's report.
int64_t visible_version_count = total_version_count;
if (auto [visible_version, _] = get_visible_version_and_time(); visible_version > 0) {
visible_version_count = _tablet_meta->version_count_cross_with_range({0, visible_version});
}
// the report version is the largest continuous version, same logic as in FE side
tablet_info->__set_version(cversion.second);
// Useless but it is a required filed in TTabletInfo
tablet_info->__set_version_hash(0);
tablet_info->__set_partition_id(_tablet_meta->partition_id());
tablet_info->__set_storage_medium(_data_dir->storage_medium());
tablet_info->__set_version_count(_tablet_meta->version_count());
tablet_info->__set_total_version_count(total_version_count);
tablet_info->__set_visible_version_count(visible_version_count);
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
tablet_info->__set_replica_id(replica_id());
Expand Down
13 changes: 13 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,12 @@ class Tablet final : public BaseTablet {

bool should_fetch_from_peer();

std::tuple<int64_t, int64_t> get_visible_version_and_time() const;

void set_visible_version(const std::shared_ptr<const VersionWithTime>& visible_version) {
std::atomic_store_explicit(&_visible_version, visible_version, std::memory_order_relaxed);
}

inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
Expand Down Expand Up @@ -625,6 +631,10 @@ class Tablet final : public BaseTablet {
// When the proportion of empty edges in the adjacency matrix used to represent the version graph
// in the version tracker is greater than the threshold, rebuild the version tracker
bool _reconstruct_version_tracker_if_necessary();

std::vector<RowsetSharedPtr> _pick_visible_rowsets_to_compaction(int64_t min_start_version,
int64_t max_start_version);

void _init_context_common_fields(RowsetWriterContext& context);

void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre,
Expand Down Expand Up @@ -740,6 +750,9 @@ class Tablet final : public BaseTablet {

int64_t _io_error_times = 0;

// partition's visible version. it sync from fe, but not real-time.
std::shared_ptr<const VersionWithTime> _visible_version;

std::atomic_bool _is_full_compaction_running = false;
};

Expand Down
Loading
Loading