diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 3fbf487b7dd531..565b0b373eae12 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -180,6 +180,9 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { _workers[TTaskType::CLEAN_TRASH] = std::make_unique( "CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); }); + _workers[TTaskType::UPDATE_VISIBLE_VERSION] = std::make_unique( + "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); }); + _report_workers.push_back(std::make_unique( "REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); })); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 3b8d63f7a95dbe..745a8286490414 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -426,6 +426,7 @@ bvar::Adder ALTER_count("task", "ALTER_TABLE"); bvar::Adder CLONE_count("task", "CLONE"); bvar::Adder STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE"); bvar::Adder GC_BINLOG_count("task", "GC_BINLOG"); +bvar::Adder UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION"); void add_task_count(const TAgentTaskRequest& task, int n) { // clang-format off @@ -452,6 +453,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) { ADD_TASK_COUNT(CLONE) ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE) ADD_TASK_COUNT(GC_BINLOG) + ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION) #undef ADD_TASK_COUNT case TTaskType::REALTIME_PUSH: case TTaskType::PUSH: @@ -1077,6 +1079,11 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); return; } + + std::map partitions_version; + engine.tablet_manager()->get_partitions_visible_version(&partitions_version); + 
request.__set_partitions_version(std::move(partitions_version)); + int64_t max_compaction_score = std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), DorisMetrics::instance()->tablet_base_max_compaction_score->value()); @@ -1927,6 +1934,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) { engine.gc_binlogs(gc_tablet_infos); } +void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) { + const TVisibleVersionReq& visible_version_req = req.visible_version_req; + engine.tablet_manager()->update_partitions_visible_version( + visible_version_req.partition_version); +} + void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, const TAgentTaskRequest& req) { const auto& clone_req = req.clone_req; diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 10c0bcce4efd10..4b9a2b6d8dc7e5 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -176,6 +176,8 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req); void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req); +void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req); + void report_task_callback(const TMasterInfo& master_info); void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 0c257e48a8f452..3dd170557c907d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -429,6 +429,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk, // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. 
DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9"); +// Do not compact invisible versions (those above the partition's visible version), within limits: +// if the visible version was updated within compaction_keep_invisible_version_timeout_sec, keep at most compaction_keep_invisible_version_max_count invisible versions; +// otherwise (timeout), keep at most compaction_keep_invisible_version_min_count invisible versions. +DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800"); +DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50"); +DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500"); + // Threshold to logging compaction trace, in seconds. DEFINE_mInt32(base_compaction_trace_threshold, "60"); DEFINE_mInt32(cumulative_compaction_trace_threshold, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index e1ec93ff63f768..f67eb6ccf7ace8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -474,6 +474,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk); // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round); +// Do not compact invisible versions (those above the partition's visible version), within limits: +// if the visible version was updated within compaction_keep_invisible_version_timeout_sec, keep at most compaction_keep_invisible_version_max_count invisible versions; +// otherwise (timeout), keep at most compaction_keep_invisible_version_min_count invisible versions. +DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec); +DECLARE_mInt32(compaction_keep_invisible_version_min_count); +DECLARE_mInt32(compaction_keep_invisible_version_max_count); + // Threshold to logging compaction trace, in seconds. 
DECLARE_mInt32(base_compaction_trace_threshold); DECLARE_mInt32(cumulative_compaction_trace_threshold); diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 8a2712c38b5507..9d675f731924c1 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -134,7 +134,7 @@ Status FullCompaction::_check_all_version(const std::vector& ro "Full compaction rowsets' versions not equal to all exist rowsets' versions. " "full compaction rowsets max version={}-{}" ", current rowsets max version={}-{}" - "full compaction rowsets min version={}-{}, current rowsets min version=0-1", + ", full compaction rowsets min version={}-{}, current rowsets min version=0-1", last_rowset->start_version(), last_rowset->end_version(), max_version.first, max_version.second, first_rowset->start_version(), first_rowset->end_version()); } diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 782ebb5a60fc55..4d0c035bb0facd 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include "io/io_common.h" #include "olap/olap_define.h" #include "util/hash_util.hpp" +#include "util/time.h" #include "util/uid_util.h" namespace doris { @@ -509,6 +511,25 @@ struct RidAndPos { using PartialUpdateReadPlan = std::map>>; +// Used to control compaction: a version and the MonotonicMillis() timestamp of its last update. +struct VersionWithTime { + std::atomic version; + int64_t update_ts; + + VersionWithTime() : version(0), update_ts(MonotonicMillis()) {} + + void update_version_monoto(int64_t new_version) { + int64_t cur_version = version.load(std::memory_order_relaxed); + while (cur_version < new_version) { + if (version.compare_exchange_strong(cur_version, new_version, std::memory_order_relaxed, + std::memory_order_relaxed)) { + update_ts = MonotonicMillis(); + break; + } + } + } +}; + } // namespace doris // This intended to be a "good" hash function. It may change from time to time. 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 358292463fc3fa..ab6471df6be025 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1074,25 +1074,41 @@ std::vector Tablet::pick_candidate_rowsets_to_cumulative_compac if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { return candidate_rowsets; } - { - std::shared_lock rlock(_meta_lock); - for (const auto& [version, rs] : _rs_version_map) { - if (version.first >= _cumulative_point && rs->is_local()) { - candidate_rowsets.push_back(rs); - } - } - } - std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); - return candidate_rowsets; + return _pick_visible_rowsets_to_compaction(_cumulative_point, + std::numeric_limits::max()); } std::vector Tablet::pick_candidate_rowsets_to_base_compaction() { + return _pick_visible_rowsets_to_compaction(std::numeric_limits::min(), + _cumulative_point - 1); +} + +std::vector Tablet::_pick_visible_rowsets_to_compaction( + int64_t min_start_version, int64_t max_start_version) { + auto [visible_version, update_ts] = get_visible_version_and_time(); + bool update_time_long = MonotonicMillis() - update_ts > + config::compaction_keep_invisible_version_timeout_sec * 1000L; + int32_t keep_invisible_version_limit = + update_time_long ? config::compaction_keep_invisible_version_min_count + : config::compaction_keep_invisible_version_max_count; + std::vector candidate_rowsets; { std::shared_lock rlock(_meta_lock); for (const auto& [version, rs] : _rs_version_map) { - // Do compaction on local rowsets only. - if (version.first < _cumulative_point && rs->is_local()) { + int64_t version_start = version.first; + // rowset is remote or rowset is not in given range + if (!rs->is_local() || version_start < min_start_version || + version_start > max_start_version) { + continue; + } + + // can compact, met one of the conditions: + // 1. had been visible; + // 2. exceeds the limit of keep invisible versions. 
+ int64_t version_end = version.second; + if (version_end <= visible_version || + version_end > visible_version + keep_invisible_version_limit) { candidate_rowsets.push_back(rs); } } @@ -1115,13 +1131,8 @@ std::vector Tablet::pick_candidate_rowsets_to_full_compaction() std::vector Tablet::pick_first_consecutive_empty_rowsets(int limit) { std::vector consecutive_empty_rowsets; - std::vector candidate_rowsets; - traverse_rowsets([&candidate_rowsets, this](const auto& rs) { - if (rs->is_local() && rs->start_version() >= _cumulative_point) { - candidate_rowsets.emplace_back(rs); - } - }); - std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + std::vector candidate_rowsets = + pick_candidate_rowsets_to_cumulative_compaction(); int len = candidate_rowsets.size(); for (int i = 0; i < len - 1; ++i) { auto rowset = candidate_rowsets[i]; @@ -1190,6 +1201,19 @@ std::vector Tablet::pick_candidate_rowsets_to_build_inverted_in return candidate_rowsets; } +std::tuple Tablet::get_visible_version_and_time() const { + // some old tablet has bug, its partition_id is 0, fe couldn't update its visible version. + // so let this tablet's visible version become int64 max. + auto version_info = std::atomic_load_explicit(&_visible_version, std::memory_order_relaxed); + if (version_info != nullptr && partition_id() != 0) { + return std::make_tuple(version_info->version.load(std::memory_order_relaxed), + version_info->update_ts); + } else { + return std::make_tuple(std::numeric_limits::max(), + std::numeric_limits::max()); + } +} + // For http compaction action void Tablet::get_compaction_status(std::string* json_result) { rapidjson::Document root; @@ -1526,13 +1550,23 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, } }); + int64_t total_version_count = _tablet_meta->version_count(); + + // For compatibility. + // For old fe, it wouldn't send visible version request to be, then be's visible version is always 0. 
+ // Let visible_version_count set to total_version_count in be's report. + int64_t visible_version_count = total_version_count; + if (auto [visible_version, _] = get_visible_version_and_time(); visible_version > 0) { + visible_version_count = _tablet_meta->version_count_cross_with_range({0, visible_version}); + } // the report version is the largest continuous version, same logic as in FE side tablet_info->__set_version(cversion.second); // Useless but it is a required filed in TTabletInfo tablet_info->__set_version_hash(0); tablet_info->__set_partition_id(_tablet_meta->partition_id()); tablet_info->__set_storage_medium(_data_dir->storage_medium()); - tablet_info->__set_version_count(_tablet_meta->version_count()); + tablet_info->__set_total_version_count(total_version_count); + tablet_info->__set_visible_version_count(visible_version_count); tablet_info->__set_path_hash(_data_dir->path_hash()); tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory()); tablet_info->__set_replica_id(replica_id()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 678a519cfaeb72..04fee1fb3caa3b 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -311,6 +311,12 @@ class Tablet final : public BaseTablet { std::string get_last_base_compaction_status() { return _last_base_compaction_status; } + std::tuple get_visible_version_and_time() const; + + void set_visible_version(const std::shared_ptr& visible_version) { + std::atomic_store_explicit(&_visible_version, visible_version, std::memory_order_relaxed); + } + inline bool all_beta() const { std::shared_lock rdlock(_meta_lock); return _tablet_meta->all_beta(); @@ -484,6 +490,9 @@ class Tablet final : public BaseTablet { std::shared_ptr cumulative_compaction_policy); uint32_t _calc_base_compaction_score() const; + std::vector _pick_visible_rowsets_to_compaction(int64_t min_start_version, + int64_t max_start_version); + void _init_context_common_fields(RowsetWriterContext& context); 
//////////////////////////////////////////////////////////////////////////// @@ -578,6 +587,9 @@ class Tablet final : public BaseTablet { int64_t _io_error_times = 0; + // partition's visible version. it sync from fe, but not real-time. + std::shared_ptr _visible_version; + std::atomic_bool _is_full_compaction_running = false; }; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index d8a23430363c8f..d3fdc52e06c8e6 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -1016,13 +1016,14 @@ void TabletManager::build_all_report_tablets_info(std::map* tablet_info.__set_transaction_ids(find->second); expire_txn_map.erase(find); } - tablet_version_num_hist.add(tablet->version_count()); + tablet_version_num_hist.add(tablet_info.total_version_count); auto& t_tablet_stat = local_cache->emplace_back(); t_tablet_stat.__set_tablet_id(tablet_info.tablet_id); t_tablet_stat.__set_data_size(tablet_info.data_size); t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size); - t_tablet_stat.__set_row_num(tablet_info.row_count); - t_tablet_stat.__set_version_count(tablet_info.version_count); + t_tablet_stat.__set_row_count(tablet_info.row_count); + t_tablet_stat.__set_total_version_count(tablet_info.total_version_count); + t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count); }; for_each_tablet(handler, filter_all_tablets); @@ -1257,9 +1258,29 @@ void TabletManager::update_root_path_info(std::map* path_ma void TabletManager::get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos) { - std::shared_lock rdlock(_partition_tablet_map_lock); - if (_partition_tablet_map.find(partition_id) != _partition_tablet_map.end()) { - *tablet_infos = _partition_tablet_map[partition_id]; + std::shared_lock rdlock(_partitions_lock); + auto it = _partitions.find(partition_id); + if (it != _partitions.end()) { + *tablet_infos = it->second.tablets; + } +} + +void 
TabletManager::get_partitions_visible_version(std::map* partitions_version) { + std::shared_lock rdlock(_partitions_lock); + for (const auto& [partition_id, partition] : _partitions) { + partitions_version->insert( + {partition_id, partition.visible_version->version.load(std::memory_order_relaxed)}); + } +} + +void TabletManager::update_partitions_visible_version( + const std::map& partitions_version) { + std::shared_lock rdlock(_partitions_lock); + for (auto [partition_id, version] : partitions_version) { + auto it = _partitions.find(partition_id); + if (it != _partitions.end()) { + it->second.visible_version->update_version_monoto(version); + } } } @@ -1348,15 +1369,25 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { } void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) { - std::lock_guard wrlock(_partition_tablet_map_lock); - _partition_tablet_map[tablet->partition_id()].insert(tablet->get_tablet_info()); + std::lock_guard wrlock(_partitions_lock); + auto& partition = _partitions[tablet->partition_id()]; + partition.tablets.insert(tablet->get_tablet_info()); + tablet->set_visible_version( + std::static_pointer_cast(partition.visible_version)); } void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) { - std::lock_guard wrlock(_partition_tablet_map_lock); - _partition_tablet_map[tablet->partition_id()].erase(tablet->get_tablet_info()); - if (_partition_tablet_map[tablet->partition_id()].empty()) { - _partition_tablet_map.erase(tablet->partition_id()); + tablet->set_visible_version(nullptr); + std::lock_guard wrlock(_partitions_lock); + auto it = _partitions.find(tablet->partition_id()); + if (it == _partitions.end()) { + return; + } + + auto& tablets = it->second.tablets; + tablets.erase(tablet->get_tablet_info()); + if (tablets.empty()) { + _partitions.erase(it); } } @@ -1393,22 +1424,23 @@ void TabletManager::get_tablets_distribution_on_different_disks( std::map>& 
tablets_num_on_disk, std::map>>& tablets_info_on_disk) { std::vector data_dirs = _engine.get_stores(); - std::map> partition_tablet_map; + std::map partitions; { - // When drop tablet, '_partition_tablet_map_lock' is locked in 'tablet_shard_lock'. - // To avoid locking 'tablet_shard_lock' in '_partition_tablet_map_lock', we lock and - // copy _partition_tablet_map here. - std::shared_lock rdlock(_partition_tablet_map_lock); - partition_tablet_map = _partition_tablet_map; + // When drop tablet, '_partitions_lock' is locked in 'tablet_shard_lock'. + // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock and + // copy _partitions here. + std::shared_lock rdlock(_partitions_lock); + partitions = _partitions; } - for (auto& [partition_id, tablet_infos] : partition_tablet_map) { + + for (const auto& [partition_id, partition] : partitions) { std::map tablets_num; std::map> tablets_info; for (auto* data_dir : data_dirs) { tablets_num[data_dir] = 0; } - for (const auto& tablet_info : tablet_infos) { + for (const auto& tablet_info : partition.tablets) { // get_tablet() will hold 'tablet_shard_lock' TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id); if (tablet == nullptr) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 9f8164b853f44d..b090277677b5fb 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -144,6 +144,10 @@ class TabletManager { void get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos); + void get_partitions_visible_version(std::map* partitions_version); + + void update_partitions_visible_version(const std::map& partitions_version); + void do_tablet_meta_checkpoint(DataDir* data_dir); void obtain_specific_quantity_tablets(std::vector& tablets_info, int64_t num); @@ -229,22 +233,27 @@ class TabletManager { std::set tablets_under_clone; }; + struct Partition { + std::set tablets; + std::shared_ptr visible_version {new VersionWithTime}; + }; + StorageEngine& 
_engine; // TODO: memory size of TabletSchema cannot be accurately tracked. - // trace the memory use by meta of tablet std::shared_ptr _tablet_meta_mem_tracker; const int32_t _tablets_shards_size; const int32_t _tablets_shards_mask; std::vector _tablets_shards; - // Protect _partition_tablet_map, should not be obtained before _tablet_map_lock to avoid dead lock - std::shared_mutex _partition_tablet_map_lock; + // Protect _partitions, should not be obtained before _tablet_map_lock to avoid dead lock + std::shared_mutex _partitions_lock; + // partition_id => partition + std::map _partitions; + // Protect _shutdown_tablets, should not be obtained before _tablet_map_lock to avoid dead lock std::shared_mutex _shutdown_tablets_lock; - // partition_id => tablet_info - std::map> _partition_tablet_map; // the delete tablets. notice only allow function `start_trash_sweep` can erase tablets in _shutdown_tablets std::list _shutdown_tablets; std::mutex _gc_tablets_lock; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 434fbaaf2e8921..3cd001fd3e19d4 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -751,6 +751,16 @@ Version TabletMeta::max_version() const { return max_version; } +size_t TabletMeta::version_count_cross_with_range(const Version& range) const { + size_t count = 0; + for (const auto& rs_meta : _rs_metas) { + if (!(range.first > rs_meta->version().second || range.second < rs_meta->version().first)) { + count++; + } + } + return count; +} + Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) { // check RowsetMeta is valid for (auto& rs : _rs_metas) { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 0feba660750b7c..12999ed7d31b44 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -165,6 +165,7 @@ class TabletMeta { // Remote disk space occupied by tablet. 
size_t tablet_remote_size() const; size_t version_count() const; + size_t version_count_cross_with_range(const Version& range) const; Version max_version() const; TabletState tablet_state() const; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 94afb9f20fb8b3..1f5f11fa8c0b17 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -162,6 +162,8 @@ Status EngineCloneTask::execute() { } Status st = _do_clone(); _engine.tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id); + _engine.tablet_manager()->update_partitions_visible_version( + {{_clone_req.partition_id, _clone_req.version}}); return st; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index e2861310faa592..384a858c3fa4d3 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1475,11 +1475,11 @@ public class Config extends ConfigBase { public static int default_max_query_instances = -1; /* - * One master daemon thread will update global partition in memory - * info every partition_in_memory_update_interval_secs + * One master daemon thread will update global partition info, including in-memory and visible version + * info, every partition_info_update_interval_secs */ @ConfField(mutable = false, masterOnly = true) - public static int partition_in_memory_update_interval_secs = 300; + public static int partition_info_update_interval_secs = 60; @ConfField(masterOnly = true) public static boolean enable_concurrent_update = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 8d18c687a40773..4c4e1118670cce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -242,8 +242,7 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce task.getSignature(), replica, task.getVersion()); boolean versionChanged = false; if (replica.getVersion() < task.getVersion()) { - replica.updateVersionInfo(task.getVersion(), replica.getDataSize(), replica.getRemoteDataSize(), - replica.getRowCount()); + replica.updateVersion(task.getVersion()); versionChanged = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 10ffb398bc138c..01d61c64cff5c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1779,8 +1779,7 @@ private Status allTabletCommitted(boolean isReplay) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { if (!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) { - replica.updateVersionInfo(part.getVisibleVersion(), replica.getDataSize(), - replica.getRemoteDataSize(), replica.getRowCount()); + replica.updateVersion(part.getVisibleVersion()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java index 0d16d3bbc55ae8..e2375131dc5b1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java @@ -219,10 +219,14 @@ private void updateTabletStat(GetTabletStatsResponse response) { for (TabletStatsPB stat : response.getTabletStatsList()) { if (invertedIndex.getTabletMeta(stat.getIdx().getTabletId()) != null) { List replicas = invertedIndex.getReplicasByTabletId(stat.getIdx().getTabletId()); - if (replicas != null && !replicas.isEmpty() && replicas.get(0) != null) { - 
replicas.get(0).updateCloudStat(stat.getDataSize(), stat.getNumRowsets(), - stat.getNumSegments(), stat.getNumRows()); + if (replicas == null || replicas.isEmpty() || replicas.get(0) == null) { + continue; } + Replica replica = replicas.get(0); + replica.setDataSize(stat.getDataSize()); + replica.setRowsetCount(stat.getNumRowsets()); + replica.setSegmentCount(stat.getNumSegments()); + replica.setRowCount(stat.getNumRows()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 275c4d1ff42171..e4914ff3491fbf 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -174,7 +174,7 @@ import org.apache.doris.load.sync.SyncJobManager; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; -import org.apache.doris.master.PartitionInMemoryInfoCollector; +import org.apache.doris.master.PartitionInfoCollector; import org.apache.doris.meta.MetaContext; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mtmv.MTMVAlterOpType; @@ -368,7 +368,7 @@ public class Env { private PublishVersionDaemon publishVersionDaemon; private DeleteHandler deleteHandler; private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector; - private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector; + private PartitionInfoCollector partitionInfoCollector; private CooldownConfHandler cooldownConfHandler; private ExternalMetaIdMgr externalMetaIdMgr; private MetastoreEventsProcessor metastoreEventsProcessor; @@ -665,7 +665,7 @@ public Env(boolean isCheckpointCatalog) { this.publishVersionDaemon = new PublishVersionDaemon(); this.deleteHandler = new DeleteHandler(); this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector(); - this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector(); + this.partitionInfoCollector = new PartitionInfoCollector(); if 
(Config.enable_storage_policy) { this.cooldownConfHandler = new CooldownConfHandler(); } @@ -1695,7 +1695,7 @@ protected void startMasterOnlyDaemonThreads() { // start daemon thread to update db used data quota for db txn manager periodically dbUsedDataQuotaInfoCollector.start(); // start daemon thread to update global partition in memory information periodically - partitionInMemoryInfoCollector.start(); + partitionInfoCollector.start(); if (Config.enable_storage_policy) { cooldownConfHandler.start(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 333b7b146acf27..1f1e2599d9c064 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -110,7 +110,7 @@ private static List> getTabletStatus(String dbName, String tblName, row.add(String.valueOf(replica.getLastSuccessVersion())); row.add(String.valueOf(visibleVersion)); row.add(String.valueOf(replica.getSchemaHash())); - row.add(String.valueOf(replica.getVersionCount())); + row.add(String.valueOf(replica.getTotalVersionCount())); row.add(String.valueOf(replica.isBad())); row.add(String.valueOf(replica.isUserDrop())); row.add(replica.getState().name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index e7831125ca3fda..2d8e7d99af211f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -22,6 +22,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TUniqueId; import com.google.gson.annotations.SerializedName; @@ -122,7 +123,8 @@ public static class 
ReplicaContext { @SerializedName(value = "lastSuccessVersionHash") private long lastSuccessVersionHash = 0L; - private volatile long versionCount = -1; + private volatile long totalVersionCount = -1; + private volatile long visibleVersionCount = -1; private long pathHash = -1; @@ -252,22 +254,42 @@ public long getDataSize() { return dataSize; } + public void setDataSize(long dataSize) { + this.dataSize = dataSize; + } + public long getRemoteDataSize() { return remoteDataSize; } + public void setRemoteDataSize(long remoteDataSize) { + this.remoteDataSize = remoteDataSize; + } + public long getRowCount() { return rowCount; } + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } + public long getSegmentCount() { return segmentCount; } + public void setSegmentCount(long segmentCount) { + this.segmentCount = segmentCount; + } + public long getRowsetCount() { return rowsetCount; } + public void setRowsetCount(long rowsetCount) { + this.rowsetCount = rowsetCount; + } + public long getLastFailedVersion() { return lastFailedVersion; } @@ -348,35 +370,24 @@ public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) { this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd; } - // for compatibility - public synchronized void updateStat(long dataSize, long rowNum) { - this.dataSize = dataSize; - this.rowCount = rowNum; - } - - public synchronized void updateStat(long dataSize, long remoteDataSize, long rowNum, long versionCount) { - this.dataSize = dataSize; - this.remoteDataSize = remoteDataSize; - this.rowCount = rowNum; - this.versionCount = versionCount; - } - - public synchronized void updateCloudStat(long dataSize, long rowsetNum, long segmentNum, long rowNum) { - this.dataSize = dataSize; - this.rowsetCount = rowsetNum; - this.segmentCount = segmentNum; - this.rowCount = rowNum; + public void updateWithReport(TTabletInfo backendReplica) { + updateVersion(backendReplica.getVersion()); + 
setDataSize(backendReplica.getDataSize()); + setRemoteDataSize(backendReplica.getRemoteDataSize()); + setRowCount(backendReplica.getRowCount()); + setTotalVersionCount(backendReplica.getTotalVersionCount()); + setVisibleVersionCount( + backendReplica.isSetVisibleVersionCount() ? backendReplica.getVisibleVersionCount() + : backendReplica.getTotalVersionCount()); } - public synchronized void updateVersionInfo(long newVersion, long newDataSize, long newRemoteDataSize, - long newRowCount) { - updateReplicaInfo(newVersion, this.lastFailedVersion, this.lastSuccessVersion, newDataSize, newRemoteDataSize, - newRowCount); + public synchronized void updateVersion(long newVersion) { + updateReplicaVersion(newVersion, this.lastFailedVersion, this.lastSuccessVersion); } - public synchronized void updateVersionWithFailedInfo( + public synchronized void updateVersionWithFailed( long newVersion, long lastFailedVersion, long lastSuccessVersion) { - updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(newVersion, lastFailedVersion, lastSuccessVersion); } public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion, @@ -439,9 +450,7 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. * We just reset the LFV(hash) to recovery this replica. 
*/ - private void updateReplicaInfo(long newVersion, - long lastFailedVersion, long lastSuccessVersion, - long newDataSize, long newRemoteDataSize, long newRowCount) { + private void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) { if (LOG.isDebugEnabled()) { LOG.debug("before update: {}", this.toString()); } @@ -466,9 +475,6 @@ private void updateReplicaInfo(long newVersion, long oldLastFailedVersion = this.lastFailedVersion; this.version = newVersion; - this.dataSize = newDataSize; - this.remoteDataSize = newRemoteDataSize; - this.rowCount = newRowCount; // just check it if (lastSuccessVersion <= this.version) { @@ -531,7 +537,7 @@ private void updateReplicaInfo(long newVersion, } public synchronized void updateLastFailedVersion(long lastFailedVersion) { - updateReplicaInfo(this.version, lastFailedVersion, this.lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(this.version, lastFailedVersion, this.lastSuccessVersion); } /* @@ -576,16 +582,28 @@ public boolean tooSlow() { return state == ReplicaState.COMPACTION_TOO_SLOW; } + public boolean tooBigVersionCount() { + return visibleVersionCount >= Config.min_version_count_indicate_replica_compaction_too_slow; + } + public boolean isNormal() { return state == ReplicaState.NORMAL; } - public long getVersionCount() { - return versionCount; + public long getTotalVersionCount() { + return totalVersionCount; + } + + public void setTotalVersionCount(long totalVersionCount) { + this.totalVersionCount = totalVersionCount; + } + + public long getVisibleVersionCount() { + return visibleVersionCount; } - public void setVersionCount(long versionCount) { - this.versionCount = versionCount; + public void setVisibleVersionCount(long visibleVersionCount) { + this.visibleVersionCount = visibleVersionCount; } public boolean checkVersionRegressive(long newVersion) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 376f7649c684c3..cdf59fe41b148d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -294,16 +294,16 @@ public List getQueryableReplicas(long visibleVersion, boolean allowFail } if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) { - long minVersionCount = Long.MAX_VALUE; - for (Replica replica : allQueryableReplica) { - if (replica.getVersionCount() != -1 && replica.getVersionCount() < minVersionCount) { - minVersionCount = replica.getVersionCount(); - } + long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount) + .filter(count -> count != -1).min().orElse(Long.MAX_VALUE); + long maxVersionCount = Config.min_version_count_indicate_replica_compaction_too_slow; + if (minVersionCount != Long.MAX_VALUE) { + maxVersionCount = Math.max(maxVersionCount, minVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT); } - final long finalMinVersionCount = minVersionCount; - return allQueryableReplica.stream().filter(replica -> replica.getVersionCount() == -1 - || replica.getVersionCount() < Config.min_version_count_indicate_replica_compaction_too_slow - || replica.getVersionCount() < finalMinVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT) + + final long finalMaxVersionCount = maxVersionCount; + return allQueryableReplica.stream() + .filter(replica -> replica.getVisibleVersionCount() < finalMaxVersionCount) .collect(Collectors.toList()); } return allQueryableReplica; @@ -533,7 +533,7 @@ public Pair getHealthStatusWithPriority(S if (versionCompleted) { stable++; - versions.add(replica.getVersionCount()); + versions.add(replica.getVisibleVersionCount()); allocNum = stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0); stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum + 1)); @@ -624,7 +624,7 @@ public 
Pair getHealthStatusWithPriority(S // get the max version diff long delta = versions.get(versions.size() - 1) - versions.get(0); double ratio = (double) delta / versions.get(versions.size() - 1); - if (versions.get(versions.size() - 1) > Config.min_version_count_indicate_replica_compaction_too_slow + if (versions.get(versions.size() - 1) >= Config.min_version_count_indicate_replica_compaction_too_slow && ratio > Config.valid_version_count_delta_ratio_between_replicas) { return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW, Priority.HIGH); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index ada44cac5dc5e5..8561d0b98089ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -22,6 +22,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.cooldown.CooldownConf; +import org.apache.doris.master.PartitionInfoCollector.PartitionCollectInfo; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TStorageMedium; @@ -37,7 +38,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -98,7 +99,10 @@ public class TabletInvertedIndex { // backend id -> (tablet id -> replica) private Table backingReplicaMetaTable = HashBasedTable.create(); - private volatile ImmutableSet partitionIdInMemorySet = ImmutableSet.of(); + // partition id -> partition info. 
+ // notice partition info update every Config.partition_info_update_interval_secs seconds, + // so it may be stale. + private volatile ImmutableMap partitionCollectInfoMap = ImmutableMap.of(); private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); @@ -122,11 +126,13 @@ private void writeUnlock(long stamp) { } public void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, final HashMap storageMediumMap, ListMultimap tabletSyncMap, ListMultimap tabletDeleteFromMeta, Set tabletFoundInMeta, ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, Map> transactionsToPublish, ListMultimap transactionsToClear, ListMultimap tabletRecoveryMap, @@ -134,6 +140,7 @@ public void tabletReport(long backendId, Map backendTablets, List cooldownConfToPush, List cooldownConfToUpdate) { List> cooldownTablets = new ArrayList<>(); + long feTabletNum = 0; long stamp = readLock(); long start = System.currentTimeMillis(); try { @@ -142,6 +149,7 @@ public void tabletReport(long backendId, Map backendTablets, } Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { + feTabletNum = replicaMetaWithBackend.size(); taskPool.submit(() -> { // traverse replicas in meta with this backend replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> { @@ -162,11 +170,13 @@ public void tabletReport(long backendId, Map backendTablets, tabletMetaInfo = new TTabletMetaInfo(); tabletMetaInfo.setReplicaId(replica.getId()); } - if (partitionIdInMemorySet.contains( - backendTabletInfo.getPartitionId()) != backendTabletInfo.isIsInMemory()) { + PartitionCollectInfo partitionCollectInfo = + partitionCollectInfoMap.get(backendTabletInfo.getPartitionId()); + boolean isInMemory = partitionCollectInfo != null && partitionCollectInfo.isInMemory(); + if (isInMemory != backendTabletInfo.isIsInMemory()) { if (tabletMetaInfo == null) { tabletMetaInfo = new TTabletMetaInfo(); - 
tabletMetaInfo.setIsInMemory(!backendTabletInfo.isIsInMemory()); + tabletMetaInfo.setIsInMemory(isInMemory); } } if (Config.fix_tablet_partition_id_eq_0 @@ -325,8 +335,11 @@ && isLocal(tabletMeta.getStorageMedium())) { // update replicase's version count // no need to write log, and no need to get db lock. - if (backendTabletInfo.isSetVersionCount()) { - replica.setVersionCount(backendTabletInfo.getVersionCount()); + if (backendTabletInfo.isSetTotalVersionCount()) { + replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount()); + replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount() + ? backendTabletInfo.getVisibleVersionCount() + : backendTabletInfo.getTotalVersionCount()); } if (tabletMetaInfo != null) { tabletMetaInfo.setTabletId(tabletId); @@ -345,6 +358,15 @@ && isLocal(tabletMeta.getStorageMedium())) { } } }); + + backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> { + long partitionId = entry.getKey(); + long backendVersion = entry.getValue(); + PartitionCollectInfo partitionInfo = partitionCollectInfoMap.get(partitionId); + if (partitionInfo != null && partitionInfo.getVisibleVersion() > backendVersion) { + partitionVersionSyncMap.put(partitionId, partitionInfo.getVisibleVersion()); + } + }); }).join(); } } finally { @@ -353,11 +375,13 @@ && isLocal(tabletMeta.getStorageMedium())) { cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, cooldownConfToPush, cooldownConfToUpdate)); long end = System.currentTimeMillis(); - LOG.info("finished to do tablet diff with backend[{}]. sync: {}." - + " metaDel: {}. foundInMeta: {}. migration: {}. " - + "found invalid transactions {}. found republish transactions {}. tabletToUpdate: {}." - + " need recovery: {}. cost: {} ms", backendId, tabletSyncMap.size(), + LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: {}, backend tablet num: {}. sync: {}." + + " metaDel: {}. foundInMeta: {}. migration: {}. 
backend partition num: {}, backend need " + + "update: {}. found invalid transactions {}. found republish " + + "transactions {}. tabletToUpdate: {}. need recovery: {}. cost: {} ms", + backendId, feTabletNum, backendTablets.size(), tabletSyncMap.size(), tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), + backendPartitionsVersion.size(), partitionVersionSyncMap.size(), transactionsToClear.size(), transactionsToPublish.size(), tabletToUpdate.size(), tabletRecoveryMap.size(), (end - start)); } @@ -756,8 +780,8 @@ public void clear() { } } - public void setPartitionIdInMemorySet(ImmutableSet partitionIdInMemorySet) { - this.partitionIdInMemorySet = partitionIdInMemorySet; + public void setPartitionCollectInfoMap(ImmutableMap partitionCollectInfoMap) { + this.partitionCollectInfoMap = partitionCollectInfoMap; } public Map getReplicaToTabletMap() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index e2b65a95f6ea63..896ecac6f8ef05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -151,8 +151,12 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); if (replica != null) { - replica.updateStat(stat.getDataSize(), stat.getRemoteDataSize(), stat.getRowNum(), - stat.getVersionCount()); + replica.setDataSize(stat.getDataSize()); + replica.setRemoteDataSize(stat.getRemoteDataSize()); + replica.setRowCount(stat.getRowCount()); + replica.setTotalVersionCount(stat.getTotalVersionCount()); + replica.setVisibleVersionCount(stat.isSetVisibleVersionCount() ? 
stat.getVisibleVersionCount() + : stat.getTotalVersionCount()); } } } @@ -168,7 +172,8 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { continue; } // TODO(cmy) no db lock protected. I think it is ok even we get wrong row num - replica.updateStat(entry.getValue().getDataSize(), entry.getValue().getRowNum()); + replica.setDataSize(entry.getValue().getDataSize()); + replica.setRowCount(entry.getValue().getRowCount()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 9d54ce2dc5fb24..a7537d59092a2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -574,15 +574,15 @@ public boolean compactionRecovered() { Replica chosenReplica = null; long maxVersionCount = Integer.MIN_VALUE; for (Replica replica : tablet.getReplicas()) { - if (replica.getVersionCount() > maxVersionCount) { - maxVersionCount = replica.getVersionCount(); + if (replica.getVisibleVersionCount() > maxVersionCount) { + maxVersionCount = replica.getVisibleVersionCount(); chosenReplica = replica; } } boolean recovered = false; for (Replica replica : tablet.getReplicas()) { if (replica.isAlive() && replica.tooSlow() && (!replica.equals(chosenReplica) - || replica.getVersionCount() < Config.min_version_count_indicate_replica_compaction_too_slow)) { + || !replica.tooBigVersionCount())) { if (chosenReplica != null) { chosenReplica.setState(ReplicaState.NORMAL); recovered = true; @@ -1177,8 +1177,7 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) "replica does not exist. 
backend id: " + destBackendId); } - replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), - reportedTablet.getDataSize(), reportedTablet.getRowCount()); + replica.updateWithReport(reportedTablet); if (replica.getLastFailedVersion() > partition.getCommittedVersion() && reportedTablet.getVersion() >= partition.getCommittedVersion() //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() @@ -1365,8 +1364,8 @@ public String toString() { public static class VersionCountComparator implements Comparator { @Override public int compare(Replica r1, Replica r2) { - long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE : r1.getVersionCount(); - long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE : r2.getVersionCount(); + long verCount1 = r1.getVisibleVersionCount() == -1 ? Long.MAX_VALUE : r1.getVisibleVersionCount(); + long verCount2 = r2.getVisibleVersionCount() == -1 ? Long.MAX_VALUE : r2.getVisibleVersionCount(); if (verCount1 < verCount2) { return -1; } else if (verCount1 > verCount2) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 0de9c5b65895a7..496c9407fc571f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1110,13 +1110,13 @@ private void handleReplicaTooSlow(TabletSchedCtx tabletCtx) throws SchedExceptio if (replica.isAlive() && !replica.tooSlow()) { normalReplicaCount++; } - if (replica.getVersionCount() > maxVersionCount) { - maxVersionCount = replica.getVersionCount(); + if (replica.getVisibleVersionCount() > maxVersionCount) { + maxVersionCount = replica.getVisibleVersionCount(); chosenReplica = replica; } } if (chosenReplica != null && chosenReplica.isAlive() && !chosenReplica.tooSlow() - && chosenReplica.getVersionCount() > 
Config.min_version_count_indicate_replica_compaction_too_slow + && chosenReplica.tooBigVersionCount() && normalReplicaCount - 1 >= tabletCtx.getReplicas().size() / 2 + 1) { chosenReplica.setState(ReplicaState.COMPACTION_TOO_SLOW); LOG.info("set replica id :{} tablet id: {}, backend id: {} to COMPACTION_TOO_SLOW", diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index aa1218ab9b3f3f..bf03aabc7a79cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -895,7 +895,8 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { } @Override - public void finishTransaction(long dbId, long transactionId) throws UserException { + public void finishTransaction(long dbId, long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { throw new UserException("Disallow to call finishTransaction()"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index 9b2a8f6d8f7eee..edf2a9d35179e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -44,7 +44,7 @@ public class ReplicasProcNode implements ProcNodeInterface { .add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime") .add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad") .add("IsUserDrop") - .add("VersionCount").add("PathHash").add("Path") + .add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path") 
.add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId") .add("CooldownMetaId").add("QueryHits").build(); @@ -117,7 +117,8 @@ public ProcResult fetchResult() { String.valueOf(replica.getState()), String.valueOf(replica.isBad()), String.valueOf(replica.isUserDrop()), - String.valueOf(replica.getVersionCount()), + String.valueOf(replica.getVisibleVersionCount()), + String.valueOf(replica.getTotalVersionCount()), String.valueOf(replica.getPathHash()), path, metaUrl, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index 3ce3ff74c7adaf..ce88a52082e830 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -256,8 +256,7 @@ static class DBTabletStatistic { oversizeTabletIds.add(tablet.getId()); } for (Replica replica : tablet.getReplicas()) { - if (replica.getVersionCount() - > Config.min_version_count_indicate_replica_compaction_too_slow) { + if (replica.tooBigVersionCount()) { replicaCompactionTooSlowNum++; replicaCompactionTooSlowTabletIds.add(tablet.getId()); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index c82a55bd3801ba..46c89eb3253e50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -52,9 +52,10 @@ public class TabletsProcDir implements ProcDirInterface { .add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime") .add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State") .add("LstConsistencyCheckTime").add("CheckVersion") - .add("VersionCount").add("QueryHits").add("PathHash").add("Path") + 
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path") .add("MetaUrl").add("CompactionStatus") - .add("CooldownReplicaId").add("CooldownMetaId").build(); + .add("CooldownReplicaId").add("CooldownMetaId") + .build(); private Table table; private MaterializedIndex index; @@ -113,7 +114,8 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(-1); // lst consistency check time tabletInfo.add(-1); // check version tabletInfo.add(-1); // check version hash - tabletInfo.add(-1); // version count + tabletInfo.add(-1); // visible version count + tabletInfo.add(-1); // total version count tabletInfo.add(0L); // query hits tabletInfo.add(-1); // path hash tabletInfo.add(FeConstants.null_string); // path @@ -147,7 +149,8 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(TimeUtils.longToTimeString(tablet.getLastCheckTime())); tabletInfo.add(tablet.getCheckedVersion()); - tabletInfo.add(replica.getVersionCount()); + tabletInfo.add(replica.getVisibleVersionCount()); + tabletInfo.add(replica.getTotalVersionCount()); tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L)); tabletInfo.add(replica.getPathHash()); tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), "")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 7bd0040395a174..98e837de34203c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1078,7 +1078,10 @@ private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info Tablet tablet = materializedIndex.getTablet(info.getTabletId()); Replica replica = tablet.getReplicaById(info.getReplicaId()); Preconditions.checkNotNull(replica, info); - replica.updateVersionInfo(info.getVersion(), 
info.getDataSize(), info.getRemoteDataSize(), info.getRowCount()); + replica.updateVersion(info.getVersion()); + replica.setDataSize(info.getDataSize()); + replica.setRemoteDataSize(info.getRemoteDataSize()); + replica.setRowCount(info.getRowCount()); replica.setBad(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java similarity index 68% rename from fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java rename to fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java index 77ed5829799363..f4bf87ad1099d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java @@ -27,29 +27,49 @@ import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; -public class PartitionInMemoryInfoCollector extends MasterDaemon { +public class PartitionInfoCollector extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(PartitionInMemoryInfoCollector.class); + // notice since collect partition info every Config.partition_info_update_interval_secs seconds, + // so partition collect info may be stale + public static class PartitionCollectInfo { + private long visibleVersion; + private boolean isInMemory; - public PartitionInMemoryInfoCollector() { - super("PartitionInMemoryInfoCollector", Config.partition_in_memory_update_interval_secs * 1000); + PartitionCollectInfo(long visibleVersion, boolean isInMemory) { + this.visibleVersion = visibleVersion; + this.isInMemory = isInMemory; + } + + public long getVisibleVersion() { + return 
this.visibleVersion; + } + + public boolean isInMemory() { + return this.isInMemory; + } + } + + private static final Logger LOG = LogManager.getLogger(PartitionInfoCollector.class); + + public PartitionInfoCollector() { + super("PartitionInfoCollector", Config.partition_info_update_interval_secs * 1000); } @Override protected void runAfterCatalogReady() { - updatePartitionInMemoryInfo(); + updatePartitionCollectInfo(); } - private void updatePartitionInMemoryInfo() { + private void updatePartitionCollectInfo() { Env env = Env.getCurrentEnv(); TabletInvertedIndex tabletInvertedIndex = env.getTabletInvertedIndex(); - ImmutableSet.Builder builder = ImmutableSet.builder(); + ImmutableMap.Builder builder = ImmutableMap.builder(); List dbIdList = env.getInternalCatalog().getDbIds(); for (Long dbId : dbIdList) { Database db = env.getInternalCatalog().getDbNullable(dbId); @@ -70,10 +90,12 @@ private void updatePartitionInMemoryInfo() { try { OlapTable olapTable = (OlapTable) table; for (Partition partition : olapTable.getAllPartitions()) { - if (olapTable.getPartitionInfo().getIsInMemory(partition.getId())) { + boolean isInMemory = olapTable.getPartitionInfo().getIsInMemory(partition.getId()); + if (isInMemory) { partitionInMemoryCount++; - builder.add(partition.getId()); } + builder.put(partition.getId(), + new PartitionCollectInfo(partition.getVisibleVersion(), isInMemory)); } } finally { table.readUnlock(); @@ -87,9 +109,6 @@ private void updatePartitionInMemoryInfo() { LOG.warn("Update database[" + db.getFullName() + "] partition in memory info failed", e); } } - ImmutableSet partitionIdInMemorySet = builder.build(); - tabletInvertedIndex.setPartitionIdInMemorySet(partitionIdInMemorySet); + tabletInvertedIndex.setPartitionCollectInfoMap(builder.build()); } - - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 6684fe6e71c494..a2427e88d7bb7f 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -72,6 +72,7 @@ import org.apache.doris.task.PushStoragePolicyTask; import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.task.UpdateTabletMetaInfoTask; +import org.apache.doris.task.UpdateVisibleVersionTask; import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TMasterResult; @@ -156,6 +157,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException { Map> tasks = null; Map disks = null; Map tablets = null; + Map partitionsVersion = null; long reportVersion = -1; ReportType reportType = ReportType.UNKNOWN; @@ -181,11 +183,15 @@ public TMasterResult handleReport(TReportRequest request) throws TException { reportType = ReportType.TABLET; } + if (request.isSetPartitionsVersion()) { + partitionsVersion = request.getPartitionsVersion(); + } + if (request.isSetTabletMaxCompactionScore()) { backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore()); } - ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, + ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, partitionsVersion, reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(), request.getPipelineExecutorSize()); try { @@ -232,6 +238,7 @@ private class ReportTask extends MasterTask { private Map> tasks; private Map disks; private Map tablets; + private Map partitionsVersion; private long reportVersion; private List storagePolicies; @@ -240,14 +247,15 @@ private class ReportTask extends MasterTask { private int pipelineExecutorSize; public ReportTask(long beId, Map> tasks, - Map disks, - Map tablets, long reportVersion, + Map disks, Map tablets, + Map partitionsVersion, long reportVersion, List storagePolicies, List storageResources, int cpuCores, int pipelineExecutorSize) { 
this.beId = beId; this.tasks = tasks; this.disks = disks; this.tablets = tablets; + this.partitionsVersion = partitionsVersion; this.reportVersion = reportVersion; this.storagePolicies = storagePolicies; this.storageResources = storageResources; @@ -274,7 +282,11 @@ protected void exec() { LOG.warn("out of date report version {} from backend[{}]. current report version[{}]", reportVersion, beId, backendReportVersion); } else { - ReportHandler.tabletReport(beId, tablets, reportVersion); + Map partitions = this.partitionsVersion; + if (partitions == null) { + partitions = Maps.newHashMap(); + } + ReportHandler.tabletReport(beId, tablets, partitions, reportVersion); } } } @@ -409,7 +421,8 @@ private static void diffResource(List storageResourcesInBe, Li } // public for fe ut - public static void tabletReport(long backendId, Map backendTablets, long backendReportVersion) { + public static void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, long backendReportVersion) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); @@ -427,6 +440,9 @@ public static void tabletReport(long backendId, Map backendTablet // storage medium -> tablet id ListMultimap tabletMigrationMap = LinkedListMultimap.create(); + // partition id -> visible version + Map partitionVersionSyncMap = Maps.newConcurrentMap(); + // dbid -> txn id -> [partition info] Map> transactionsToPublish = Maps.newHashMap(); ListMultimap transactionsToClear = LinkedListMultimap.create(); @@ -440,11 +456,13 @@ public static void tabletReport(long backendId, Map backendTablet List cooldownConfToUpdate = new LinkedList<>(); // 1. do the diff. 
find out (intersection) / (be - meta) / (meta - be) - Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap, + Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, backendPartitionsVersion, + storageMediumMap, tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + partitionVersionSyncMap, transactionsToPublish, transactionsToClear, tabletRecoveryMap, @@ -500,6 +518,9 @@ public static void tabletReport(long backendId, Map backendTablet if (!cooldownConfToUpdate.isEmpty()) { Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate); } + if (!partitionVersionSyncMap.isEmpty()) { + handleUpdatePartitionVersion(partitionVersionSyncMap, backendId); + } final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo(); Backend reportBackend = currentSystemInfo.getBackend(backendId); @@ -660,27 +681,17 @@ private static void sync(Map backendTablets, ListMultimap t.getSchemaHash() == schemaHash).findFirst().orElse(null); + if (tabletInfo == null) { continue; } + long metaVersion = replica.getVersion(); + long backendVersion = tabletInfo.getVersion(); boolean needSync = false; if (metaVersion < backendVersion) { needSync = true; @@ -703,7 +714,7 @@ private static void sync(Map backendTablets, ListMultimap backendTablets, ListMultimap partitionVersionSyncMap, long backendId) { + AgentBatchTask batchTask = new AgentBatchTask(); + UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, partitionVersionSyncMap, + System.currentTimeMillis()); + batchTask.addTask(task); + AgentTaskExecutor.submit(batchTask); + } + private static void handleRecoverTablet(ListMultimap tabletRecoveryMap, Map backendTablets, long backendId) { // print a warn log here to indicate the exceptions on the backend diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java index 
2b084fcfa23431..5e7748a35247be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.common.Config; import com.google.common.collect.Lists; import org.json.simple.JSONObject; @@ -154,9 +153,9 @@ public static List> diagnoseTablet(long tabletId) { + ", and is bad: " + (replica.isBad() ? "Yes" : "No") + ", and is going to drop: " + (replica.isUserDrop() ? "Yes" : "No")); } - if (replica.getVersionCount() > Config.min_version_count_indicate_replica_compaction_too_slow) { + if (replica.tooBigVersionCount()) { compactionErr.append("Replica on backend " + replica.getBackendId() + "'s version count is too high: " - + replica.getVersionCount()); + + replica.getVisibleVersionCount()); } } results.add(Lists.newArrayList("ReplicaBackendStatus", (backendErr.length() == 0 diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index 863d4acde0c8ab..36a421e8415e09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -49,6 +49,7 @@ import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUpdateTabletMetaInfoReq; import org.apache.doris.thrift.TUploadReq; +import org.apache.doris.thrift.TVisibleVersionReq; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; @@ -394,6 +395,15 @@ private TAgentTaskRequest toAgentTaskRequest(AgentTask task) { tAgentTaskRequest.setGcBinlogReq(request); return tAgentTaskRequest; } + case UPDATE_VISIBLE_VERSION: { + UpdateVisibleVersionTask visibleTask = (UpdateVisibleVersionTask) task; + TVisibleVersionReq request = visibleTask.toThrift(); + if 
(LOG.isDebugEnabled()) { + LOG.debug(request.toString()); + } + tAgentTaskRequest.setVisibleVersionReq(request); + return tAgentTaskRequest; + } case CALCULATE_DELETE_BITMAP: { CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask) task; TCalcDeleteBitmapRequest request = calcDeleteBitmapTask.toThrift(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java new file mode 100644 index 00000000000000..52ed3b1c490381 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.task; + +import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TVisibleVersionReq; + +import java.util.Map; + +public class UpdateVisibleVersionTask extends AgentTask { + private Map partitionVisibleVersions; + + public UpdateVisibleVersionTask(long backendId, Map partitionVisibleVersions, long createTime) { + super(null, backendId, TTaskType.UPDATE_VISIBLE_VERSION, -1L, -1L, -1L, -1L, -1L, -1L, createTime); + this.partitionVisibleVersions = partitionVisibleVersions; + } + + public TVisibleVersionReq toThrift() { + TVisibleVersionReq request = new TVisibleVersionReq(); + partitionVisibleVersions.forEach((partitionId, version) -> { + request.putToPartitionVersion(partitionId, version); + }); + return request; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index eb075628bcc27b..47f0aa9d4062c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -44,6 +44,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.InternalDatabaseUtil; import org.apache.doris.common.util.MetaLockUtils; @@ -996,7 +997,12 @@ protected List getCommittedTxnList() { } } - public void finishTransaction(long transactionId) throws UserException { + public void finishTransaction(long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { + if (DebugPointUtil.isEnable("DatabaseTransactionMgr.stop_finish_transaction")) { + return; + } + TransactionState transactionState = null; readLock(); try { @@ -1059,7 +1065,7 @@ public void 
finishTransaction(long transactionId) throws UserException { LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } } - updateCatalogAfterVisible(transactionState, db); + updateCatalogAfterVisible(transactionState, db, partitionVisibleVersions, backendPartitions); } finally { MetaLockUtils.writeUnlockTables(tableList); } @@ -1925,7 +1931,8 @@ private void updateCatalogAfterCommitted(TransactionState transactionState, Data } } - private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) { + private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db, + Map partitionVisibleVersions, Map> backendPartitions) { Set errorReplicaIds = transactionState.getErrorReplicas(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); List newPartitionLoadedTableIds = new ArrayList<>(); @@ -1982,7 +1989,13 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat lastFailedVersion = newCommitVersion; } } - replica.updateVersionWithFailedInfo(newVersion, lastFailedVersion, lastSuccessVersion); + replica.updateVersionWithFailed(newVersion, lastFailedVersion, lastSuccessVersion); + Set partitionIds = backendPartitions.get(replica.getBackendId()); + if (partitionIds == null) { + partitionIds = Sets.newHashSet(); + backendPartitions.put(replica.getBackendId(), partitionIds); + } + partitionIds.add(partitionId); } } } // end for indices @@ -1993,6 +2006,7 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat newPartitionLoadedTableIds.add(tableId); } partition.updateVisibleVersionAndTime(version, versionTime); + partitionVisibleVersions.put(partition.getId(), version); if (LOG.isDebugEnabled()) { LOG.debug("transaction state {} set partition {}'s version to [{}]", transactionState, partition.getId(), version); @@ -2141,7 +2155,7 @@ public void replayUpsertTransactionState(TransactionState transactionState) thro if 
(transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { updateCatalogAfterCommitted(transactionState, db, true); } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - updateCatalogAfterVisible(transactionState, db); + updateCatalogAfterVisible(transactionState, db, Maps.newHashMap(), Maps.newHashMap()); } unprotectUpsertTransactionState(transactionState, true); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index f527f75c432588..74d2753986c6aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -402,9 +402,10 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { * @param transactionId * @return */ - public void finishTransaction(long dbId, long transactionId) throws UserException { + public void finishTransaction(long dbId, long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.finishTransaction(transactionId); + dbTransactionMgr.finishTransaction(transactionId, partitionVisibleVersions, backendPartitions); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java index 7ca9229d3627d2..2e19c9e3ce00de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeoutException; public 
interface GlobalTransactionMgrIface extends Writable { @@ -109,7 +110,8 @@ public void abortTransaction(Long dbId, Long txnId, String reason, public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId); - public void finishTransaction(long dbId, long transactionId) throws UserException; + public void finishTransaction(long dbId, long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException; public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, List tableIdList) throws AnalysisException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 2ceebe2b53c128..d7bf0d18b12d5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -32,6 +32,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PublishVersionTask; +import org.apache.doris.task.UpdateVisibleVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TTaskType; @@ -60,14 +61,18 @@ public PublishVersionDaemon() { @Override protected void runAfterCatalogReady() { + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + try { - publishVersion(); + publishVersion(partitionVisibleVersions, backendPartitions); + sendBackendVisibleVersion(partitionVisibleVersions, backendPartitions); } catch (Throwable t) { LOG.error("errors while publish version to all backends", t); } } - private void publishVersion() { + private void publishVersion(Map partitionVisibleVersions, Map> backendPartitions) { if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) { return; } @@ -176,7 +181,7 @@ private void publishVersion() { try { // one 
transaction exception should not affect other transaction globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId()); + transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); } catch (Exception e) { LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); } @@ -229,4 +234,23 @@ private Map> getBaseTabletIdsForEachBe(TransactionState transact .collect(Collectors.groupingBy(p -> p.first, Collectors.mapping(p -> p.second, Collectors.toSet()))); } + + private void sendBackendVisibleVersion(Map partitionVisibleVersions, + Map> backendPartitions) { + if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) { + return; + } + + long createTime = System.currentTimeMillis(); + AgentBatchTask batchTask = new AgentBatchTask(); + backendPartitions.forEach((backendId, partitionIds) -> { + Map backendPartitionVersions = partitionVisibleVersions.entrySet().stream() + .filter(entry -> partitionIds.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions, + createTime); + batchTask.addTask(task); + }); + AgentTaskExecutor.submit(batchTask); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 2a47520f5c5e94..a1bc25d5034cc9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -221,10 +221,7 @@ public void testSchemaChange1() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - 
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), - shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), - shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } @@ -301,10 +298,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), - shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), - shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 5e99599b041e20..6db68473bdbe8e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -213,8 +213,7 @@ public void testSchemaChange1() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } @@ -296,8 +295,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - 
shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java index 21fe967a96dd68..54debab9a6362d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java @@ -73,7 +73,8 @@ public void testShowReplicaDistribution() throws Exception { for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateStat(1024, 2); + replica.setDataSize(1024L); + replica.setRowCount(2L); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java index d6a81cdd88339f..eb7dbca0775e54 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java @@ -65,12 +65,7 @@ public void getMethodTest() { // update new version long newVersion = version + 1; - long newDataSize = dataSize + 100; - long newRowCount = rowCount + 10; - replica.updateVersionInfo(newVersion, newDataSize, 0, newRowCount); - Assert.assertEquals(newVersion, replica.getVersion()); - Assert.assertEquals(newDataSize, replica.getDataSize()); - Assert.assertEquals(newRowCount, replica.getRowCount()); + replica.updateVersion(newVersion); // check version catch up Assert.assertFalse(replica.checkVersionCatchUp(5, false)); @@ -132,14 +127,14 @@ public void testSerialization() throws Exception { public void testUpdateVersion1() { Replica originalReplica = new Replica(10000, 
20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 3); // new version is little than original version, it is invalid the version will not update - originalReplica.updateVersionInfo(2, 100, 0, 78); + originalReplica.updateVersion(2); Assert.assertEquals(3, originalReplica.getVersion()); } @Test public void testUpdateVersion2() { Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 0); - originalReplica.updateVersionInfo(3, 100, 0, 78); + originalReplica.updateVersion(3); // if new version >= current version and last success version <= new version, then last success version should be updated Assert.assertEquals(3, originalReplica.getLastSuccessVersion()); Assert.assertEquals(3, originalReplica.getVersion()); @@ -155,7 +150,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update last success version 10 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 10); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); @@ -163,7 +158,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update version to 8, the last success version and version should be 10 - originalReplica.updateVersionInfo(8, 100, 0, 78); + originalReplica.updateVersion(8); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); Assert.assertEquals(10, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); @@ -175,7 +170,7 @@ public void testUpdateVersion3() { Assert.assertEquals(12, originalReplica.getLastFailedVersion()); // update last success version to 15 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 15); 
Assert.assertEquals(15, originalReplica.getLastSuccessVersion()); @@ -189,13 +184,13 @@ public void testUpdateVersion3() { Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 17 then version and success version is 17 - originalReplica.updateVersionInfo(17, 100, 0, 78); + originalReplica.updateVersion(17); Assert.assertEquals(17, originalReplica.getLastSuccessVersion()); Assert.assertEquals(17, originalReplica.getVersion()); Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 18, then version and last success version should be 18 and failed version should be -1 - originalReplica.updateVersionInfo(18, 100, 0, 78); + originalReplica.updateVersion(18); Assert.assertEquals(18, originalReplica.getLastSuccessVersion()); Assert.assertEquals(18, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java index f563b846ed7da2..40b6683da3dc6b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java @@ -110,7 +110,8 @@ public void testDiskReblanceWhenSchedulerIdle() throws Exception { Lists.newArrayList(tablet.getReplicas()).forEach( replica -> { if (replica.getBackendId() == backends.get(1).getId()) { - replica.updateStat(totalCapacity / 4, 1); + replica.setDataSize(totalCapacity / 4); + replica.setRowCount(1); tablet.deleteReplica(replica); replica.setBackendId(backends.get(0).getId()); replica.setPathHash(diskInfo0.getPathHash()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index 063faf2d3b2ea6..1e6af5c7324e85 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -104,7 +104,7 @@ public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex replica.setPathHash(beIds.get(i)); if (replicaSizes != null) { // for disk rebalancer, every beId corresponding to a replicaSize - replica.updateStat(replicaSizes.get(i), 0); + replica.setDataSize(replicaSizes.get(i)); } // isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex tablet.addReplica(replica, true); @@ -165,7 +165,8 @@ public static void updateReplicaDataSize(long minReplicaSize, int tableSkew, in for (Tablet tablet : idx.getTablets()) { long tabletSize = tableBaseSize * (1 + random.nextInt(tabletSkew)); for (Replica replica : tablet.getReplicas()) { - replica.updateStat(tabletSize, 1000L); + replica.setDataSize(tabletSize); + replica.setRowCount(1000L); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java index 1150b32192f341..1ac497dbebe28d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -106,7 +106,7 @@ public void testRepairLastFailedVersionByReport() throws Exception { tablets.put(tablet.getId(), tTablet); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); @@ -136,11 +136,11 @@ public void testVersionRegressive() throws Exception { Map tablets = Maps.newHashMap(); tablets.put(tablet.getId(), tTablet); - 
ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); @@ -160,7 +160,7 @@ private TableInfo prepareTableForTest(String tableName) throws Exception { long visibleVersion = 2L; partition.updateVisibleVersion(visibleVersion); partition.setNextVersion(visibleVersion + 1); - tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); + tablet.getReplicas().forEach(replica -> replica.updateVersion(visibleVersion)); Replica replica = tablet.getReplicas().iterator().next(); Assertions.assertEquals(visibleVersion, replica.getVersion()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index 6a38985b73f6e7..7d918ef7db54ae 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -141,7 +141,7 @@ private static void updateReplicaVersionCount() { List pathHashes = be.getDisks().values().stream() .map(DiskInfo::getPathHash).collect(Collectors.toList()); Replica replica = cell.getValue(); - replica.setVersionCount(versionCount); + replica.setVisibleVersionCount(versionCount); versionCount = versionCount + 200; replica.setPathHash(pathHashes.get(0)); @@ -171,7 +171,7 @@ public void test() throws Exception { boolean found = false; for (Table.Cell 
cell : replicaMetaTable.cellSet()) { Replica replica = cell.getValue(); - if (replica.getVersionCount() == 401) { + if (replica.getVisibleVersionCount() == 401) { if (replica.tooSlow()) { LOG.info("set to TOO_SLOW."); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index d4578e17d7fb38..852f072eca1c35 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -84,19 +84,19 @@ public void testVersionCountComparator() { TabletSchedCtx.VersionCountComparator countComparator = new TabletSchedCtx.VersionCountComparator(); List replicaList = Lists.newArrayList(); Replica replica1 = new Replica(); - replica1.setVersionCount(100); + replica1.setVisibleVersionCount(100); replica1.setState(Replica.ReplicaState.NORMAL); Replica replica2 = new Replica(); - replica2.setVersionCount(50); + replica2.setVisibleVersionCount(50); replica2.setState(Replica.ReplicaState.NORMAL); Replica replica3 = new Replica(); - replica3.setVersionCount(-1); + replica3.setVisibleVersionCount(-1); replica3.setState(Replica.ReplicaState.NORMAL); Replica replica4 = new Replica(); - replica4.setVersionCount(200); + replica4.setVisibleVersionCount(200); replica4.setState(Replica.ReplicaState.NORMAL); replicaList.add(replica1); @@ -105,10 +105,10 @@ public void testVersionCountComparator() { replicaList.add(replica4); Collections.sort(replicaList, countComparator); - Assert.assertEquals(50, replicaList.get(0).getVersionCount()); - Assert.assertEquals(100, replicaList.get(1).getVersionCount()); - Assert.assertEquals(200, replicaList.get(2).getVersionCount()); - Assert.assertEquals(-1, replicaList.get(3).getVersionCount()); + Assert.assertEquals(50, replicaList.get(0).getVisibleVersionCount()); + Assert.assertEquals(100, replicaList.get(1).getVisibleVersionCount()); + 
Assert.assertEquals(200, replicaList.get(2).getVisibleVersionCount()); + Assert.assertEquals(-1, replicaList.get(3).getVisibleVersionCount()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 76fe82e6599a87..9ff6e7ac3ca693 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1101,7 +1101,7 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1115,7 +1115,7 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1199,7 +1199,7 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1229,7 +1229,7 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2); } } } @@ -1249,7 +1249,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1278,7 +1278,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : 
mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 84a7d6d283d477..0fc445d4436f5d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -42,6 +42,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; +import java.util.Set; public class DatabaseTransactionMgrTest { @@ -115,7 +116,10 @@ public Map addTransactionToTransactionMgr() throws UserException { setTransactionFinishPublish(transactionState1, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1, + partitionVisibleVersions, backendPartitions); labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); TransactionState.TxnCoordinator beTransactionSource = new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.BE, "be1"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index cc6744ed6a3fae..362c1bf2ff1df0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -53,8 +53,10 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator; import 
org.apache.doris.transaction.TransactionState.TxnSourceType; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import mockit.Injectable; import mockit.Mocked; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -65,6 +67,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; public class GlobalTransactionMgrTest { @@ -469,7 +472,10 @@ public void testFinishTransaction() throws UserException { CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); // check replica version @@ -488,6 +494,14 @@ public void testFinishTransaction() throws UserException { } } + + Assert.assertEquals(ImmutableMap.of(testPartition.getId(), CatalogTestUtil.testStartVersion + 1), + partitionVisibleVersions); + Set partitionIds = Sets.newHashSet(testPartition.getId()); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId1)); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId2)); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId3)); + // slave replay new state and compare catalog slaveTransMgr.replayUpsertTransactionState(transactionState); 
Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv)); @@ -530,8 +544,13 @@ public void testFinishTransactionWithOneFailed() throws UserException { // backend2 publish failed transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); + Assert.assertTrue(partitionVisibleVersions.isEmpty()); + Assert.assertTrue(backendPartitions.isEmpty()); Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1); Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2); Replica replica3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3); @@ -549,7 +568,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L); transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica2.getVersion()); @@ -614,7 +634,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, Lists.newArrayList(CatalogTestUtil.testBackendId1, 
CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica2.getVersion()); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 4e381b168052ad..587715d43cd28d 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -404,6 +404,10 @@ struct TPublishVersionRequest { 4: optional set base_tablet_ids } +struct TVisibleVersionReq { + 1: required map partition_version +} + struct TCalcDeleteBitmapPartitionInfo { 1: required Types.TPartitionId partition_id 2: required Types.TVersion version @@ -516,6 +520,7 @@ struct TAgentTaskRequest { 32: optional TAlterInvertedIndexReq alter_inverted_index_req 33: optional TGcBinlogReq gc_binlog_req 34: optional TCleanTrashReq clean_trash_req + 35: optional TVisibleVersionReq visible_version_req // For cloud 1000: optional TCalcDeleteBitmapRequest calc_delete_bitmap_req diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index d4039ff58127bf..46018fc947d884 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -34,9 +34,10 @@ struct TTabletStat { 1: required i64 tablet_id // local data size 2: optional i64 data_size - 3: optional i64 row_num - 4: optional i64 version_count + 3: optional i64 row_count + 4: optional i64 total_version_count 5: optional i64 remote_data_size + 6: optional i64 visible_version_count } struct TTabletStatResult { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 
fb5dd416bd1ae5..ec647dbcf92256 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -33,7 +33,7 @@ struct TTabletInfo { 6: required Types.TSize data_size 7: optional Types.TStorageMedium storage_medium 8: optional list transaction_ids - 9: optional i64 version_count + 9: optional i64 total_version_count 10: optional i64 path_hash 11: optional bool version_miss 12: optional bool used @@ -46,6 +46,7 @@ struct TTabletInfo { // 18: optional bool is_cooldown 19: optional i64 cooldown_term 20: optional Types.TUniqueId cooldown_meta_id + 21: optional i64 visible_version_count // For cloud 1000: optional bool is_persistent @@ -109,6 +110,7 @@ struct TReportRequest { 10: optional list resource // only id and version 11: i32 num_cores 12: i32 pipeline_executor_size + 13: optional map partitions_version } struct TMasterResult { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index c7df9a7e0944f2..4e0ad9afed923a 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -224,6 +224,7 @@ enum TTaskType { ALTER_INVERTED_INDEX, GC_BINLOG, CLEAN_TRASH, + UPDATE_VISIBLE_VERSION, // CLOUD CALCULATE_DELETE_BITMAP = 1000 diff --git a/regression-test/data/compaction/test_compaction_with_visible_version.out b/regression-test/data/compaction/test_compaction_with_visible_version.out new file mode 100644 index 00000000000000..de90dd5fa2a3a7 --- /dev/null +++ b/regression-test/data/compaction/test_compaction_with_visible_version.out @@ -0,0 +1,448 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_1 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 + +-- !select_2 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 + +-- !select_3 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_4 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_5 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_6 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 +42 420 +43 430 +44 440 +45 450 +46 460 +47 470 +48 480 +49 490 +50 500 +51 510 +52 520 +53 530 +54 540 +55 550 +56 560 +57 570 +58 580 +59 590 +60 600 +61 610 +62 620 +63 630 +64 640 +65 650 +66 660 +67 670 +68 680 +69 690 +70 700 +71 710 +72 720 +73 730 +74 740 +75 750 +76 760 
+77 770 +78 780 +79 790 +80 800 +81 810 +82 820 +83 830 +84 840 +85 850 +86 860 +87 870 +88 880 +89 890 +90 900 +91 910 +92 920 +93 930 +94 940 +95 950 +96 960 +97 970 +98 980 +99 990 +100 1000 +101 1010 +102 1020 +103 1030 +104 1040 +105 1050 +106 1060 +107 1070 +108 1080 +109 1090 +110 1100 +111 1110 +112 1120 +113 1130 +114 1140 +115 1150 +116 1160 +117 1170 +118 1180 +119 1190 +120 1200 +121 1210 + +-- !select_7 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 +42 420 +43 430 +44 440 +45 450 +46 460 +47 470 +48 480 +49 490 +50 500 +51 510 +52 520 +53 530 +54 540 +55 550 +56 560 +57 570 +58 580 +59 590 +60 600 +61 610 +62 620 +63 630 +64 640 +65 650 +66 660 +67 670 +68 680 +69 690 +70 700 +71 710 +72 720 +73 730 +74 740 +75 750 +76 760 +77 770 +78 780 +79 790 +80 800 +81 810 +82 820 +83 830 +84 840 +85 850 +86 860 +87 870 +88 880 +89 890 +90 900 +91 910 +92 920 +93 930 +94 940 +95 950 +96 960 +97 970 +98 980 +99 990 +100 1000 +101 1010 +102 1020 +103 1030 +104 1040 +105 1050 +106 1060 +107 1070 +108 1080 +109 1090 +110 1100 +111 1110 +112 1120 +113 1130 +114 1140 +115 1150 +116 1160 +117 1170 +118 1180 +119 1190 +120 1200 +121 1210 +122 1220 +123 1230 +124 1240 +125 1250 +126 1260 +127 1270 +128 1280 +129 1290 +130 1300 +131 1310 +132 1320 +133 1330 +134 1340 +135 1350 +136 1360 +137 1370 +138 1380 +139 1390 +140 1400 +141 1410 +142 1420 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index a915851b9389e8..49bfbc18792f0c 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -34,13 +34,16 @@ class ClusterOptions { int feNum = 1 int beNum = 3 + List feConfigs = [ 'heartbeat_interval_second=5', ] List beConfigs = [ + 'report_disk_state_interval_seconds=2', 'report_random_wait=false', ] + boolean connectToFollower = false // 1. cloudMode = true, only create cloud cluster. diff --git a/regression-test/suites/compaction/test_compaction_with_visible_version.groovy b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy new file mode 100644 index 00000000000000..194a1b67566192 --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http +import org.apache.doris.regression.util.NodeType + +suite('test_compaction_with_visible_version') { + def options = new ClusterOptions() + def compaction_keep_invisible_version_min_count = 50L + options.feConfigs += [ + 'partition_info_update_interval_secs=5', + ] + options.beConfigs += [ + 'disable_auto_compaction=true', + 'report_tablet_interval_seconds=1', + 'tablet_rowset_stale_sweep_by_size=true', + 'tablet_rowset_stale_sweep_threshold_size=0', + 'compaction_keep_invisible_version_timeout_sec=6000', + "compaction_keep_invisible_version_min_count=${compaction_keep_invisible_version_min_count}".toString(), + 'compaction_keep_invisible_version_max_count=500', + ] + options.enableDebugPoints() + + docker(options) { + def E_CUMULATIVE_NO_SUITABLE_VERSION = 'E-2000' + def E_FULL_MISS_VERSION = 'E-2009' + + sql 'SET GLOBAL insert_visible_timeout_ms = 3000' + + def tableName = 'test_compaction_with_visible_version' + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + def triggerCompaction = { tablet, isCompactSucc, compaction_type -> + def tabletId = tablet.TabletId + def backendId = tablet.BackendId + def backendIp = backendId_to_backendIP.get(backendId) + def backendHttpPort = backendId_to_backendHttpPort.get(backendId) + def code + def out + def err + if (compaction_type == 'base') { + (code, out, err) = be_run_base_compaction(backendIp, backendHttpPort, tabletId) + } else { + (code, out, err) = be_run_cumulative_compaction(backendIp, backendHttpPort, tabletId) + } + logger.info("Run compaction: code=${code}, out=${out}, err=${err}") + assertEquals(0, code) + def compactJson = parseJson(out.trim()) + if (isCompactSucc) { + assertEquals('success', compactJson.status.toLowerCase()) + } else { + if (compaction_type == 'base') { + 
assertEquals(E_FULL_MISS_VERSION, compactJson.status) + } else { + assertEquals(E_CUMULATIVE_NO_SUITABLE_VERSION, compactJson.status) + } + } + } + + def waitCompaction = { tablet, startTs -> + def tabletId = tablet.TabletId + def backendId = tablet.BackendId + def backendIp = backendId_to_backendIP.get(backendId) + def backendHttpPort = backendId_to_backendHttpPort.get(backendId) + def running = true + while (running) { + assertTrue(System.currentTimeMillis() - startTs < 60 * 1000) + Thread.sleep(1000) + def (code, out, err) = be_get_compaction_status(backendIp, backendHttpPort, tabletId) + logger.info("Get compaction: code=${code}, out=${out}, err=${err}") + assertEquals(0, code) + + def compactionStatus = parseJson(out.trim()) + assertEquals('success', compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + } + + def checkCompact = { isCumuCompactSucc, runBaseCompact, isInvisibleTimeout, version, visibleVersion -> + def partition = sql_return_maparray("SHOW PARTITIONS FROM ${tableName}")[0] + assertEquals(visibleVersion, partition.VisibleVersion as long) + + // wait be report version count + Thread.sleep(3 * 1000) + def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + def lastInvisibleVersionCountMap = [:] + def lastVisibleVersionCountMap = [:] + tablets.each { + lastVisibleVersionCountMap[it.BackendId] = it.VisibleVersionCount as long + lastInvisibleVersionCountMap[it.BackendId] = + (it.VersionCount as long) - (it.VisibleVersionCount as long) + triggerCompaction it, isCumuCompactSucc, 'cumulative' + } + + if (isCumuCompactSucc) { + // wait compaction done + def startTs = System.currentTimeMillis() + tablets.each { + waitCompaction it, startTs + } + } + + if (runBaseCompact) { + tablets.each { + triggerCompaction it, true, 'base' + } + + def startTs = System.currentTimeMillis() + tablets.each { + waitCompaction it, startTs + } + } + + // wait report + Thread.sleep(3 * 1000) + + tablets = sql_return_maparray "SHOW 
TABLETS FROM ${tableName}" + tablets.each { + def backendId = it.BackendId + def visibleVersionCount = it.VisibleVersionCount as long + def totalVersionCount = it.VersionCount as long + def invisibleVersionCount = totalVersionCount - visibleVersionCount + assertEquals(version, it.Version as long) + + if (isInvisibleTimeout) { + def values = [Math.min(version - visibleVersion, compaction_keep_invisible_version_min_count), + Math.min(version - visibleVersion, compaction_keep_invisible_version_min_count + 1)] + + // part of invisible version was compact + assertTrue(invisibleVersionCount in values, + "not match, invisibleVersionCount: ${invisibleVersionCount}, candidate values: ${values}") + } else { + // invisible version couldn't compact + assertEquals(version - visibleVersion, invisibleVersionCount) + } + + def lastVisibleVersionCount = lastVisibleVersionCountMap.get(backendId) + def lastInvisibleVersionCount = lastInvisibleVersionCountMap.get(backendId) + if (isCumuCompactSucc) { + assertTrue(lastInvisibleVersionCount > invisibleVersionCount || lastInvisibleVersionCount <= 1, + "not met with: lastInvisibleVersionCount ${lastInvisibleVersionCount} > " + + "invisibleVersionCount ${invisibleVersionCount}") + if (runBaseCompact) { + assertEquals(1L, visibleVersionCount) + } + } else { + assertEquals(lastVisibleVersionCount, visibleVersionCount) + } + } + } + + sql " CREATE TABLE ${tableName} (k1 int, k2 int) DISTRIBUTED BY HASH(k1) BUCKETS 1 " + + // normal + def rowNum = 0L + def insertNRecords = { num -> + // if enable debug point DatabaseTransactionMgr.stop_finish_transaction, + // insert will need to wait insert_visible_timeout_ms. + // so use multiple threads to reduce the wait time. 
+ def futures = [] + for (def i = 0; i < num; i++, rowNum++) { + def index = rowNum + futures.add(thread { + sql " INSERT INTO ${tableName} VALUES (${index}, ${index * 10}) " + }) + } + futures.each { it.get() } + } + insertNRecords(21) + // after insert 21 rows, be can run compact ok. + checkCompact(true, false, false, rowNum + 1, rowNum + 1) + qt_select_1 "SELECT * FROM ${tableName} ORDER BY k1" + + // publish but not visible + def lastRowNum = rowNum + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + insertNRecords(21) + // after enable debugpoint, be will add rowsets, but visible version will not increase. + // then no rowsets can pick to compact. + // so expect compact failed. + checkCompact(false, false, false, rowNum + 1, lastRowNum + 1) + qt_select_2 "SELECT * FROM ${tableName} ORDER BY k1" + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + // after clear debug point, visible version will increase. + // then some rowsets can pick to compact. + // so expect compact succ. + checkCompact(true, true, false, rowNum + 1, rowNum + 1) + qt_select_3 "SELECT * FROM ${tableName} ORDER BY k1" + + lastRowNum = rowNum + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + insertNRecords(80) + // 80 versions are not invisible yet, BE will not compact them. + // if we send http to compact them, BE will reply no rowsets can compact now + checkCompact(false, false, false, rowNum + 1, lastRowNum + 1) + // Because BE not compact, so query should be ok. 
+ qt_select_4 "SELECT * FROM ${tableName} ORDER BY k1" + + update_all_be_config('compaction_keep_invisible_version_timeout_sec', 1) + checkCompact(true, false, true, rowNum + 1, lastRowNum + 1) + qt_select_5 "SELECT * FROM ${tableName} ORDER BY k1" + + def getVersionCountMap = { -> + def versionCountMap = [:] + def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + tablets.each { + versionCountMap.put(it.BackendId as long, [it.VisibleVersionCount as long, it.VersionCount as long]) + } + return versionCountMap + } + + // after backend restart, it should update its visible version from FE + // and then it report its visible version count and total version count + def oldVersionCountMap = getVersionCountMap() + cluster.restartBackends() + Thread.sleep(20000) + def newVersionCountMap = getVersionCountMap() + assertEquals(oldVersionCountMap, newVersionCountMap) + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + // after clear fe's debug point, the 80 version are visible now. 
+ // so compact is ok + checkCompact(true, false, false, rowNum + 1, rowNum + 1) + qt_select_6 "SELECT * FROM ${tableName} ORDER BY k1" + + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + def compaction_keep_invisible_version_timeout_sec = 1 + compaction_keep_invisible_version_min_count = 0L + update_all_be_config('compaction_keep_invisible_version_timeout_sec', compaction_keep_invisible_version_timeout_sec) + update_all_be_config('compaction_keep_invisible_version_min_count', compaction_keep_invisible_version_min_count) + + lastRowNum = rowNum + insertNRecords(21) + + Thread.sleep((compaction_keep_invisible_version_timeout_sec + 1) * 1000) + + // after compaction_keep_invisible_version_timeout_sec, + // all version had been compact (compaction_keep_invisible_version_min_count=0), + checkCompact(true, false, true, rowNum + 1, lastRowNum + 1) + + // visible version had been compact + test { + sql "SELECT * FROM ${tableName} ORDER BY k1" + + // E-230: + //(1105, 'errCode = 2, detailMessage = (128.2.51.2)[CANCELLED]missed_versions is empty, spec_version 43, + // max_version 123, tablet_id 10062') + exception 'missed_versions is empty' + } + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + qt_select_7 "SELECT * FROM ${tableName} ORDER BY k1" + } +}