diff --git a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h index 328c9d37bf1..e346b5155f6 100644 --- a/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h +++ b/dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h @@ -274,7 +274,7 @@ struct MockRaftStoreProxy : MutexLockWrap , table_id(1) , cluster_ver(RaftstoreVer::V1) { - proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":4}})"; + proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":4},"server":{"engine-addr":"123"}})"; } LoggerPtr log; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index e895ece5d86..11f7cef5820 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1503,7 +1503,8 @@ int Server::main(const std::vector & /*args*/) S3::ClientFactory::instance().setKVCluster(tmt.getKVCluster()); } } - + LOG_INFO(log, "Init S3 GC Manager"); + global_context->getTMTContext().initS3GCManager(tiflash_instance_wrap.proxy_helper); // Initialize the thread pool of storage before the storage engine is initialized. LOG_INFO(log, "dt_enable_read_thread {}", global_context->getSettingsRef().dt_enable_read_thread); // `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`. diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 8df76b8780c..0ae98d6e362 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -73,6 +73,7 @@ KVStore::KVStore(Context & context) { LOG_WARNING(log, "JointThreadInfoJeallocMap is not inited from context"); } + fetchProxyConfig(proxy_helper); } void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper) @@ -119,6 +120,10 @@ void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy void KVStore::fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper) { + if (proxy_config_summary.valid) + { + LOG_INFO(log, "Skip duplicated parsing proxy config"); + } // Try fetch proxy's config as a json string if (proxy_helper && proxy_helper->fn_get_config_json) { @@ -133,7 +138,13 @@ void KVStore::fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper) auto ptr = obj.extract(); auto raftstore = ptr->getObject("raftstore"); proxy_config_summary.snap_handle_pool_size = raftstore->getValue("snap-handle-pool-size"); - LOG_INFO(log, "Parsed proxy config snap_handle_pool_size {}", proxy_config_summary.snap_handle_pool_size); + auto server = ptr->getObject("server"); + proxy_config_summary.engine_addr = server->getValue("engine-addr"); + LOG_INFO( + log, + "Parsed proxy config: snap_handle_pool_size={} engine_addr={}", + proxy_config_summary.snap_handle_pool_size, + proxy_config_summary.engine_addr); proxy_config_summary.valid = true; } catch (...) @@ -448,6 +459,7 @@ size_t KVStore::getOngoingPrehandleTaskCount() const { return std::max(0, ongoing_prehandle_task_count.load()); } + size_t KVStore::getOngoingPrehandleSubtaskCount() const { return std::max(0, prehandling_trace.ongoing_prehandle_subtask_count.load()); diff --git a/dbms/src/Storages/KVStore/KVStore.h b/dbms/src/Storages/KVStore/KVStore.h index 65cc0c2336f..f03a9f9e7fd 100644 --- a/dbms/src/Storages/KVStore/KVStore.h +++ b/dbms/src/Storages/KVStore/KVStore.h @@ -118,6 +118,7 @@ struct ProxyConfigSummary { bool valid = false; size_t snap_handle_pool_size = 0; + std::string engine_addr; }; /// KVStore manages raft replication and transactions. @@ -151,6 +152,7 @@ class KVStore final : private boost::noncopyable void reportThreadAllocInfo(std::string_view, ReportThreadAllocateInfoType type, uint64_t value); static void reportThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data); JointThreadInfoJeallocMapPtr getJointThreadInfoJeallocMap() const { return joint_memory_allocation_map; } + void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper); public: // Region Management void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *); @@ -280,7 +282,6 @@ class KVStore final : private boost::noncopyable }; StoreMeta & getStore(); const StoreMeta & getStore() const; - void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper); // ---- Raft Snapshot ---- // diff --git a/dbms/src/Storages/KVStore/TMTContext.cpp b/dbms/src/Storages/KVStore/TMTContext.cpp index 1ef0a03e06d..953f6024639 100644 --- a/dbms/src/Storages/KVStore/TMTContext.cpp +++ b/dbms/src/Storages/KVStore/TMTContext.cpp @@ -163,14 +163,38 @@ TMTContext::TMTContext( , wait_index_timeout_ms(DEFAULT_WAIT_INDEX_TIMEOUT_MS) , read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS) , wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC) + , raftproxy_config(raft_config) { startMonitorMPPTaskThread(mpp_task_manager); - etcd_client = Etcd::Client::create(cluster->pd_client, cluster_config); - if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled() +} + +void TMTContext::initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper) +{ + kvstore->fetchProxyConfig(proxy_helper); + if (!raftproxy_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled() && !context.getSharedContextDisagg()->isDisaggregatedComputeMode()) { - s3gc_owner = OwnerManager::createS3GCOwner(context, /*id*/ raft_config.advertise_engine_addr, etcd_client); + if (kvstore->getProxyConfigSummay().valid) + { + LOG_INFO( + Logger::get(), + "Build s3gc manager from proxy's conf engine_addr={}", + kvstore->getProxyConfigSummay().engine_addr); + s3gc_owner = OwnerManager::createS3GCOwner( + context, + /*id*/ kvstore->getProxyConfigSummay().engine_addr, + etcd_client); + } + else + { + LOG_INFO( + Logger::get(), + "Build s3gc manager from tiflash's conf engine_addr={}", + raftproxy_config.advertise_engine_addr); + s3gc_owner + = OwnerManager::createS3GCOwner(context, /*id*/ raftproxy_config.advertise_engine_addr, etcd_client); + } s3gc_owner->campaignOwner(); // start campaign s3lock_client = std::make_shared(cluster.get(), s3gc_owner); diff --git a/dbms/src/Storages/KVStore/TMTContext.h b/dbms/src/Storages/KVStore/TMTContext.h index 10a234384ed..55694ce3bc8 100644 --- a/dbms/src/Storages/KVStore/TMTContext.h +++ b/dbms/src/Storages/KVStore/TMTContext.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -42,6 +43,8 @@ using GCManagerPtr = std::shared_ptr; struct TiFlashRaftConfig; +struct TiFlashRaftProxyHelper; + // We define a shared ptr here, because TMTContext / SchemaSyncer / IndexReader all need to // `share` the resource of cluster. using KVClusterPtr = std::shared_ptr; @@ -144,6 +147,7 @@ class TMTContext : private boost::noncopyable uint64_t readIndexWorkerTick() const; Etcd::ClientPtr getEtcdClient() const { return etcd_client; } + void initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper); private: Context & context; @@ -174,6 +178,8 @@ class TMTContext : private boost::noncopyable std::atomic_uint64_t wait_index_timeout_ms; std::atomic_uint64_t read_index_worker_tick_ms; std::atomic_int64_t wait_region_ready_timeout_sec; + + TiFlashRaftConfig raftproxy_config; }; const std::string & IntoStoreStatusName(TMTContext::StoreStatus status); diff --git a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp index 322413180e6..191247c78c7 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp @@ -871,7 +871,7 @@ try { auto & ctx = TiFlashTestEnv::getGlobalContext(); proxy_instance->cluster_ver = RaftstoreVer::V2; - proxy_instance->proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":3}})"; + proxy_instance->proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":3},"server":{"engine-addr":"123"}})"; KVStore & kvs = getKVS(); kvs.fetchProxyConfig(proxy_helper.get()); ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr);