From 3cc604145549f153da29b38a05778a618ae59f1f Mon Sep 17 00:00:00 2001 From: yujun Date: Sun, 28 Apr 2024 14:46:21 +0800 Subject: [PATCH 1/2] fix error -230 --- be/src/agent/agent_server.cpp | 13 + be/src/agent/agent_server.h | 1 + be/src/agent/task_worker_pool.cpp | 13 + be/src/agent/task_worker_pool.h | 2 + be/src/common/config.cpp | 7 + be/src/common/config.h | 7 + be/src/olap/full_compaction.cpp | 2 +- be/src/olap/olap_common.h | 21 + be/src/olap/tablet.cpp | 74 ++- be/src/olap/tablet.h | 13 + be/src/olap/tablet_manager.cpp | 72 ++- be/src/olap/tablet_manager.h | 19 +- be/src/olap/tablet_meta.cpp | 10 + be/src/olap/tablet_meta.h | 1 + be/src/olap/task/engine_clone_task.cpp | 2 + .../java/org/apache/doris/common/Config.java | 6 +- .../org/apache/doris/alter/AlterHandler.java | 3 +- .../org/apache/doris/backup/RestoreJob.java | 3 +- .../java/org/apache/doris/catalog/Env.java | 8 +- .../apache/doris/catalog/MetadataViewer.java | 2 +- .../org/apache/doris/catalog/Replica.java | 91 ++-- .../java/org/apache/doris/catalog/Tablet.java | 22 +- .../doris/catalog/TabletInvertedIndex.java | 50 +- .../apache/doris/catalog/TabletStatMgr.java | 11 +- .../apache/doris/clone/TabletSchedCtx.java | 13 +- .../apache/doris/clone/TabletScheduler.java | 6 +- .../doris/common/proc/ReplicasProcNode.java | 5 +- .../common/proc/TabletHealthProcDir.java | 3 +- .../doris/common/proc/TabletsProcDir.java | 11 +- .../doris/datasource/InternalCatalog.java | 5 +- ...ector.java => PartitionInfoCollector.java} | 47 +- .../apache/doris/master/ReportHandler.java | 67 ++- .../org/apache/doris/system/Diagnoser.java | 5 +- .../org/apache/doris/task/AgentBatchTask.java | 10 + .../doris/task/UpdateVisibleVersionTask.java | 40 ++ .../transaction/DatabaseTransactionMgr.java | 24 +- .../transaction/GlobalTransactionMgr.java | 5 +- .../transaction/PublishVersionDaemon.java | 30 +- .../apache/doris/alter/RollupJobV2Test.java | 10 +- .../doris/alter/SchemaChangeJobV2Test.java | 6 +- .../doris/analysis/ShowReplicaTest.java | 3 +- .../org/apache/doris/catalog/ReplicaTest.java | 21 +- .../clone/DiskReblanceWhenSchedulerIdle.java | 3 +- .../doris/clone/RebalancerTestUtil.java | 5 +- .../apache/doris/clone/RepairVersionTest.java | 8 +- .../doris/clone/TabletReplicaTooSlowTest.java | 4 +- .../doris/clone/TabletSchedCtxTest.java | 16 +- .../apache/doris/planner/QueryPlanTest.java | 12 +- .../DatabaseTransactionMgrTest.java | 6 +- .../transaction/GlobalTransactionMgrTest.java | 29 +- gensrc/thrift/AgentService.thrift | 5 + gensrc/thrift/BackendService.thrift | 5 +- gensrc/thrift/MasterService.thrift | 4 +- gensrc/thrift/Types.thrift | 3 +- .../test_compaction_with_visible_version.out | 448 ++++++++++++++++++ .../regression/suite/SuiteCluster.groovy | 3 + ...est_compaction_with_visible_version.groovy | 275 +++++++++++ 57 files changed, 1349 insertions(+), 241 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/master/{PartitionInMemoryInfoCollector.java => PartitionInfoCollector.java} (68%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java create mode 100644 regression-test/data/compaction/test_compaction_with_visible_version.out create mode 100644 regression-test/suites/compaction/test_compaction_with_visible_version.groovy diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index e7217cdcff0bf5..5355c037b19d0e 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -163,6 +163,10 @@ void AgentServer::start_workers(ExecEnv* exec_env) { _clean_trash_workers = std::make_unique( "CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); }); + + _update_visible_version_workers = std::make_unique( + "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); }); + // clang-format on } @@ -278,6 +282,15 @@ void AgentServer::submit_tasks(TAgentResult& agent_result, "task(signature={}) has wrong request member = clean_trash_req", signature); } break; + case TTaskType::UPDATE_VISIBLE_VERSION: + if (task.__isset.visible_version_req) { + _update_visible_version_workers->submit_task(task); + } else { + ret_st = Status::InvalidArgument( + "task(signature={}) has wrong request member = visible_version_req", + signature); + } + break; default: ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type", signature, task_type); diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index b789bbe98de52d..9f3d91d5621261 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -97,6 +97,7 @@ class AgentServer { std::unique_ptr _topic_subscriber; std::unique_ptr _gc_binlog_workers; std::unique_ptr _clean_trash_workers; + std::unique_ptr _update_visible_version_workers; }; } // end namespace doris diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 686de72e1b9978..c9d222114e0727 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -355,6 +355,7 @@ bvar::Adder ALTER_count("task", "ALTER_TABLE"); bvar::Adder CLONE_count("task", "CLONE"); bvar::Adder STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE"); bvar::Adder GC_BINLOG_count("task", "GC_BINLOG"); +bvar::Adder UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION"); void add_task_count(const TAgentTaskRequest& task, int n) { // clang-format off @@ -382,6 +383,7 @@ void add_task_count(const TAgentTaskRequest& task, int n) { ADD_TASK_COUNT(CLONE) ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE) ADD_TASK_COUNT(GC_BINLOG) + ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION) #undef ADD_TASK_COUNT case TTaskType::REALTIME_PUSH: case TTaskType::PUSH: @@ -970,6 +972,11 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); return; } + + std::map partitions_version; + engine.tablet_manager()->get_partitions_visible_version(&partitions_version); + request.__set_partitions_version(std::move(partitions_version)); + int64_t max_compaction_score = std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), DorisMetrics::instance()->tablet_base_max_compaction_score->value()); @@ -1699,6 +1706,12 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) { engine.gc_binlogs(gc_tablet_infos); } +void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) { + const TVisibleVersionReq& visible_version_req = req.visible_version_req; + engine.tablet_manager()->update_partitions_visible_version( + visible_version_req.partition_version); +} + void clone_callback(StorageEngine& engine, const TMasterInfo& master_info, const TAgentTaskRequest& req) { const auto& clone_req = req.clone_req; diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index f95a866a57aa56..14d9ff32686a6b 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -163,6 +163,8 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req); void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req); +void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req); + void report_task_callback(const TMasterInfo& master_info); void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index ba173b0d03feda..493ad699aac33e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -430,6 +430,13 @@ DEFINE_Validator(compaction_task_num_per_fast_disk, // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. DEFINE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9"); +// Not compact the invisible versions, but with some limitations: +// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions; +// if timeout, keep no more than compaction_keep_invisible_version_min_count versions. +DEFINE_mInt32(compaction_keep_invisible_version_timeout_sec, "1800"); +DEFINE_mInt32(compaction_keep_invisible_version_min_count, "50"); +DEFINE_mInt32(compaction_keep_invisible_version_max_count, "500"); + // Threshold to logging compaction trace, in seconds. DEFINE_mInt32(base_compaction_trace_threshold, "60"); DEFINE_mInt32(cumulative_compaction_trace_threshold, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 5c60ffae258c43..177bb03e02b25f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -477,6 +477,13 @@ DECLARE_mInt32(compaction_task_num_per_fast_disk); // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. DECLARE_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round); +// Not compact the invisible versions, but with some limitations: +// if not timeout, keep no more than compaction_keep_invisible_version_max_count versions; +// if timeout, keep no more than compaction_keep_invisible_version_min_count versions. +DECLARE_mInt32(compaction_keep_invisible_version_timeout_sec); +DECLARE_mInt32(compaction_keep_invisible_version_min_count); +DECLARE_mInt32(compaction_keep_invisible_version_max_count); + // Threshold to logging compaction trace, in seconds. DECLARE_mInt32(base_compaction_trace_threshold); DECLARE_mInt32(cumulative_compaction_trace_threshold); diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index ba45eacb2c4689..c11b87c06ad8c8 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -136,7 +136,7 @@ Status FullCompaction::_check_all_version(const std::vector& ro "Full compaction rowsets' versions not equal to all exist rowsets' versions. " "full compaction rowsets max version={}-{}" ", current rowsets max version={}-{}" - "full compaction rowsets min version={}-{}, current rowsets min version=0-1", + ", full compaction rowsets min version={}-{}, current rowsets min version=0-1", last_rowset->start_version(), last_rowset->end_version(), _tablet->max_version().first, _tablet->max_version().second, first_rowset->start_version(), first_rowset->end_version()); diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 80bfab6f4b93d6..c1a2e3c18b5b66 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -37,6 +38,7 @@ #include "olap/olap_define.h" #include "olap/rowset/rowset_fwd.h" #include "util/hash_util.hpp" +#include "util/time.h" #include "util/uid_util.h" namespace doris { @@ -517,6 +519,25 @@ struct RidAndPos { using PartialUpdateReadPlan = std::map>>; +// used for controll compaction +struct VersionWithTime { + std::atomic version; + int64_t update_ts; + + VersionWithTime() : version(0), update_ts(MonotonicMillis()) {} + + void update_version_monoto(int64_t new_version) { + int64_t cur_version = version.load(std::memory_order_relaxed); + while (cur_version < new_version) { + if (version.compare_exchange_strong(cur_version, new_version, std::memory_order_relaxed, + std::memory_order_relaxed)) { + update_ts = MonotonicMillis(); + break; + } + } + } +}; + } // namespace doris // This intended to be a "good" hash function. It may change from time to time. diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 27b1f94530d140..4cf8fae0ded24f 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1349,25 +1349,41 @@ std::vector Tablet::pick_candidate_rowsets_to_cumulative_compac if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) { return candidate_rowsets; } - { - std::shared_lock rlock(_meta_lock); - for (const auto& [version, rs] : _rs_version_map) { - if (version.first >= _cumulative_point && rs->is_local()) { - candidate_rowsets.push_back(rs); - } - } - } - std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); - return candidate_rowsets; + return _pick_visible_rowsets_to_compaction(_cumulative_point, + std::numeric_limits::max()); } std::vector Tablet::pick_candidate_rowsets_to_base_compaction() { + return _pick_visible_rowsets_to_compaction(std::numeric_limits::min(), + _cumulative_point - 1); +} + +std::vector Tablet::_pick_visible_rowsets_to_compaction( + int64_t min_start_version, int64_t max_start_version) { + auto [visible_version, update_ts] = get_visible_version_and_time(); + bool update_time_long = MonotonicMillis() - update_ts > + config::compaction_keep_invisible_version_timeout_sec * 1000L; + int32_t keep_invisible_version_limit = + update_time_long ? config::compaction_keep_invisible_version_min_count + : config::compaction_keep_invisible_version_max_count; + std::vector candidate_rowsets; { std::shared_lock rlock(_meta_lock); for (const auto& [version, rs] : _rs_version_map) { - // Do compaction on local rowsets only. - if (version.first < _cumulative_point && rs->is_local()) { + int64_t version_start = version.first; + // rowset is remote or rowset is not in given range + if (!rs->is_local() || version_start < min_start_version || + version_start > max_start_version) { + continue; + } + + // can compact, met one of the conditions: + // 1. had been visible; + // 2. exceeds the limit of keep invisible versions. + int64_t version_end = version.second; + if (version_end <= visible_version || + version_end > visible_version + keep_invisible_version_limit) { candidate_rowsets.push_back(rs); } } @@ -1390,13 +1406,8 @@ std::vector Tablet::pick_candidate_rowsets_to_full_compaction() std::vector Tablet::pick_first_consecutive_empty_rowsets(int limit) { std::vector consecutive_empty_rowsets; - std::vector candidate_rowsets; - traverse_rowsets([&candidate_rowsets, this](const auto& rs) { - if (rs->is_local() && rs->start_version() >= _cumulative_point) { - candidate_rowsets.emplace_back(rs); - } - }); - std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); + std::vector candidate_rowsets = + pick_candidate_rowsets_to_cumulative_compaction(); int len = candidate_rowsets.size(); for (int i = 0; i < len - 1; ++i) { auto rowset = candidate_rowsets[i]; @@ -1475,6 +1486,19 @@ std::string Tablet::_get_rowset_info_str(RowsetSharedPtr rowset, bool delete_fla rowset->rowset_id().to_string(), disk_size); } +std::tuple Tablet::get_visible_version_and_time() const { + // some old tablet has bug, its partition_id is 0, fe couldn't update its visible version. + // so let this tablet's visible version become int64 max. + auto version_info = std::atomic_load_explicit(&_visible_version, std::memory_order_relaxed); + if (version_info != nullptr && partition_id() != 0) { + return std::make_tuple(version_info->version.load(std::memory_order_relaxed), + version_info->update_ts); + } else { + return std::make_tuple(std::numeric_limits::max(), + std::numeric_limits::max()); + } +} + // For http compaction action void Tablet::get_compaction_status(std::string* json_result) { rapidjson::Document root; @@ -1837,13 +1861,23 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, } }); + int64_t total_version_count = _tablet_meta->version_count(); + + // For compatibility. + // For old fe, it wouldn't send visible version request to be, then be's visible version is always 0. + // Let visible_version_count set to total_version_count in be's report. + int64_t visible_version_count = total_version_count; + if (auto [visible_version, _] = get_visible_version_and_time(); visible_version > 0) { + visible_version_count = _tablet_meta->version_count_cross_with_range({0, visible_version}); + } // the report version is the largest continuous version, same logic as in FE side tablet_info->__set_version(cversion.second); // Useless but it is a required filed in TTabletInfo tablet_info->__set_version_hash(0); tablet_info->__set_partition_id(_tablet_meta->partition_id()); tablet_info->__set_storage_medium(_data_dir->storage_medium()); - tablet_info->__set_version_count(_tablet_meta->version_count()); + tablet_info->__set_total_version_count(total_version_count); + tablet_info->__set_visible_version_count(visible_version_count); tablet_info->__set_path_hash(_data_dir->path_hash()); tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory()); tablet_info->__set_replica_id(replica_id()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 46bd6802495ab8..4de6a05d74532c 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -347,6 +347,12 @@ class Tablet final : public BaseTablet { bool should_fetch_from_peer(); + std::tuple get_visible_version_and_time() const; + + void set_visible_version(const std::shared_ptr& visible_version) { + std::atomic_store_explicit(&_visible_version, visible_version, std::memory_order_relaxed); + } + inline bool all_beta() const { std::shared_lock rdlock(_meta_lock); return _tablet_meta->all_beta(); @@ -625,6 +631,10 @@ class Tablet final : public BaseTablet { // When the proportion of empty edges in the adjacency matrix used to represent the version graph // in the version tracker is greater than the threshold, rebuild the version tracker bool _reconstruct_version_tracker_if_necessary(); + + std::vector _pick_visible_rowsets_to_compaction(int64_t min_start_version, + int64_t max_start_version); + void _init_context_common_fields(RowsetWriterContext& context); void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre, @@ -740,6 +750,9 @@ class Tablet final : public BaseTablet { int64_t _io_error_times = 0; + // partition's visible version. it sync from fe, but not real-time. + std::shared_ptr _visible_version; + std::atomic_bool _is_full_compaction_running = false; }; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 196b6dc954310a..43cbba990876b1 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -1024,13 +1024,14 @@ void TabletManager::build_all_report_tablets_info(std::map* tablet_info.__set_transaction_ids(find->second); expire_txn_map.erase(find); } - tablet_version_num_hist.add(tablet->version_count()); + tablet_version_num_hist.add(tablet_info.total_version_count); auto& t_tablet_stat = local_cache->emplace_back(); t_tablet_stat.__set_tablet_id(tablet_info.tablet_id); t_tablet_stat.__set_data_size(tablet_info.data_size); t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size); - t_tablet_stat.__set_row_num(tablet_info.row_count); - t_tablet_stat.__set_version_count(tablet_info.version_count); + t_tablet_stat.__set_row_count(tablet_info.row_count); + t_tablet_stat.__set_total_version_count(tablet_info.total_version_count); + t_tablet_stat.__set_visible_version_count(tablet_info.visible_version_count); }; for_each_tablet(handler, filter_all_tablets); @@ -1265,9 +1266,29 @@ void TabletManager::update_root_path_info(std::map* path_ma void TabletManager::get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos) { - std::shared_lock rdlock(_partition_tablet_map_lock); - if (_partition_tablet_map.find(partition_id) != _partition_tablet_map.end()) { - *tablet_infos = _partition_tablet_map[partition_id]; + std::shared_lock rdlock(_partitions_lock); + auto it = _partitions.find(partition_id); + if (it != _partitions.end()) { + *tablet_infos = it->second.tablets; + } +} + +void TabletManager::get_partitions_visible_version(std::map* partitions_version) { + std::shared_lock rdlock(_partitions_lock); + for (const auto& [partition_id, partition] : _partitions) { + partitions_version->insert( + {partition_id, partition.visible_version->version.load(std::memory_order_relaxed)}); + } +} + +void TabletManager::update_partitions_visible_version( + const std::map& partitions_version) { + std::shared_lock rdlock(_partitions_lock); + for (auto [partition_id, version] : partitions_version) { + auto it = _partitions.find(partition_id); + if (it != _partitions.end()) { + it->second.visible_version->update_version_monoto(version); + } } } @@ -1356,15 +1377,25 @@ TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id) { } void TabletManager::_add_tablet_to_partition(const TabletSharedPtr& tablet) { - std::lock_guard wrlock(_partition_tablet_map_lock); - _partition_tablet_map[tablet->partition_id()].insert(tablet->get_tablet_info()); + std::lock_guard wrlock(_partitions_lock); + auto& partition = _partitions[tablet->partition_id()]; + partition.tablets.insert(tablet->get_tablet_info()); + tablet->set_visible_version( + std::static_pointer_cast(partition.visible_version)); } void TabletManager::_remove_tablet_from_partition(const TabletSharedPtr& tablet) { - std::lock_guard wrlock(_partition_tablet_map_lock); - _partition_tablet_map[tablet->partition_id()].erase(tablet->get_tablet_info()); - if (_partition_tablet_map[tablet->partition_id()].empty()) { - _partition_tablet_map.erase(tablet->partition_id()); + tablet->set_visible_version(nullptr); + std::lock_guard wrlock(_partitions_lock); + auto it = _partitions.find(tablet->partition_id()); + if (it == _partitions.end()) { + return; + } + + auto& tablets = it->second.tablets; + tablets.erase(tablet->get_tablet_info()); + if (tablets.empty()) { + _partitions.erase(it); } } @@ -1401,22 +1432,23 @@ void TabletManager::get_tablets_distribution_on_different_disks( std::map>& tablets_num_on_disk, std::map>>& tablets_info_on_disk) { std::vector data_dirs = _engine.get_stores(); - std::map> partition_tablet_map; + std::map partitions; { - // When drop tablet, '_partition_tablet_map_lock' is locked in 'tablet_shard_lock'. - // To avoid locking 'tablet_shard_lock' in '_partition_tablet_map_lock', we lock and - // copy _partition_tablet_map here. - std::shared_lock rdlock(_partition_tablet_map_lock); - partition_tablet_map = _partition_tablet_map; + // When drop tablet, '_partitions_lock' is locked in 'tablet_shard_lock'. + // To avoid locking 'tablet_shard_lock' in '_partitions_lock', we lock and + // copy _partitions here. + std::shared_lock rdlock(_partitions_lock); + partitions = _partitions; } - for (auto& [partition_id, tablet_infos] : partition_tablet_map) { + + for (const auto& [partition_id, partition] : partitions) { std::map tablets_num; std::map> tablets_info; for (auto* data_dir : data_dirs) { tablets_num[data_dir] = 0; } - for (const auto& tablet_info : tablet_infos) { + for (const auto& tablet_info : partition.tablets) { // get_tablet() will hold 'tablet_shard_lock' TabletSharedPtr tablet = get_tablet(tablet_info.tablet_id); if (tablet == nullptr) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 9f8164b853f44d..b090277677b5fb 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -144,6 +144,10 @@ class TabletManager { void get_partition_related_tablets(int64_t partition_id, std::set* tablet_infos); + void get_partitions_visible_version(std::map* partitions_version); + + void update_partitions_visible_version(const std::map& partitions_version); + void do_tablet_meta_checkpoint(DataDir* data_dir); void obtain_specific_quantity_tablets(std::vector& tablets_info, int64_t num); @@ -229,22 +233,27 @@ class TabletManager { std::set tablets_under_clone; }; + struct Partition { + std::set tablets; + std::shared_ptr visible_version {new VersionWithTime}; + }; + StorageEngine& _engine; // TODO: memory size of TabletSchema cannot be accurately tracked. - // trace the memory use by meta of tablet std::shared_ptr _tablet_meta_mem_tracker; const int32_t _tablets_shards_size; const int32_t _tablets_shards_mask; std::vector _tablets_shards; - // Protect _partition_tablet_map, should not be obtained before _tablet_map_lock to avoid dead lock - std::shared_mutex _partition_tablet_map_lock; + // Protect _partitions, should not be obtained before _tablet_map_lock to avoid dead lock + std::shared_mutex _partitions_lock; + // partition_id => partition + std::map _partitions; + // Protect _shutdown_tablets, should not be obtained before _tablet_map_lock to avoid dead lock std::shared_mutex _shutdown_tablets_lock; - // partition_id => tablet_info - std::map> _partition_tablet_map; // the delete tablets. notice only allow function `start_trash_sweep` can erase tablets in _shutdown_tablets std::list _shutdown_tablets; std::mutex _gc_tablets_lock; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 4414f7187cd839..72bf0f0ee39ad2 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -745,6 +745,16 @@ Version TabletMeta::max_version() const { return max_version; } +size_t TabletMeta::version_count_cross_with_range(const Version& range) const { + size_t count = 0; + for (const auto& rs_meta : _rs_metas) { + if (!(range.first > rs_meta->version().second || range.second < rs_meta->version().first)) { + count++; + } + } + return count; +} + Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& rs_meta) { // check RowsetMeta is valid for (auto& rs : _rs_metas) { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 460851bc772fa6..ae038fa2c9203d 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -166,6 +166,7 @@ class TabletMeta { // Remote disk space occupied by tablet. size_t tablet_remote_size() const; size_t version_count() const; + size_t version_count_cross_with_range(const Version& range) const; Version max_version() const; TabletState tablet_state() const; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 81c9973fcf234b..ae5fc4c18dc448 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -161,6 +161,8 @@ Status EngineCloneTask::execute() { } Status st = _do_clone(); StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id); + StorageEngine::instance()->tablet_manager()->update_partitions_visible_version( + {{_clone_req.partition_id, _clone_req.version}}); return st; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 6adf03c56cd4df..e1b29b7f84975e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1508,11 +1508,11 @@ public class Config extends ConfigBase { public static int default_max_query_instances = -1; /* - * One master daemon thread will update global partition in memory - * info every partition_in_memory_update_interval_secs + * One master daemon thread will update global partition info, include in memory and visible version + * info every partition_info_update_interval_secs */ @ConfField(mutable = false, masterOnly = true) - public static int partition_in_memory_update_interval_secs = 300; + public static int partition_info_update_interval_secs = 60; @ConfField(masterOnly = true) public static boolean enable_concurrent_update = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 030fd17452df08..4231b11c98763a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -241,8 +241,7 @@ public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundExce task.getSignature(), replica, task.getVersion()); boolean versionChanged = false; if (replica.getVersion() < task.getVersion()) { - replica.updateVersionInfo(task.getVersion(), replica.getDataSize(), replica.getRemoteDataSize(), - replica.getRowCount()); + replica.updateVersion(task.getVersion()); versionChanged = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 12c42aee737032..63459e5da53d75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1785,8 +1785,7 @@ private Status allTabletCommitted(boolean isReplay) { for (Tablet tablet : idx.getTablets()) { for (Replica replica : tablet.getReplicas()) { if (!replica.checkVersionCatchUp(part.getVisibleVersion(), false)) { - replica.updateVersionInfo(part.getVisibleVersion(), replica.getDataSize(), - replica.getRemoteDataSize(), replica.getRowCount()); + replica.updateVersion(part.getVisibleVersion()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 09cb46fdf27408..380eb361c0027a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -178,7 +178,7 @@ import org.apache.doris.load.sync.SyncJobManager; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; -import org.apache.doris.master.PartitionInMemoryInfoCollector; +import org.apache.doris.master.PartitionInfoCollector; import org.apache.doris.meta.MetaContext; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mtmv.MTMVAlterOpType; @@ -370,7 +370,7 @@ public class Env { private PublishVersionDaemon publishVersionDaemon; private DeleteHandler deleteHandler; private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector; - private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector; + private PartitionInfoCollector partitionInfoCollector; private CooldownConfHandler cooldownConfHandler; private ExternalMetaIdMgr externalMetaIdMgr; private MetastoreEventsProcessor metastoreEventsProcessor; @@ -665,7 +665,7 @@ public Env(boolean isCheckpointCatalog) { this.publishVersionDaemon = new PublishVersionDaemon(); this.deleteHandler = new DeleteHandler(); this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector(); - this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector(); + this.partitionInfoCollector = new PartitionInfoCollector(); if (Config.enable_storage_policy) { this.cooldownConfHandler = new CooldownConfHandler(); } @@ -1691,7 +1691,7 @@ private void startMasterOnlyDaemonThreads() { // start daemon thread to update db used data quota for db txn manager periodically dbUsedDataQuotaInfoCollector.start(); // start daemon thread to update global partition in memory information periodically - partitionInMemoryInfoCollector.start(); + partitionInfoCollector.start(); if (Config.enable_storage_policy) { cooldownConfHandler.start(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 333b7b146acf27..1f1e2599d9c064 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -110,7 +110,7 @@ private static List> getTabletStatus(String dbName, String tblName, row.add(String.valueOf(replica.getLastSuccessVersion())); row.add(String.valueOf(visibleVersion)); row.add(String.valueOf(replica.getSchemaHash())); - row.add(String.valueOf(replica.getVersionCount())); + row.add(String.valueOf(replica.getTotalVersionCount())); row.add(String.valueOf(replica.isBad())); row.add(String.valueOf(replica.isUserDrop())); row.add(replica.getState().name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index ae8f4f37eed3b3..a515bbd2f4dbf0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -22,6 +22,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TUniqueId; import com.google.gson.annotations.SerializedName; @@ -109,7 +110,8 @@ public enum ReplicaStatus { @SerializedName(value = "lastSuccessVersionHash") private long lastSuccessVersionHash = 0L; - private volatile long versionCount = -1; + private volatile long totalVersionCount = -1; + private volatile long visibleVersionCount = -1; private long pathHash = -1; @@ -233,14 +235,42 @@ public long getDataSize() { return dataSize; } + public void setDataSize(long dataSize) { + this.dataSize = dataSize; + } + public long getRemoteDataSize() { return remoteDataSize; } + public void setRemoteDataSize(long remoteDataSize) { + this.remoteDataSize = remoteDataSize; + } + public long getRowCount() { return rowCount; } + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } + + public long getSegmentCount() { + return segmentCount; + } + + public void setSegmentCount(long segmentCount) { + this.segmentCount = segmentCount; + } + + public long getRowsetCount() { + return rowsetCount; + } + + public void setRowsetCount(long rowsetCount) { + this.rowsetCount = rowsetCount; + } + public long getLastFailedVersion() { return lastFailedVersion; } @@ -321,28 +351,24 @@ public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) { this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd; } - // for compatibility - public synchronized void updateStat(long dataSize, long rowNum) { - this.dataSize = dataSize; - this.rowCount = rowNum; - } - - public synchronized void updateStat(long dataSize, long remoteDataSize, long rowNum, long versionCount) { - this.dataSize = dataSize; - this.remoteDataSize = remoteDataSize; - this.rowCount = rowNum; - this.versionCount = versionCount; + public void updateWithReport(TTabletInfo backendReplica) { + updateVersion(backendReplica.getVersion()); + setDataSize(backendReplica.getDataSize()); + setRemoteDataSize(backendReplica.getRemoteDataSize()); + setRowCount(backendReplica.getRowCount()); + setTotalVersionCount(backendReplica.getTotalVersionCount()); + setVisibleVersionCount( + backendReplica.isSetVisibleVersionCount() ? backendReplica.getVisibleVersionCount() + : backendReplica.getTotalVersionCount()); } - public synchronized void updateVersionInfo(long newVersion, long newDataSize, long newRemoteDataSize, - long newRowCount) { - updateReplicaInfo(newVersion, this.lastFailedVersion, this.lastSuccessVersion, newDataSize, newRemoteDataSize, - newRowCount); + public synchronized void updateVersion(long newVersion) { + updateReplicaVersion(newVersion, this.lastFailedVersion, this.lastSuccessVersion); } - public synchronized void updateVersionWithFailedInfo( + public synchronized void updateVersionWithFailed( long newVersion, long lastFailedVersion, long lastSuccessVersion) { - updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(newVersion, lastFailedVersion, lastSuccessVersion); } public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion, @@ -405,9 +431,7 @@ public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVer * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. * We just reset the LFV(hash) to recovery this replica. */ - private void updateReplicaInfo(long newVersion, - long lastFailedVersion, long lastSuccessVersion, - long newDataSize, long newRemoteDataSize, long newRowCount) { + private void updateReplicaVersion(long newVersion, long lastFailedVersion, long lastSuccessVersion) { if (LOG.isDebugEnabled()) { LOG.debug("before update: {}", this.toString()); } @@ -432,9 +456,6 @@ private void updateReplicaInfo(long newVersion, long oldLastFailedVersion = this.lastFailedVersion; this.version = newVersion; - this.dataSize = newDataSize; - this.remoteDataSize = newRemoteDataSize; - this.rowCount = newRowCount; // just check it if (lastSuccessVersion <= this.version) { @@ -497,7 +518,7 @@ private void updateReplicaInfo(long newVersion, } public synchronized void updateLastFailedVersion(long lastFailedVersion) { - updateReplicaInfo(this.version, lastFailedVersion, this.lastSuccessVersion, dataSize, remoteDataSize, rowCount); + updateReplicaVersion(this.version, lastFailedVersion, this.lastSuccessVersion); } /* @@ -542,16 +563,28 @@ public boolean tooSlow() { return state == ReplicaState.COMPACTION_TOO_SLOW; } + public boolean tooBigVersionCount() { + return visibleVersionCount >= Config.min_version_count_indicate_replica_compaction_too_slow; + } + public boolean isNormal() { return state == ReplicaState.NORMAL; } - public long getVersionCount() { - return versionCount; + public long getTotalVersionCount() { + return totalVersionCount; + } + + public void setTotalVersionCount(long totalVersionCount) { + this.totalVersionCount = totalVersionCount; + } + + public long getVisibleVersionCount() { + return visibleVersionCount; } - public void setVersionCount(long versionCount) { - this.versionCount = versionCount; + public void setVisibleVersionCount(long visibleVersionCount) { + this.visibleVersionCount = visibleVersionCount; } public boolean checkVersionRegressive(long newVersion) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 708334bb353c43..7a18da60ad4a18 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -269,16 +269,16 @@ public List getQueryableReplicas(long visibleVersion, boolean allowFail } if (Config.skip_compaction_slower_replica && allQueryableReplica.size() > 1) { - long minVersionCount = Long.MAX_VALUE; - for (Replica replica : allQueryableReplica) { - if (replica.getVersionCount() != -1 && replica.getVersionCount() < minVersionCount) { - minVersionCount = replica.getVersionCount(); - } + long minVersionCount = allQueryableReplica.stream().mapToLong(Replica::getVisibleVersionCount) + .filter(count -> count != -1).min().orElse(Long.MAX_VALUE); + long maxVersionCount = Config.min_version_count_indicate_replica_compaction_too_slow; + if (minVersionCount != Long.MAX_VALUE) { + maxVersionCount = Math.max(maxVersionCount, minVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT); } - final long finalMinVersionCount = minVersionCount; - return allQueryableReplica.stream().filter(replica -> replica.getVersionCount() == -1 - || replica.getVersionCount() < Config.min_version_count_indicate_replica_compaction_too_slow - || replica.getVersionCount() < finalMinVersionCount * QUERYABLE_TIMES_OF_MIN_VERSION_COUNT) + + final long finalMaxVersionCount = maxVersionCount; + return allQueryableReplica.stream() + .filter(replica -> replica.getVisibleVersionCount() < finalMaxVersionCount) .collect(Collectors.toList()); } return allQueryableReplica; @@ -508,7 +508,7 @@ public Pair getHealthStatusWithPriority(S if (versionCompleted) { stable++; - versions.add(replica.getVersionCount()); + versions.add(replica.getVisibleVersionCount()); allocNum = stableVersionCompleteAllocMap.getOrDefault(backend.getLocationTag(), (short) 0); stableVersionCompleteAllocMap.put(backend.getLocationTag(), (short) (allocNum + 1)); @@ -599,7 +599,7 @@ public Pair getHealthStatusWithPriority(S // get the max version diff long delta = versions.get(versions.size() - 1) - versions.get(0); double ratio = (double) delta / versions.get(versions.size() - 1); - if (versions.get(versions.size() - 1) > Config.min_version_count_indicate_replica_compaction_too_slow + if (versions.get(versions.size() - 1) >= Config.min_version_count_indicate_replica_compaction_too_slow && ratio > Config.valid_version_count_delta_ratio_between_replicas) { return Pair.of(TabletStatus.REPLICA_COMPACTION_TOO_SLOW, Priority.HIGH); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 5fd7ae721f2063..c46d2a9b99f8b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -22,6 +22,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.cooldown.CooldownConf; +import org.apache.doris.master.PartitionInfoCollector.PartitionCollectInfo; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TStorageMedium; @@ -37,7 +38,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -96,7 +97,10 @@ public class TabletInvertedIndex { // backend id -> (tablet id -> replica) private Table backingReplicaMetaTable = HashBasedTable.create(); - private volatile ImmutableSet partitionIdInMemorySet = ImmutableSet.of(); + // partition id -> partition info. + // notice partition info update every Config.partition_info_update_interval_secs seconds, + // so it may be stale. + private volatile ImmutableMap partitionCollectInfoMap = ImmutableMap.of(); private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); @@ -120,11 +124,13 @@ private void writeUnlock(long stamp) { } public void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, final HashMap storageMediumMap, ListMultimap tabletSyncMap, ListMultimap tabletDeleteFromMeta, Set tabletFoundInMeta, ListMultimap tabletMigrationMap, + Map partitionVersionSyncMap, Map> transactionsToPublish, ListMultimap transactionsToClear, ListMultimap tabletRecoveryMap, @@ -132,6 +138,7 @@ public void tabletReport(long backendId, Map backendTablets, List cooldownConfToPush, List cooldownConfToUpdate) { List> cooldownTablets = new ArrayList<>(); + long feTabletNum = 0; long stamp = readLock(); long start = System.currentTimeMillis(); try { @@ -140,6 +147,7 @@ public void tabletReport(long backendId, Map backendTablets, } Map replicaMetaWithBackend = backingReplicaMetaTable.row(backendId); if (replicaMetaWithBackend != null) { + feTabletNum = replicaMetaWithBackend.size(); taskPool.submit(() -> { // traverse replicas in meta with this backend replicaMetaWithBackend.entrySet().parallelStream().forEach(entry -> { @@ -160,11 +168,13 @@ public void tabletReport(long backendId, Map backendTablets, tabletMetaInfo = new TTabletMetaInfo(); tabletMetaInfo.setReplicaId(replica.getId()); } - if (partitionIdInMemorySet.contains( - backendTabletInfo.getPartitionId()) != backendTabletInfo.isIsInMemory()) { + PartitionCollectInfo partitionCollectInfo = + partitionCollectInfoMap.get(backendTabletInfo.getPartitionId()); + boolean isInMemory = partitionCollectInfo != null && partitionCollectInfo.isInMemory(); + if (isInMemory != backendTabletInfo.isIsInMemory()) { if (tabletMetaInfo == null) { tabletMetaInfo = new TTabletMetaInfo(); - tabletMetaInfo.setIsInMemory(!backendTabletInfo.isIsInMemory()); + tabletMetaInfo.setIsInMemory(isInMemory); } } if (Config.fix_tablet_partition_id_eq_0 @@ -323,8 +333,11 @@ && isLocal(tabletMeta.getStorageMedium())) { // update replicase's version count // no need to write log, and no need to get db lock. - if (backendTabletInfo.isSetVersionCount()) { - replica.setVersionCount(backendTabletInfo.getVersionCount()); + if (backendTabletInfo.isSetTotalVersionCount()) { + replica.setTotalVersionCount(backendTabletInfo.getTotalVersionCount()); + replica.setVisibleVersionCount(backendTabletInfo.isSetVisibleVersionCount() + ? backendTabletInfo.getVisibleVersionCount() + : backendTabletInfo.getTotalVersionCount()); } if (tabletMetaInfo != null) { tabletMetaInfo.setTabletId(tabletId); @@ -343,6 +356,15 @@ && isLocal(tabletMeta.getStorageMedium())) { } } }); + + backendPartitionsVersion.entrySet().parallelStream().forEach(entry -> { + long partitionId = entry.getKey(); + long backendVersion = entry.getValue(); + PartitionCollectInfo partitionInfo = partitionCollectInfoMap.get(partitionId); + if (partitionInfo != null && partitionInfo.getVisibleVersion() > backendVersion) { + partitionVersionSyncMap.put(partitionId, partitionInfo.getVisibleVersion()); + } + }); }).join(); } } finally { @@ -351,11 +373,13 @@ && isLocal(tabletMeta.getStorageMedium())) { cooldownTablets.forEach(p -> handleCooldownConf(p.first, p.second, cooldownConfToPush, cooldownConfToUpdate)); long end = System.currentTimeMillis(); - LOG.info("finished to do tablet diff with backend[{}]. sync: {}." - + " metaDel: {}. foundInMeta: {}. migration: {}. " - + "found invalid transactions {}. found republish transactions {}. tabletToUpdate: {}." - + " need recovery: {}. cost: {} ms", backendId, tabletSyncMap.size(), + LOG.info("finished to do tablet diff with backend[{}]. fe tablet num: {}, backend tablet num: {}. sync: {}." + + " metaDel: {}. foundInMeta: {}. migration: {}. backend partition num: {}, backend need " + + "update: {}. found invalid transactions {}. found republish " + + "transactions {}. tabletToUpdate: {}. need recovery: {}. cost: {} ms", + backendId, feTabletNum, backendTablets.size(), tabletSyncMap.size(), tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(), + backendPartitionsVersion.size(), partitionVersionSyncMap.size(), transactionsToClear.size(), transactionsToPublish.size(), tabletToUpdate.size(), tabletRecoveryMap.size(), (end - start)); } @@ -757,8 +781,8 @@ public void clear() { } } - public void setPartitionIdInMemorySet(ImmutableSet partitionIdInMemorySet) { - this.partitionIdInMemorySet = partitionIdInMemorySet; + public void setPartitionCollectInfoMap(ImmutableMap partitionCollectInfoMap) { + this.partitionCollectInfoMap = partitionCollectInfoMap; } public Map getReplicaToTabletMap() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index e2b65a95f6ea63..896ecac6f8ef05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -151,8 +151,12 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); if (replica != null) { - replica.updateStat(stat.getDataSize(), stat.getRemoteDataSize(), stat.getRowNum(), - stat.getVersionCount()); + replica.setDataSize(stat.getDataSize()); + replica.setRemoteDataSize(stat.getRemoteDataSize()); + replica.setRowCount(stat.getRowCount()); + replica.setTotalVersionCount(stat.getTotalVersionCount()); + replica.setVisibleVersionCount(stat.isSetVisibleVersionCount() ? stat.getVisibleVersionCount() + : stat.getTotalVersionCount()); } } } @@ -168,7 +172,8 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { continue; } // TODO(cmy) no db lock protected. I think it is ok even we get wrong row num - replica.updateStat(entry.getValue().getDataSize(), entry.getValue().getRowNum()); + replica.setDataSize(entry.getValue().getDataSize()); + replica.setRowCount(entry.getValue().getRowCount()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 8401ec17bbd043..e676774afcf11b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -574,15 +574,15 @@ public boolean compactionRecovered() { Replica chosenReplica = null; long maxVersionCount = Integer.MIN_VALUE; for (Replica replica : tablet.getReplicas()) { - if (replica.getVersionCount() > maxVersionCount) { - maxVersionCount = replica.getVersionCount(); + if (replica.getVisibleVersionCount() > maxVersionCount) { + maxVersionCount = replica.getVisibleVersionCount(); chosenReplica = replica; } } boolean recovered = false; for (Replica replica : tablet.getReplicas()) { if (replica.isAlive() && replica.tooSlow() && (!replica.equals(chosenReplica) - || replica.getVersionCount() < Config.min_version_count_indicate_replica_compaction_too_slow)) { + || !replica.tooBigVersionCount())) { if (chosenReplica != null) { chosenReplica.setState(ReplicaState.NORMAL); recovered = true; @@ -1177,8 +1177,7 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) "replica does not exist. backend id: " + destBackendId); } - replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), - reportedTablet.getRemoteDataSize(), reportedTablet.getRowCount()); + replica.updateWithReport(reportedTablet); if (replica.getLastFailedVersion() > partition.getCommittedVersion() && reportedTablet.getVersion() >= partition.getCommittedVersion() //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() @@ -1365,8 +1364,8 @@ public String toString() { public static class VersionCountComparator implements Comparator { @Override public int compare(Replica r1, Replica r2) { - long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE : r1.getVersionCount(); - long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE : r2.getVersionCount(); + long verCount1 = r1.getVisibleVersionCount() == -1 ? Long.MAX_VALUE : r1.getVisibleVersionCount(); + long verCount2 = r2.getVisibleVersionCount() == -1 ? Long.MAX_VALUE : r2.getVisibleVersionCount(); if (verCount1 < verCount2) { return -1; } else if (verCount1 > verCount2) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 094beca0425302..421dd1e05e04ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1110,13 +1110,13 @@ private void handleReplicaTooSlow(TabletSchedCtx tabletCtx) throws SchedExceptio if (replica.isAlive() && !replica.tooSlow()) { normalReplicaCount++; } - if (replica.getVersionCount() > maxVersionCount) { - maxVersionCount = replica.getVersionCount(); + if (replica.getVisibleVersionCount() > maxVersionCount) { + maxVersionCount = replica.getVisibleVersionCount(); chosenReplica = replica; } } if (chosenReplica != null && chosenReplica.isAlive() && !chosenReplica.tooSlow() - && chosenReplica.getVersionCount() > Config.min_version_count_indicate_replica_compaction_too_slow + && chosenReplica.tooBigVersionCount() && normalReplicaCount - 1 >= tabletCtx.getReplicas().size() / 2 + 1) { chosenReplica.setState(ReplicaState.COMPACTION_TOO_SLOW); LOG.info("set replica id :{} tablet id: {}, backend id: {} to COMPACTION_TOO_SLOW", diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java index 9b2a8f6d8f7eee..edf2a9d35179e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java @@ -44,7 +44,7 @@ public class ReplicasProcNode implements ProcNodeInterface { .add("BackendId").add("Version").add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime") .add("SchemaHash").add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State").add("IsBad") .add("IsUserDrop") - .add("VersionCount").add("PathHash").add("Path") + .add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path") .add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId") .add("CooldownMetaId").add("QueryHits").build(); @@ -117,7 +117,8 @@ public ProcResult fetchResult() { String.valueOf(replica.getState()), String.valueOf(replica.isBad()), String.valueOf(replica.isUserDrop()), - String.valueOf(replica.getVersionCount()), + String.valueOf(replica.getVisibleVersionCount()), + String.valueOf(replica.getTotalVersionCount()), String.valueOf(replica.getPathHash()), path, metaUrl, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java index ca50c4183052fc..e28c74c327eff9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java @@ -261,8 +261,7 @@ static class DBTabletStatistic { oversizeTabletIds.add(tablet.getId()); } for (Replica replica : tablet.getReplicas()) { - if (replica.getVersionCount() - > Config.min_version_count_indicate_replica_compaction_too_slow) { + if (replica.tooBigVersionCount()) { replicaCompactionTooSlowNum++; replicaCompactionTooSlowTabletIds.add(tablet.getId()); break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index c82a55bd3801ba..46c89eb3253e50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -52,9 +52,10 @@ public class TabletsProcDir implements ProcDirInterface { .add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime") .add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State") .add("LstConsistencyCheckTime").add("CheckVersion") - .add("VersionCount").add("QueryHits").add("PathHash").add("Path") + .add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path") .add("MetaUrl").add("CompactionStatus") - .add("CooldownReplicaId").add("CooldownMetaId").build(); + .add("CooldownReplicaId").add("CooldownMetaId") + .build(); private Table table; private MaterializedIndex index; @@ -113,7 +114,8 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(-1); // lst consistency check time tabletInfo.add(-1); // check version tabletInfo.add(-1); // check version hash - tabletInfo.add(-1); // version count + tabletInfo.add(-1); // visible version count + tabletInfo.add(-1); // total version count tabletInfo.add(0L); // query hits tabletInfo.add(-1); // path hash tabletInfo.add(FeConstants.null_string); // path @@ -147,7 +149,8 @@ public List> fetchComparableResult(long version, long backendId tabletInfo.add(TimeUtils.longToTimeString(tablet.getLastCheckTime())); tabletInfo.add(tablet.getCheckedVersion()); - tabletInfo.add(replica.getVersionCount()); + tabletInfo.add(replica.getVisibleVersionCount()); + tabletInfo.add(replica.getTotalVersionCount()); tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L)); tabletInfo.add(replica.getPathHash()); tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), "")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index c8f52d5b4b21e6..b6c0c73eae2f96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1036,7 +1036,10 @@ private void unprotectUpdateReplica(OlapTable olapTable, ReplicaPersistInfo info Tablet tablet = materializedIndex.getTablet(info.getTabletId()); Replica replica = tablet.getReplicaById(info.getReplicaId()); Preconditions.checkNotNull(replica, info); - replica.updateVersionInfo(info.getVersion(), info.getDataSize(), info.getRemoteDataSize(), info.getRowCount()); + replica.updateVersion(info.getVersion()); + replica.setDataSize(info.getDataSize()); + replica.setRemoteDataSize(info.getRemoteDataSize()); + replica.setRowCount(info.getRowCount()); replica.setBad(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java similarity index 68% rename from fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java rename to fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java index 77ed5829799363..f4bf87ad1099d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInMemoryInfoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/PartitionInfoCollector.java @@ -27,29 +27,49 @@ import org.apache.doris.common.Config; import org.apache.doris.common.util.MasterDaemon; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; -public class PartitionInMemoryInfoCollector extends MasterDaemon { +public class PartitionInfoCollector extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(PartitionInMemoryInfoCollector.class); + // notice since collect partition info every Config.partition_info_update_interval_secs seconds, + // so partition collect info may be stale + public static class PartitionCollectInfo { + private long visibleVersion; + private boolean isInMemory; - public PartitionInMemoryInfoCollector() { - super("PartitionInMemoryInfoCollector", Config.partition_in_memory_update_interval_secs * 1000); + PartitionCollectInfo(long visibleVersion, boolean isInMemory) { + this.visibleVersion = visibleVersion; + this.isInMemory = isInMemory; + } + + public long getVisibleVersion() { + return this.visibleVersion; + } + + public boolean isInMemory() { + return this.isInMemory; + } + } + + private static final Logger LOG = LogManager.getLogger(PartitionInfoCollector.class); + + public PartitionInfoCollector() { + super("PartitionInfoCollector", Config.partition_info_update_interval_secs * 1000); } @Override protected void runAfterCatalogReady() { - updatePartitionInMemoryInfo(); + updatePartitionCollectInfo(); } - private void updatePartitionInMemoryInfo() { + private void updatePartitionCollectInfo() { Env env = Env.getCurrentEnv(); TabletInvertedIndex tabletInvertedIndex = env.getTabletInvertedIndex(); - ImmutableSet.Builder builder = ImmutableSet.builder(); + ImmutableMap.Builder builder = ImmutableMap.builder(); List dbIdList = env.getInternalCatalog().getDbIds(); for (Long dbId : dbIdList) { Database db = env.getInternalCatalog().getDbNullable(dbId); @@ -70,10 +90,12 @@ private void updatePartitionInMemoryInfo() { try { OlapTable olapTable = (OlapTable) table; for (Partition partition : olapTable.getAllPartitions()) { - if (olapTable.getPartitionInfo().getIsInMemory(partition.getId())) { + boolean isInMemory = olapTable.getPartitionInfo().getIsInMemory(partition.getId()); + if (isInMemory) { partitionInMemoryCount++; - builder.add(partition.getId()); } + builder.put(partition.getId(), + new PartitionCollectInfo(partition.getVisibleVersion(), isInMemory)); } } finally { table.readUnlock(); @@ -87,9 +109,6 @@ private void updatePartitionInMemoryInfo() { LOG.warn("Update database[" + db.getFullName() + "] partition in memory info failed", e); } } - ImmutableSet partitionIdInMemorySet = builder.build(); - tabletInvertedIndex.setPartitionIdInMemorySet(partitionIdInMemorySet); + tabletInvertedIndex.setPartitionCollectInfoMap(builder.build()); } - - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 7c77c1dfad36bb..635a8bb675f249 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -71,6 +71,7 @@ import org.apache.doris.task.PushStoragePolicyTask; import org.apache.doris.task.StorageMediaMigrationTask; import org.apache.doris.task.UpdateTabletMetaInfoTask; +import org.apache.doris.task.UpdateVisibleVersionTask; import org.apache.doris.thrift.TBackend; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TMasterResult; @@ -155,6 +156,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException { Map> tasks = null; Map disks = null; Map tablets = null; + Map partitionsVersion = null; long reportVersion = -1; ReportType reportType = ReportType.UNKNOWN; @@ -180,11 +182,15 @@ public TMasterResult handleReport(TReportRequest request) throws TException { reportType = ReportType.TABLET; } + if (request.isSetPartitionsVersion()) { + partitionsVersion = request.getPartitionsVersion(); + } + if (request.isSetTabletMaxCompactionScore()) { backend.setTabletMaxCompactionScore(request.getTabletMaxCompactionScore()); } - ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, + ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, partitionsVersion, reportVersion, request.getStoragePolicy(), request.getResource(), request.getNumCores(), request.getPipelineExecutorSize()); try { @@ -231,6 +237,7 @@ private class ReportTask extends MasterTask { private Map> tasks; private Map disks; private Map tablets; + private Map partitionsVersion; private long reportVersion; private List storagePolicies; @@ -239,14 +246,15 @@ private class ReportTask extends MasterTask { private int pipelineExecutorSize; public ReportTask(long beId, Map> tasks, - Map disks, - Map tablets, long reportVersion, + Map disks, Map tablets, + Map partitionsVersion, long reportVersion, List storagePolicies, List storageResources, int cpuCores, int pipelineExecutorSize) { this.beId = beId; this.tasks = tasks; this.disks = disks; this.tablets = tablets; + this.partitionsVersion = partitionsVersion; this.reportVersion = reportVersion; this.storagePolicies = storagePolicies; this.storageResources = storageResources; @@ -273,7 +281,11 @@ protected void exec() { LOG.warn("out of date report version {} from backend[{}]. current report version[{}]", reportVersion, beId, backendReportVersion); } else { - ReportHandler.tabletReport(beId, tablets, reportVersion); + Map partitions = this.partitionsVersion; + if (partitions == null) { + partitions = Maps.newHashMap(); + } + ReportHandler.tabletReport(beId, tablets, partitions, reportVersion); } } } @@ -408,7 +420,8 @@ private static void diffResource(List storageResourcesInBe, Li } // public for fe ut - public static void tabletReport(long backendId, Map backendTablets, long backendReportVersion) { + public static void tabletReport(long backendId, Map backendTablets, + Map backendPartitionsVersion, long backendReportVersion) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); @@ -426,6 +439,9 @@ public static void tabletReport(long backendId, Map backendTablet // storage medium -> tablet id ListMultimap tabletMigrationMap = LinkedListMultimap.create(); + // partition id -> visible version + Map partitionVersionSyncMap = Maps.newConcurrentMap(); + // dbid -> txn id -> [partition info] Map> transactionsToPublish = Maps.newHashMap(); ListMultimap transactionsToClear = LinkedListMultimap.create(); @@ -439,11 +455,13 @@ public static void tabletReport(long backendId, Map backendTablet List cooldownConfToUpdate = new LinkedList<>(); // 1. do the diff. find out (intersection) / (be - meta) / (meta - be) - Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, storageMediumMap, + Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, backendPartitionsVersion, + storageMediumMap, tabletSyncMap, tabletDeleteFromMeta, tabletFoundInMeta, tabletMigrationMap, + partitionVersionSyncMap, transactionsToPublish, transactionsToClear, tabletRecoveryMap, @@ -499,6 +517,9 @@ public static void tabletReport(long backendId, Map backendTablet if (!cooldownConfToUpdate.isEmpty()) { Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate); } + if (!partitionVersionSyncMap.isEmpty()) { + handleUpdatePartitionVersion(partitionVersionSyncMap, backendId); + } final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo(); Backend reportBackend = currentSystemInfo.getBackend(backendId); @@ -659,27 +680,17 @@ private static void sync(Map backendTablets, ListMultimap t.getSchemaHash() == schemaHash).findFirst().orElse(null); + if (tabletInfo == null) { continue; } + long metaVersion = replica.getVersion(); + long backendVersion = tabletInfo.getVersion(); boolean needSync = false; if (metaVersion < backendVersion) { needSync = true; @@ -702,7 +713,7 @@ private static void sync(Map backendTablets, ListMultimap backendTablets, ListMultimap partitionVersionSyncMap, long backendId) { + AgentBatchTask batchTask = new AgentBatchTask(); + UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, partitionVersionSyncMap, + System.currentTimeMillis()); + batchTask.addTask(task); + AgentTaskExecutor.submit(batchTask); + } + private static void handleRecoverTablet(ListMultimap tabletRecoveryMap, Map backendTablets, long backendId) { // print a warn log here to indicate the exceptions on the backend diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java index 2b084fcfa23431..5e7748a35247be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Diagnoser.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.common.Config; import com.google.common.collect.Lists; import org.json.simple.JSONObject; @@ -154,9 +153,9 @@ public static List> diagnoseTablet(long tabletId) { + ", and is bad: " + (replica.isBad() ? "Yes" : "No") + ", and is going to drop: " + (replica.isUserDrop() ? "Yes" : "No")); } - if (replica.getVersionCount() > Config.min_version_count_indicate_replica_compaction_too_slow) { + if (replica.tooBigVersionCount()) { compactionErr.append("Replica on backend " + replica.getBackendId() + "'s version count is too high: " - + replica.getVersionCount()); + + replica.getVisibleVersionCount()); } } results.add(Lists.newArrayList("ReplicaBackendStatus", (backendErr.length() == 0 diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index 03a82cbb56bf5a..360d24c573c9d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -48,6 +48,7 @@ import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUpdateTabletMetaInfoReq; import org.apache.doris.thrift.TUploadReq; +import org.apache.doris.thrift.TVisibleVersionReq; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; @@ -402,6 +403,15 @@ private TAgentTaskRequest toAgentTaskRequest(AgentTask task) { tAgentTaskRequest.setCleanTrashReq(request); return tAgentTaskRequest; } + case UPDATE_VISIBLE_VERSION: { + UpdateVisibleVersionTask visibleTask = (UpdateVisibleVersionTask) task; + TVisibleVersionReq request = visibleTask.toThrift(); + if (LOG.isDebugEnabled()) { + LOG.debug(request.toString()); + } + tAgentTaskRequest.setVisibleVersionReq(request); + return tAgentTaskRequest; + } default: if (LOG.isDebugEnabled()) { LOG.debug("could not find task type for task [{}]", task); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java new file mode 100644 index 00000000000000..52ed3b1c490381 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateVisibleVersionTask.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.task; + +import org.apache.doris.thrift.TTaskType; +import org.apache.doris.thrift.TVisibleVersionReq; + +import java.util.Map; + +public class UpdateVisibleVersionTask extends AgentTask { + private Map partitionVisibleVersions; + + public UpdateVisibleVersionTask(long backendId, Map partitionVisibleVersions, long createTime) { + super(null, backendId, TTaskType.UPDATE_VISIBLE_VERSION, -1L, -1L, -1L, -1L, -1L, -1L, createTime); + this.partitionVisibleVersions = partitionVisibleVersions; + } + + public TVisibleVersionReq toThrift() { + TVisibleVersionReq request = new TVisibleVersionReq(); + partitionVisibleVersions.forEach((partitionId, version) -> { + request.putToPartitionVersion(partitionId, version); + }); + return request; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index efcc760f0d2605..3996664708a51f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -44,6 +44,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.InternalDatabaseUtil; import org.apache.doris.common.util.MetaLockUtils; @@ -986,7 +987,12 @@ protected List getCommittedTxnList() { } } - public void finishTransaction(long transactionId) throws UserException { + public void finishTransaction(long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { + if (DebugPointUtil.isEnable("DatabaseTransactionMgr.stop_finish_transaction")) { + return; + } + TransactionState transactionState = null; readLock(); try { @@ -1049,7 +1055,7 @@ public void finishTransaction(long transactionId) throws UserException { LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } } - updateCatalogAfterVisible(transactionState, db); + updateCatalogAfterVisible(transactionState, db, partitionVisibleVersions, backendPartitions); } finally { MetaLockUtils.writeUnlockTables(tableList); } @@ -1964,7 +1970,8 @@ private void updateCatalogAfterCommitted(TransactionState transactionState, Data } } - private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) { + private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db, + Map partitionVisibleVersions, Map> backendPartitions) { Set errorReplicaIds = transactionState.getErrorReplicas(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); List newPartitionLoadedTableIds = new ArrayList<>(); @@ -2021,7 +2028,13 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat lastFailedVersion = newCommitVersion; } } - replica.updateVersionWithFailedInfo(newVersion, lastFailedVersion, lastSuccessVersion); + replica.updateVersionWithFailed(newVersion, lastFailedVersion, lastSuccessVersion); + Set partitionIds = backendPartitions.get(replica.getBackendId()); + if (partitionIds == null) { + partitionIds = Sets.newHashSet(); + backendPartitions.put(replica.getBackendId(), partitionIds); + } + partitionIds.add(partitionId); } } } // end for indices @@ -2032,6 +2045,7 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat newPartitionLoadedTableIds.add(tableId); } partition.updateVisibleVersionAndTime(version, versionTime); + partitionVisibleVersions.put(partition.getId(), version); if (LOG.isDebugEnabled()) { LOG.debug("transaction state {} set partition {}'s version to [{}]", transactionState, partition.getId(), version); @@ -2180,7 +2194,7 @@ public void replayUpsertTransactionState(TransactionState transactionState) thro if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { updateCatalogAfterCommitted(transactionState, db, true); } else if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { - updateCatalogAfterVisible(transactionState, db); + updateCatalogAfterVisible(transactionState, db, Maps.newHashMap(), Maps.newHashMap()); } unprotectUpsertTransactionState(transactionState, true); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index dd62c02f408682..24d92849b4978c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -414,9 +414,10 @@ public boolean existCommittedTxns(Long dbId, Long tableId, Long partitionId) { * @param transactionId * @return */ - public void finishTransaction(long dbId, long transactionId) throws UserException { + public void finishTransaction(long dbId, long transactionId, Map partitionVisibleVersions, + Map> backendPartitions) throws UserException { DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); - dbTransactionMgr.finishTransaction(transactionId); + dbTransactionMgr.finishTransaction(transactionId, partitionVisibleVersions, backendPartitions); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index a6665c2e22038a..22ca57f2399cfe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -32,6 +32,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.PublishVersionTask; +import org.apache.doris.task.UpdateVisibleVersionTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TTaskType; @@ -61,14 +62,18 @@ public PublishVersionDaemon() { @Override protected void runAfterCatalogReady() { + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + try { - publishVersion(); + publishVersion(partitionVisibleVersions, backendPartitions); + sendBackendVisibleVersion(partitionVisibleVersions, backendPartitions); } catch (Throwable t) { LOG.error("errors while publish version to all backends", t); } } - private void publishVersion() { + private void publishVersion(Map partitionVisibleVersions, Map> backendPartitions) { if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) { return; } @@ -177,7 +182,7 @@ private void publishVersion() { try { // one transaction exception should not affect other transaction globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId()); + transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); } catch (Exception e) { LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); } @@ -232,4 +237,23 @@ private Map> getBaseTabletIdsForEachBe(TransactionState transact .collect(Collectors.groupingBy(p -> p.first, Collectors.mapping(p -> p.second, Collectors.toSet()))); } + + private void sendBackendVisibleVersion(Map partitionVisibleVersions, + Map> backendPartitions) { + if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) { + return; + } + + long createTime = System.currentTimeMillis(); + AgentBatchTask batchTask = new AgentBatchTask(); + backendPartitions.forEach((backendId, partitionIds) -> { + Map backendPartitionVersions = partitionVisibleVersions.entrySet().stream() + .filter(entry -> partitionIds.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions, + createTime); + batchTask.addTask(task); + }); + AgentTaskExecutor.submit(batchTask); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index e93f524a9199da..143dd92039704d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -221,10 +221,7 @@ public void testSchemaChange1() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), - shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), - shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } @@ -301,10 +298,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), - shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), - shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index 60020d08a8c892..7a49a413750c36 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -213,8 +213,7 @@ public void testSchemaChange1() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } @@ -296,8 +295,7 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { MaterializedIndex shadowIndex = testPartition.getMaterializedIndices(IndexExtState.SHADOW).get(0); for (Tablet shadowTablet : shadowIndex.getTablets()) { for (Replica shadowReplica : shadowTablet.getReplicas()) { - shadowReplica.updateVersionInfo(testPartition.getVisibleVersion(), shadowReplica.getDataSize(), - shadowReplica.getRemoteDataSize(), shadowReplica.getRowCount()); + shadowReplica.updateVersion(testPartition.getVisibleVersion()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java index 21fe967a96dd68..54debab9a6362d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowReplicaTest.java @@ -73,7 +73,8 @@ public void testShowReplicaDistribution() throws Exception { for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) { for (Tablet tablet : index.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateStat(1024, 2); + replica.setDataSize(1024L); + replica.setRowCount(2L); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java index d6a81cdd88339f..eb7dbca0775e54 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaTest.java @@ -65,12 +65,7 @@ public void getMethodTest() { // update new version long newVersion = version + 1; - long newDataSize = dataSize + 100; - long newRowCount = rowCount + 10; - replica.updateVersionInfo(newVersion, newDataSize, 0, newRowCount); - Assert.assertEquals(newVersion, replica.getVersion()); - Assert.assertEquals(newDataSize, replica.getDataSize()); - Assert.assertEquals(newRowCount, replica.getRowCount()); + replica.updateVersion(newVersion); // check version catch up Assert.assertFalse(replica.checkVersionCatchUp(5, false)); @@ -132,14 +127,14 @@ public void testSerialization() throws Exception { public void testUpdateVersion1() { Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 3); // new version is little than original version, it is invalid the version will not update - originalReplica.updateVersionInfo(2, 100, 0, 78); + originalReplica.updateVersion(2); Assert.assertEquals(3, originalReplica.getVersion()); } @Test public void testUpdateVersion2() { Replica originalReplica = new Replica(10000, 20000, 3, 0, 100, 0, 78, ReplicaState.NORMAL, 0, 0); - originalReplica.updateVersionInfo(3, 100, 0, 78); + originalReplica.updateVersion(3); // if new version >= current version and last success version <= new version, then last success version should be updated Assert.assertEquals(3, originalReplica.getLastSuccessVersion()); Assert.assertEquals(3, originalReplica.getVersion()); @@ -155,7 +150,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update last success version 10 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 10); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); @@ -163,7 +158,7 @@ public void testUpdateVersion3() { Assert.assertEquals(8, originalReplica.getLastFailedVersion()); // update version to 8, the last success version and version should be 10 - originalReplica.updateVersionInfo(8, 100, 0, 78); + originalReplica.updateVersion(8); Assert.assertEquals(10, originalReplica.getLastSuccessVersion()); Assert.assertEquals(10, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); @@ -175,7 +170,7 @@ public void testUpdateVersion3() { Assert.assertEquals(12, originalReplica.getLastFailedVersion()); // update last success version to 15 - originalReplica.updateVersionWithFailedInfo(originalReplica.getVersion(), + originalReplica.updateVersionWithFailed(originalReplica.getVersion(), originalReplica.getLastFailedVersion(), 15); Assert.assertEquals(15, originalReplica.getLastSuccessVersion()); @@ -189,13 +184,13 @@ public void testUpdateVersion3() { Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 17 then version and success version is 17 - originalReplica.updateVersionInfo(17, 100, 0, 78); + originalReplica.updateVersion(17); Assert.assertEquals(17, originalReplica.getLastSuccessVersion()); Assert.assertEquals(17, originalReplica.getVersion()); Assert.assertEquals(18, originalReplica.getLastFailedVersion()); // update version to 18, then version and last success version should be 18 and failed version should be -1 - originalReplica.updateVersionInfo(18, 100, 0, 78); + originalReplica.updateVersion(18); Assert.assertEquals(18, originalReplica.getLastSuccessVersion()); Assert.assertEquals(18, originalReplica.getVersion()); Assert.assertEquals(-1, originalReplica.getLastFailedVersion()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java index f563b846ed7da2..40b6683da3dc6b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java @@ -110,7 +110,8 @@ public void testDiskReblanceWhenSchedulerIdle() throws Exception { Lists.newArrayList(tablet.getReplicas()).forEach( replica -> { if (replica.getBackendId() == backends.get(1).getId()) { - replica.updateStat(totalCapacity / 4, 1); + replica.setDataSize(totalCapacity / 4); + replica.setRowCount(1); tablet.deleteReplica(replica); replica.setBackendId(backends.get(0).getId()); replica.setPathHash(diskInfo0.getPathHash()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java index 063faf2d3b2ea6..1e6af5c7324e85 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java @@ -104,7 +104,7 @@ public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex replica.setPathHash(beIds.get(i)); if (replicaSizes != null) { // for disk rebalancer, every beId corresponding to a replicaSize - replica.updateStat(replicaSizes.get(i), 0); + replica.setDataSize(replicaSizes.get(i)); } // isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex tablet.addReplica(replica, true); @@ -165,7 +165,8 @@ public static void updateReplicaDataSize(long minReplicaSize, int tableSkew, in for (Tablet tablet : idx.getTablets()) { long tabletSize = tableBaseSize * (1 + random.nextInt(tabletSkew)); for (Replica replica : tablet.getReplicas()) { - replica.updateStat(tabletSize, 1000L); + replica.setDataSize(tabletSize); + replica.setRowCount(1000L); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java index 1150b32192f341..1ac497dbebe28d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -106,7 +106,7 @@ public void testRepairLastFailedVersionByReport() throws Exception { tablets.put(tablet.getId(), tTablet); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); @@ -136,11 +136,11 @@ public void testVersionRegressive() throws Exception { Map tablets = Maps.newHashMap(); tablets.put(tablet.getId(), tTablet); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(-1L, replica.getLastFailedVersion()); DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + ReportHandler.tabletReport(replica.getBackendId(), tablets, Maps.newHashMap(), 100L); Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); @@ -160,7 +160,7 @@ private TableInfo prepareTableForTest(String tableName) throws Exception { long visibleVersion = 2L; partition.updateVisibleVersion(visibleVersion); partition.setNextVersion(visibleVersion + 1); - tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); + tablet.getReplicas().forEach(replica -> replica.updateVersion(visibleVersion)); Replica replica = tablet.getReplicas().iterator().next(); Assertions.assertEquals(visibleVersion, replica.getVersion()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index 6a38985b73f6e7..7d918ef7db54ae 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -141,7 +141,7 @@ private static void updateReplicaVersionCount() { List pathHashes = be.getDisks().values().stream() .map(DiskInfo::getPathHash).collect(Collectors.toList()); Replica replica = cell.getValue(); - replica.setVersionCount(versionCount); + replica.setVisibleVersionCount(versionCount); versionCount = versionCount + 200; replica.setPathHash(pathHashes.get(0)); @@ -171,7 +171,7 @@ public void test() throws Exception { boolean found = false; for (Table.Cell cell : replicaMetaTable.cellSet()) { Replica replica = cell.getValue(); - if (replica.getVersionCount() == 401) { + if (replica.getVisibleVersionCount() == 401) { if (replica.tooSlow()) { LOG.info("set to TOO_SLOW."); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java index d4578e17d7fb38..852f072eca1c35 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java @@ -84,19 +84,19 @@ public void testVersionCountComparator() { TabletSchedCtx.VersionCountComparator countComparator = new TabletSchedCtx.VersionCountComparator(); List replicaList = Lists.newArrayList(); Replica replica1 = new Replica(); - replica1.setVersionCount(100); + replica1.setVisibleVersionCount(100); replica1.setState(Replica.ReplicaState.NORMAL); Replica replica2 = new Replica(); - replica2.setVersionCount(50); + replica2.setVisibleVersionCount(50); replica2.setState(Replica.ReplicaState.NORMAL); Replica replica3 = new Replica(); - replica3.setVersionCount(-1); + replica3.setVisibleVersionCount(-1); replica3.setState(Replica.ReplicaState.NORMAL); Replica replica4 = new Replica(); - replica4.setVersionCount(200); + replica4.setVisibleVersionCount(200); replica4.setState(Replica.ReplicaState.NORMAL); replicaList.add(replica1); @@ -105,10 +105,10 @@ public void testVersionCountComparator() { replicaList.add(replica4); Collections.sort(replicaList, countComparator); - Assert.assertEquals(50, replicaList.get(0).getVersionCount()); - Assert.assertEquals(100, replicaList.get(1).getVersionCount()); - Assert.assertEquals(200, replicaList.get(2).getVersionCount()); - Assert.assertEquals(-1, replicaList.get(3).getVersionCount()); + Assert.assertEquals(50, replicaList.get(0).getVisibleVersionCount()); + Assert.assertEquals(100, replicaList.get(1).getVisibleVersionCount()); + Assert.assertEquals(200, replicaList.get(2).getVisibleVersionCount()); + Assert.assertEquals(-1, replicaList.get(3).getVisibleVersionCount()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 76fe82e6599a87..9ff6e7ac3ca693 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1101,7 +1101,7 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1115,7 +1115,7 @@ public void testBucketShuffleJoin() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1199,7 +1199,7 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1229,7 +1229,7 @@ public void testJoinWithMysqlTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2); } } } @@ -1249,7 +1249,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(10000); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 200000, 0, 10000); + replica.updateVersion(2); } } } @@ -1278,7 +1278,7 @@ public void testJoinWithOdbcTable() throws Exception { mIndex.setRowCount(0); for (Tablet tablet : mIndex.getTablets()) { for (Replica replica : tablet.getReplicas()) { - replica.updateVersionInfo(2, 0, 0, 0); + replica.updateVersion(2); } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index ea63a5e18b10d0..437f1bcb209b4f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -42,6 +42,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; +import java.util.Set; public class DatabaseTransactionMgrTest { @@ -115,7 +116,10 @@ public Map addTransactionToTransactionMgr() throws UserException { setTransactionFinishPublish(transactionState1, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId1, + partitionVisibleVersions, backendPartitions); labelToTxnId.put(CatalogTestUtil.testTxnLabel1, transactionId1); // txn 2, 3, 4 diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 4f22d95c60bdf1..5c8f72723adacd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -53,8 +53,10 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import mockit.Injectable; import mockit.Mocked; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -65,6 +67,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; public class GlobalTransactionMgrTest { @@ -474,7 +477,10 @@ public void testFinishTransaction() throws UserException { CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId1).getErrorTablets().add(CatalogTestUtil.testTabletId1); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); transactionState = fakeEditLog.getTransaction(transactionId); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); // check replica version @@ -493,6 +499,14 @@ public void testFinishTransaction() throws UserException { } } + + Assert.assertEquals(ImmutableMap.of(testPartition.getId(), CatalogTestUtil.testStartVersion + 1), + partitionVisibleVersions); + Set partitionIds = Sets.newHashSet(testPartition.getId()); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId1)); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId2)); + Assert.assertEquals(partitionIds, backendPartitions.get(CatalogTestUtil.testBackendId3)); + // slave replay new state and compare catalog slaveTransMgr.replayUpsertTransactionState(transactionState); Assert.assertTrue(CatalogTestUtil.compareCatalog(masterEnv, slaveEnv)); @@ -535,8 +549,13 @@ public void testFinishTransactionWithOneFailed() throws UserException { // backend2 publish failed transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId2).getErrorTablets().add(CatalogTestUtil.testTabletId1); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + Map partitionVisibleVersions = Maps.newHashMap(); + Map> backendPartitions = Maps.newHashMap(); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus()); + Assert.assertTrue(partitionVisibleVersions.isEmpty()); + Assert.assertTrue(backendPartitions.isEmpty()); Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1); Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2); Replica replica3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3); @@ -554,7 +573,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { backend2SuccTablets.put(CatalogTestUtil.testTabletId1, 0L); transactionState.getPublishVersionTasks() .get(CatalogTestUtil.testBackendId2).setSuccTablets(backend2SuccTablets); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 1, replica2.getVersion()); @@ -619,7 +639,8 @@ public void testFinishTransactionWithOneFailed() throws UserException { DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2, CatalogTestUtil.testBackendId3)); - masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2); + masterTransMgr.finishTransaction(CatalogTestUtil.testDbId1, transactionId2, + partitionVisibleVersions, backendPartitions); Assert.assertEquals(TransactionStatus.VISIBLE, transactionState.getTransactionStatus()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica1.getVersion()); Assert.assertEquals(CatalogTestUtil.testStartVersion + 2, replica2.getVersion()); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 9eb4ece8d7243b..6da55c66c5a00b 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -395,6 +395,10 @@ struct TPublishVersionRequest { 4: optional set base_tablet_ids } +struct TVisibleVersionReq { + 1: required map partition_version +} + struct TClearAlterTaskRequest { 1: required Types.TTabletId tablet_id 2: required Types.TSchemaHash schema_hash @@ -496,6 +500,7 @@ struct TAgentTaskRequest { 32: optional TAlterInvertedIndexReq alter_inverted_index_req 33: optional TGcBinlogReq gc_binlog_req 34: optional TCleanTrashReq clean_trash_req + 35: optional TVisibleVersionReq visible_version_req } struct TAgentResult { diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 0a2edb8ccbf12d..12036ef93cef68 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -33,9 +33,10 @@ struct TTabletStat { 1: required i64 tablet_id // local data size 2: optional i64 data_size - 3: optional i64 row_num - 4: optional i64 version_count + 3: optional i64 row_count + 4: optional i64 total_version_count 5: optional i64 remote_data_size + 6: optional i64 visible_version_count } struct TTabletStatResult { diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 9acd3f85f7b043..7442a86a9904a1 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -33,7 +33,7 @@ struct TTabletInfo { 6: required Types.TSize data_size 7: optional Types.TStorageMedium storage_medium 8: optional list transaction_ids - 9: optional i64 version_count + 9: optional i64 total_version_count 10: optional i64 path_hash 11: optional bool version_miss 12: optional bool used @@ -46,6 +46,7 @@ struct TTabletInfo { // 18: optional bool is_cooldown 19: optional i64 cooldown_term 20: optional Types.TUniqueId cooldown_meta_id + 21: optional i64 visible_version_count } struct TFinishTaskRequest { @@ -106,6 +107,7 @@ struct TReportRequest { 10: optional list resource // only id and version 11: i32 num_cores 12: i32 pipeline_executor_size + 13: optional map partitions_version } struct TMasterResult { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 3c2fcfbd6fa5a2..3cebb5a81ad57d 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -223,7 +223,8 @@ enum TTaskType { PUSH_STORAGE_POLICY, ALTER_INVERTED_INDEX, GC_BINLOG, - CLEAN_TRASH + CLEAN_TRASH, + UPDATE_VISIBLE_VERSION } enum TStmtType { diff --git a/regression-test/data/compaction/test_compaction_with_visible_version.out b/regression-test/data/compaction/test_compaction_with_visible_version.out new file mode 100644 index 00000000000000..de90dd5fa2a3a7 --- /dev/null +++ b/regression-test/data/compaction/test_compaction_with_visible_version.out @@ -0,0 +1,448 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 + +-- !select_2 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 + +-- !select_3 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_4 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_5 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 + +-- !select_6 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 +42 420 +43 430 +44 440 +45 450 +46 460 +47 470 +48 480 +49 490 +50 500 +51 510 +52 520 +53 530 +54 540 +55 550 +56 560 +57 570 +58 580 +59 590 +60 600 +61 610 +62 620 +63 630 +64 640 +65 650 +66 660 +67 670 +68 680 +69 690 +70 700 +71 710 +72 720 +73 730 +74 740 +75 750 +76 760 +77 770 +78 780 +79 790 +80 800 +81 810 +82 820 +83 830 +84 840 +85 850 +86 860 +87 870 +88 880 +89 890 +90 900 +91 910 +92 920 +93 930 +94 940 +95 950 +96 960 +97 970 +98 980 +99 990 +100 1000 +101 1010 +102 1020 +103 1030 +104 1040 +105 1050 +106 1060 +107 1070 +108 1080 +109 1090 +110 1100 +111 1110 +112 1120 +113 1130 +114 1140 +115 1150 +116 1160 +117 1170 +118 1180 +119 1190 +120 1200 +121 1210 + +-- !select_7 -- +0 0 +1 10 +2 20 +3 30 +4 40 +5 50 +6 60 +7 70 +8 80 +9 90 +10 100 +11 110 +12 120 +13 130 +14 140 +15 150 +16 160 +17 170 +18 180 +19 190 +20 200 +21 210 +22 220 +23 230 +24 240 +25 250 +26 260 +27 270 +28 280 +29 290 +30 300 +31 310 +32 320 +33 330 +34 340 +35 350 +36 360 +37 370 +38 380 +39 390 +40 400 +41 410 +42 420 +43 430 +44 440 +45 450 +46 460 +47 470 +48 480 +49 490 +50 500 +51 510 +52 520 +53 530 +54 540 +55 550 +56 560 +57 570 +58 580 +59 590 +60 600 +61 610 +62 620 +63 630 +64 640 +65 650 +66 660 +67 670 +68 680 +69 690 +70 700 +71 710 +72 720 +73 730 +74 740 +75 750 +76 760 +77 770 +78 780 +79 790 +80 800 +81 810 +82 820 +83 830 +84 840 +85 850 +86 860 +87 870 +88 880 +89 890 +90 900 +91 910 +92 920 +93 930 +94 940 +95 950 +96 960 +97 970 +98 980 +99 990 +100 1000 +101 1010 +102 1020 +103 1030 +104 1040 +105 1050 +106 1060 +107 1070 +108 1080 +109 1090 +110 1100 +111 1110 +112 1120 +113 1130 +114 1140 +115 1150 +116 1160 +117 1170 +118 1180 +119 1190 +120 1200 +121 1210 +122 1220 +123 1230 +124 1240 +125 1250 +126 1260 +127 1270 +128 1280 +129 1290 +130 1300 +131 1310 +132 1320 +133 1330 +134 1340 +135 1350 +136 1360 +137 1370 +138 1380 +139 1390 +140 1400 +141 1410 +142 1420 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index a915851b9389e8..49bfbc18792f0c 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -34,13 +34,16 @@ class ClusterOptions { int feNum = 1 int beNum = 3 + List feConfigs = [ 'heartbeat_interval_second=5', ] List beConfigs = [ + 'report_disk_state_interval_seconds=2', 'report_random_wait=false', ] + boolean connectToFollower = false // 1. cloudMode = true, only create cloud cluster. diff --git a/regression-test/suites/compaction/test_compaction_with_visible_version.groovy b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy new file mode 100644 index 00000000000000..194a1b67566192 --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_with_visible_version.groovy @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http +import org.apache.doris.regression.util.NodeType + +suite('test_compaction_with_visible_version') { + def options = new ClusterOptions() + def compaction_keep_invisible_version_min_count = 50L + options.feConfigs += [ + 'partition_info_update_interval_secs=5', + ] + options.beConfigs += [ + 'disable_auto_compaction=true', + 'report_tablet_interval_seconds=1', + 'tablet_rowset_stale_sweep_by_size=true', + 'tablet_rowset_stale_sweep_threshold_size=0', + 'compaction_keep_invisible_version_timeout_sec=6000', + "compaction_keep_invisible_version_min_count=${compaction_keep_invisible_version_min_count}".toString(), + 'compaction_keep_invisible_version_max_count=500', + ] + options.enableDebugPoints() + + docker(options) { + def E_CUMULATIVE_NO_SUITABLE_VERSION = 'E-2000' + def E_FULL_MISS_VERSION = 'E-2009' + + sql 'SET GLOBAL insert_visible_timeout_ms = 3000' + + def tableName = 'test_compaction_with_visible_version' + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + + def triggerCompaction = { tablet, isCompactSucc, compaction_type -> + def tabletId = tablet.TabletId + def backendId = tablet.BackendId + def backendIp = backendId_to_backendIP.get(backendId) + def backendHttpPort = backendId_to_backendHttpPort.get(backendId) + def code + def out + def err + if (compaction_type == 'base') { + (code, out, err) = be_run_base_compaction(backendIp, backendHttpPort, tabletId) + } else { + (code, out, err) = be_run_cumulative_compaction(backendIp, backendHttpPort, tabletId) + } + logger.info("Run compaction: code=${code}, out=${out}, err=${err}") + assertEquals(0, code) + def compactJson = parseJson(out.trim()) + if (isCompactSucc) { + assertEquals('success', compactJson.status.toLowerCase()) + } else { + if (compaction_type == 'base') { + assertEquals(E_FULL_MISS_VERSION, compactJson.status) + } else { + assertEquals(E_CUMULATIVE_NO_SUITABLE_VERSION, compactJson.status) + } + } + } + + def waitCompaction = { tablet, startTs -> + def tabletId = tablet.TabletId + def backendId = tablet.BackendId + def backendIp = backendId_to_backendIP.get(backendId) + def backendHttpPort = backendId_to_backendHttpPort.get(backendId) + def running = true + while (running) { + assertTrue(System.currentTimeMillis() - startTs < 60 * 1000) + Thread.sleep(1000) + def (code, out, err) = be_get_compaction_status(backendIp, backendHttpPort, tabletId) + logger.info("Get compaction: code=${code}, out=${out}, err=${err}") + assertEquals(0, code) + + def compactionStatus = parseJson(out.trim()) + assertEquals('success', compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } + } + + def checkCompact = { isCumuCompactSucc, runBaseCompact, isInvisibleTimeout, version, visibleVersion -> + def partition = sql_return_maparray("SHOW PARTITIONS FROM ${tableName}")[0] + assertEquals(visibleVersion, partition.VisibleVersion as long) + + // wait be report version count + Thread.sleep(3 * 1000) + def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + def lastInvisibleVersionCountMap = [:] + def lastVisibleVersionCountMap = [:] + tablets.each { + lastVisibleVersionCountMap[it.BackendId] = it.VisibleVersionCount as long + lastInvisibleVersionCountMap[it.BackendId] = + (it.VersionCount as long) - (it.VisibleVersionCount as long) + triggerCompaction it, isCumuCompactSucc, 'cumulative' + } + + if (isCumuCompactSucc) { + // wait compaction done + def startTs = System.currentTimeMillis() + tablets.each { + waitCompaction it, startTs + } + } + + if (runBaseCompact) { + tablets.each { + triggerCompaction it, true, 'base' + } + + def startTs = System.currentTimeMillis() + tablets.each { + waitCompaction it, startTs + } + } + + // wait report + Thread.sleep(3 * 1000) + + tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + tablets.each { + def backendId = it.BackendId + def visibleVersionCount = it.VisibleVersionCount as long + def totalVersionCount = it.VersionCount as long + def invisibleVersionCount = totalVersionCount - visibleVersionCount + assertEquals(version, it.Version as long) + + if (isInvisibleTimeout) { + def values = [Math.min(version - visibleVersion, compaction_keep_invisible_version_min_count), + Math.min(version - visibleVersion, compaction_keep_invisible_version_min_count + 1)] + + // part of invisible version was compact + assertTrue(invisibleVersionCount in values, + "not match, invisibleVersionCount: ${invisibleVersionCount}, candidate values: ${values}") + } else { + // invisible version couldn't compact + assertEquals(version - visibleVersion, invisibleVersionCount) + } + + def lastVisibleVersionCount = lastVisibleVersionCountMap.get(backendId) + def lastInvisibleVersionCount = lastInvisibleVersionCountMap.get(backendId) + if (isCumuCompactSucc) { + assertTrue(lastInvisibleVersionCount > invisibleVersionCount || lastInvisibleVersionCount <= 1, + "not met with: lastInvisibleVersionCount ${lastInvisibleVersionCount} > " + + "invisibleVersionCount ${invisibleVersionCount}") + if (runBaseCompact) { + assertEquals(1L, visibleVersionCount) + } + } else { + assertEquals(lastVisibleVersionCount, visibleVersionCount) + } + } + } + + sql " CREATE TABLE ${tableName} (k1 int, k2 int) DISTRIBUTED BY HASH(k1) BUCKETS 1 " + + // normal + def rowNum = 0L + def insertNRecords = { num -> + // if enable debug point DatabaseTransactionMgr.stop_finish_transaction, + // insert will need to wait insert_visible_timeout_ms. + // so use multiple threads to reduce the wait time. + def futures = [] + for (def i = 0; i < num; i++, rowNum++) { + def index = rowNum + futures.add(thread { + sql " INSERT INTO ${tableName} VALUES (${index}, ${index * 10}) " + }) + } + futures.each { it.get() } + } + insertNRecords(21) + // after insert 21 rows, be can run compact ok. + checkCompact(true, false, false, rowNum + 1, rowNum + 1) + qt_select_1 "SELECT * FROM ${tableName} ORDER BY k1" + + // publish but not visible + def lastRowNum = rowNum + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + insertNRecords(21) + // after enable debugpoint, be will add rowsets, but visible version will not increase. + // then no rowsets can pick to compact. + // so expect compact failed. + checkCompact(false, false, false, rowNum + 1, lastRowNum + 1) + qt_select_2 "SELECT * FROM ${tableName} ORDER BY k1" + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + // after clear debug point, visible version will increase. + // then some rowsets can pick to compact. + // so expect compact succ. + checkCompact(true, true, false, rowNum + 1, rowNum + 1) + qt_select_3 "SELECT * FROM ${tableName} ORDER BY k1" + + lastRowNum = rowNum + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + insertNRecords(80) + // 80 versions are not invisible yet, BE will not compact them. + // if we send http to compact them, BE will reply no rowsets can compact now + checkCompact(false, false, false, rowNum + 1, lastRowNum + 1) + // Because BE not compact, so query should be ok. + qt_select_4 "SELECT * FROM ${tableName} ORDER BY k1" + + update_all_be_config('compaction_keep_invisible_version_timeout_sec', 1) + checkCompact(true, false, true, rowNum + 1, lastRowNum + 1) + qt_select_5 "SELECT * FROM ${tableName} ORDER BY k1" + + def getVersionCountMap = { -> + def versionCountMap = [:] + def tablets = sql_return_maparray "SHOW TABLETS FROM ${tableName}" + tablets.each { + versionCountMap.put(it.BackendId as long, [it.VisibleVersionCount as long, it.VersionCount as long]) + } + return versionCountMap + } + + // after backend restart, it should update its visible version from FE + // and then it report its visible version count and total version count + def oldVersionCountMap = getVersionCountMap() + cluster.restartBackends() + Thread.sleep(20000) + def newVersionCountMap = getVersionCountMap() + assertEquals(oldVersionCountMap, newVersionCountMap) + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + // after clear fe's debug point, the 80 version are visible now. + // so compact is ok + checkCompact(true, false, false, rowNum + 1, rowNum + 1) + qt_select_6 "SELECT * FROM ${tableName} ORDER BY k1" + + cluster.injectDebugPoints(NodeType.FE, ['DatabaseTransactionMgr.stop_finish_transaction':null]) + def compaction_keep_invisible_version_timeout_sec = 1 + compaction_keep_invisible_version_min_count = 0L + update_all_be_config('compaction_keep_invisible_version_timeout_sec', compaction_keep_invisible_version_timeout_sec) + update_all_be_config('compaction_keep_invisible_version_min_count', compaction_keep_invisible_version_min_count) + + lastRowNum = rowNum + insertNRecords(21) + + Thread.sleep((compaction_keep_invisible_version_timeout_sec + 1) * 1000) + + // after compaction_keep_invisible_version_timeout_sec, + // all version had been compact (compaction_keep_invisible_version_min_count=0), + checkCompact(true, false, true, rowNum + 1, lastRowNum + 1) + + // visible version had been compact + test { + sql "SELECT * FROM ${tableName} ORDER BY k1" + + // E-230: + //(1105, 'errCode = 2, detailMessage = (128.2.51.2)[CANCELLED]missed_versions is empty, spec_version 43, + // max_version 123, tablet_id 10062') + exception 'missed_versions is empty' + } + + cluster.clearFrontendDebugPoints() + Thread.sleep(5000) + qt_select_7 "SELECT * FROM ${tableName} ORDER BY k1" + } +} From af890324012b266a3ae2819da1ed185700a54383 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Thu, 13 Jun 2024 14:34:27 +0800 Subject: [PATCH 2/2] fix compile fe --- .../java/org/apache/doris/catalog/Replica.java | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index a515bbd2f4dbf0..f6b2944d7541c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -255,22 +255,6 @@ public void setRowCount(long rowCount) { this.rowCount = rowCount; } - public long getSegmentCount() { - return segmentCount; - } - - public void setSegmentCount(long segmentCount) { - this.segmentCount = segmentCount; - } - - public long getRowsetCount() { - return rowsetCount; - } - - public void setRowsetCount(long rowsetCount) { - this.rowsetCount = rowsetCount; - } - public long getLastFailedVersion() { return lastFailedVersion; }