Skip to content

Commit

Permalink
KVStore: Get addressed engine-addr from proxy (#9174)
Browse files Browse the repository at this point in the history
close #8382

Signed-off-by: CalvinNeo <calvinneo1995@gmail.com>
  • Loading branch information
CalvinNeo authored Jul 3, 2024
1 parent 3adcf7d commit 20d4616
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 8 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockKVStore/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ struct MockRaftStoreProxy : MutexLockWrap
, table_id(1)
, cluster_ver(RaftstoreVer::V1)
{
proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":4}})";
proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":4},"server":{"engine-addr":"123"}})";
}

LoggerPtr log;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
S3::ClientFactory::instance().setKVCluster(tmt.getKVCluster());
}
}

LOG_INFO(log, "Init S3 GC Manager");
global_context->getTMTContext().initS3GCManager(tiflash_instance_wrap.proxy_helper);
// Initialize the thread pool of storage before the storage engine is initialized.
LOG_INFO(log, "dt_enable_read_thread {}", global_context->getSettingsRef().dt_enable_read_thread);
// `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`.
Expand Down
14 changes: 13 additions & 1 deletion dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ KVStore::KVStore(Context & context)
{
LOG_WARNING(log, "JointThreadInfoJeallocMap is not inited from context");
}
fetchProxyConfig(proxy_helper);
}

void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy_helper)
Expand Down Expand Up @@ -119,6 +120,10 @@ void KVStore::restore(PathPool & path_pool, const TiFlashRaftProxyHelper * proxy

void KVStore::fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper)
{
if (proxy_config_summary.valid)
{
LOG_INFO(log, "Skip duplicated parsing proxy config");
}
// Try fetch proxy's config as a json string
if (proxy_helper && proxy_helper->fn_get_config_json)
{
Expand All @@ -133,7 +138,13 @@ void KVStore::fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper)
auto ptr = obj.extract<Poco::JSON::Object::Ptr>();
auto raftstore = ptr->getObject("raftstore");
proxy_config_summary.snap_handle_pool_size = raftstore->getValue<uint64_t>("snap-handle-pool-size");
LOG_INFO(log, "Parsed proxy config snap_handle_pool_size {}", proxy_config_summary.snap_handle_pool_size);
auto server = ptr->getObject("server");
proxy_config_summary.engine_addr = server->getValue<std::string>("engine-addr");
LOG_INFO(
log,
"Parsed proxy config: snap_handle_pool_size={} engine_addr={}",
proxy_config_summary.snap_handle_pool_size,
proxy_config_summary.engine_addr);
proxy_config_summary.valid = true;
}
catch (...)
Expand Down Expand Up @@ -448,6 +459,7 @@ size_t KVStore::getOngoingPrehandleTaskCount() const
{
return std::max(0, ongoing_prehandle_task_count.load());
}

size_t KVStore::getOngoingPrehandleSubtaskCount() const
{
return std::max(0, prehandling_trace.ongoing_prehandle_subtask_count.load());
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/KVStore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ struct ProxyConfigSummary
{
bool valid = false;
size_t snap_handle_pool_size = 0;
std::string engine_addr;
};

/// KVStore manages raft replication and transactions.
Expand Down Expand Up @@ -151,6 +152,7 @@ class KVStore final : private boost::noncopyable
void reportThreadAllocInfo(std::string_view, ReportThreadAllocateInfoType type, uint64_t value);
static void reportThreadAllocBatch(std::string_view, ReportThreadAllocateInfoBatch data);
JointThreadInfoJeallocMapPtr getJointThreadInfoJeallocMap() const { return joint_memory_allocation_map; }
void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper);

public: // Region Management
void restore(PathPool & path_pool, const TiFlashRaftProxyHelper *);
Expand Down Expand Up @@ -280,7 +282,6 @@ class KVStore final : private boost::noncopyable
};
StoreMeta & getStore();
const StoreMeta & getStore() const;
void fetchProxyConfig(const TiFlashRaftProxyHelper * proxy_helper);

// ---- Raft Snapshot ---- //

Expand Down
30 changes: 27 additions & 3 deletions dbms/src/Storages/KVStore/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,38 @@ TMTContext::TMTContext(
, wait_index_timeout_ms(DEFAULT_WAIT_INDEX_TIMEOUT_MS)
, read_index_worker_tick_ms(DEFAULT_READ_INDEX_WORKER_TICK_MS)
, wait_region_ready_timeout_sec(DEFAULT_WAIT_REGION_READY_TIMEOUT_SEC)
, raftproxy_config(raft_config)
{
startMonitorMPPTaskThread(mpp_task_manager);

etcd_client = Etcd::Client::create(cluster->pd_client, cluster_config);
if (!raft_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled()
}

void TMTContext::initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper)
{
kvstore->fetchProxyConfig(proxy_helper);
if (!raftproxy_config.pd_addrs.empty() && S3::ClientFactory::instance().isEnabled()
&& !context.getSharedContextDisagg()->isDisaggregatedComputeMode())
{
s3gc_owner = OwnerManager::createS3GCOwner(context, /*id*/ raft_config.advertise_engine_addr, etcd_client);
if (kvstore->getProxyConfigSummay().valid)
{
LOG_INFO(
Logger::get(),
"Build s3gc manager from proxy's conf engine_addr={}",
kvstore->getProxyConfigSummay().engine_addr);
s3gc_owner = OwnerManager::createS3GCOwner(
context,
/*id*/ kvstore->getProxyConfigSummay().engine_addr,
etcd_client);
}
else
{
LOG_INFO(
Logger::get(),
"Build s3gc manager from tiflash's conf engine_addr={}",
raftproxy_config.advertise_engine_addr);
s3gc_owner
= OwnerManager::createS3GCOwner(context, /*id*/ raftproxy_config.advertise_engine_addr, etcd_client);
}
s3gc_owner->campaignOwner(); // start campaign
s3lock_client = std::make_shared<S3::S3LockClient>(cluster.get(), s3gc_owner);

Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/KVStore/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Interpreters/Context_fwd.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Server/RaftConfigParser.h>
#include <Storages/GCManager.h>
#include <Storages/KVStore/Decode/RegionTable.h>
#include <Storages/KVStore/StorageEngineType.h>
Expand All @@ -42,6 +43,8 @@ using GCManagerPtr = std::shared_ptr<GCManager>;

struct TiFlashRaftConfig;

struct TiFlashRaftProxyHelper;

// We define a shared ptr here, because TMTContext / SchemaSyncer / IndexReader all need to
// `share` the resource of cluster.
using KVClusterPtr = std::shared_ptr<pingcap::kv::Cluster>;
Expand Down Expand Up @@ -144,6 +147,7 @@ class TMTContext : private boost::noncopyable
uint64_t readIndexWorkerTick() const;

Etcd::ClientPtr getEtcdClient() const { return etcd_client; }
void initS3GCManager(const TiFlashRaftProxyHelper * proxy_helper);

private:
Context & context;
Expand Down Expand Up @@ -174,6 +178,8 @@ class TMTContext : private boost::noncopyable
std::atomic_uint64_t wait_index_timeout_ms;
std::atomic_uint64_t read_index_worker_tick_ms;
std::atomic_int64_t wait_region_ready_timeout_sec;

TiFlashRaftConfig raftproxy_config;
};

const std::string & IntoStoreStatusName(TMTContext::StoreStatus status);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ try
{
auto & ctx = TiFlashTestEnv::getGlobalContext();
proxy_instance->cluster_ver = RaftstoreVer::V2;
proxy_instance->proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":3}})";
proxy_instance->proxy_config_string = R"({"raftstore":{"snap-handle-pool-size":3},"server":{"engine-addr":"123"}})";
KVStore & kvs = getKVS();
kvs.fetchProxyConfig(proxy_helper.get());
ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr);
Expand Down

0 comments on commit 20d4616

Please sign in to comment.